1# engine/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
8
9from __future__ import annotations
10
11import contextlib
12import sys
13import typing
14from typing import Any
15from typing import Callable
16from typing import cast
17from typing import Iterable
18from typing import Iterator
19from typing import List
20from typing import Mapping
21from typing import NoReturn
22from typing import Optional
23from typing import overload
24from typing import Tuple
25from typing import Type
26from typing import TypeVar
27from typing import Union
28
29from .interfaces import BindTyping
30from .interfaces import ConnectionEventsTarget
31from .interfaces import DBAPICursor
32from .interfaces import ExceptionContext
33from .interfaces import ExecuteStyle
34from .interfaces import ExecutionContext
35from .interfaces import IsolationLevel
36from .util import _distill_params_20
37from .util import _distill_raw_params
38from .util import TransactionalContext
39from .. import exc
40from .. import inspection
41from .. import log
42from .. import util
43from ..sql import compiler
44from ..sql import util as sql_util
45from ..util.typing import Never
46from ..util.typing import TupleAny
47from ..util.typing import TypeVarTuple
48from ..util.typing import Unpack
49
50if typing.TYPE_CHECKING:
51 from . import CursorResult
52 from . import ScalarResult
53 from .interfaces import _AnyExecuteParams
54 from .interfaces import _AnyMultiExecuteParams
55 from .interfaces import _CoreAnyExecuteParams
56 from .interfaces import _CoreMultiExecuteParams
57 from .interfaces import _CoreSingleExecuteParams
58 from .interfaces import _DBAPIAnyExecuteParams
59 from .interfaces import _DBAPISingleExecuteParams
60 from .interfaces import _ExecuteOptions
61 from .interfaces import CompiledCacheType
62 from .interfaces import CoreExecuteOptionsParameter
63 from .interfaces import Dialect
64 from .interfaces import SchemaTranslateMapType
65 from .reflection import Inspector # noqa
66 from .url import URL
67 from ..event import dispatcher
68 from ..log import _EchoFlagType
69 from ..pool import _ConnectionFairy
70 from ..pool import Pool
71 from ..pool import PoolProxiedConnection
72 from ..sql import Executable
73 from ..sql._typing import _InfoType
74 from ..sql.compiler import Compiled
75 from ..sql.ddl import ExecutableDDLElement
76 from ..sql.ddl import InvokeDDLBase
77 from ..sql.functions import FunctionElement
78 from ..sql.schema import DefaultGenerator
79 from ..sql.schema import HasSchemaAttr
80 from ..sql.schema import SchemaVisitable
81 from ..sql.selectable import TypedReturnsRows
82
83
# Generic type variable, bound to ``Any``.
_T = TypeVar("_T", bound=Any)
# Variadic type variable.
_Ts = TypeVarTuple("_Ts")
# Empty execution-options mapping.  ``_ExecuteOptions`` is imported only
# under ``TYPE_CHECKING``; the annotation is not evaluated at runtime
# because this module uses ``from __future__ import annotations``.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
# The same ``util.EMPTY_DICT`` object, annotated as a generic mapping.
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
88
89
90class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
91 """Provides high-level functionality for a wrapped DB-API connection.
92
93 The :class:`_engine.Connection` object is procured by calling the
94 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
95 object, and provides services for execution of SQL statements as well
96 as transaction control.
97
98 The Connection object is **not** thread-safe. While a Connection can be
99 shared among threads using properly synchronized access, it is still
100 possible that the underlying DBAPI connection may not support shared
101 access between threads. Check the DBAPI documentation for details.
102
103 The Connection object represents a single DBAPI connection checked out
    from the connection pool. In this state, the connection pool has no
    effect upon the connection, including its expiration or timeout state.
106 For the connection pool to properly manage connections, connections
107 should be returned to the connection pool (i.e. ``connection.close()``)
108 whenever the connection is not in use.
109
110 .. index::
111 single: thread safety; Connection
112
113 """
114
# Dialect in use; assigned in ``__init__`` from ``engine.dialect``.
dialect: Dialect
# Event dispatcher; ``__init__`` rebinds it to one joined with the
# engine's dispatcher unless ``_has_events`` was passed explicitly.
dispatch: dispatcher[ConnectionEventsTarget]

_sqla_logger_namespace = "sqlalchemy.engine.Connection"

# used by sqlalchemy.engine.util.TransactionalContext
_trans_context_manager: Optional[TransactionalContext] = None

# legacy as of 2.0, should be eventually deprecated and
# removed. was used in the "pre_ping" recipe that's been in the docs
# a long time
should_close_with_result = False

# Pool-proxied DBAPI connection; ``invalidate()`` sets it to ``None`` and
# the ``closed`` / ``invalidated`` properties test it against ``None``.
_dbapi_connection: Optional[PoolProxiedConnection]

# Current execution options; starts out as the engine's options and is
# rebound by ``execution_options()``.
_execution_options: _ExecuteOptions

# Both are initialized to ``None`` in ``__init__``; ``begin()`` assigns
# ``_transaction``.
_transaction: Optional[RootTransaction]
_nested_transaction: Optional[NestedTransaction]
134
135 def __init__(
136 self,
137 engine: Engine,
138 connection: Optional[PoolProxiedConnection] = None,
139 _has_events: Optional[bool] = None,
140 _allow_revalidate: bool = True,
141 _allow_autobegin: bool = True,
142 ):
143 """Construct a new Connection."""
144 self.engine = engine
145 self.dialect = dialect = engine.dialect
146
147 if connection is None:
148 try:
149 self._dbapi_connection = engine.raw_connection()
150 except dialect.loaded_dbapi.Error as err:
151 Connection._handle_dbapi_exception_noconnection(
152 err, dialect, engine
153 )
154 raise
155 else:
156 self._dbapi_connection = connection
157
158 self._transaction = self._nested_transaction = None
159 self.__savepoint_seq = 0
160 self.__in_begin = False
161
162 self.__can_reconnect = _allow_revalidate
163 self._allow_autobegin = _allow_autobegin
164 self._echo = self.engine._should_log_info()
165
166 if _has_events is None:
167 # if _has_events is sent explicitly as False,
168 # then don't join the dispatch of the engine; we don't
169 # want to handle any of the engine's events in that case.
170 self.dispatch = self.dispatch._join(engine.dispatch)
171 self._has_events = _has_events or (
172 _has_events is None and engine._has_events
173 )
174
175 self._execution_options = engine._execution_options
176
177 if self._has_events or self.engine._has_events:
178 self.dispatch.engine_connect(self)
179
# Optional callable applied to each message by ``_log_info()`` and
# ``_log_debug()`` before it is logged; a falsy value leaves messages
# unchanged.
# this can be assigned differently via
# characteristics.LoggingTokenCharacteristic
_message_formatter: Any = None
183
def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at INFO level on the engine's logger.

    The message is first passed through ``_message_formatter`` when one
    is set.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel`` keyword
    of ``1 + log.STACKLEVEL_OFFSET`` is added to (or overrides) ``kw``.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.info(text, *arg, **kw)
194
def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at DEBUG level on the engine's logger.

    The message is first passed through ``_message_formatter`` when one
    is set.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel`` keyword
    of ``1 + log.STACKLEVEL_OFFSET`` is added to (or overrides) ``kw``.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.debug(text, *arg, **kw)
205
206 @property
207 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
208 schema_translate_map: Optional[SchemaTranslateMapType] = (
209 self._execution_options.get("schema_translate_map", None)
210 )
211
212 return schema_translate_map
213
214 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
215 """Return the schema name for the given schema item taking into
216 account current schema translate map.
217
218 """
219
220 name = obj.schema
221 schema_translate_map: Optional[SchemaTranslateMapType] = (
222 self._execution_options.get("schema_translate_map", None)
223 )
224
225 if (
226 schema_translate_map
227 and name in schema_translate_map
228 and obj._use_schema_map
229 ):
230 return schema_translate_map[name]
231 else:
232 return name
233
234 def __enter__(self) -> Connection:
235 return self
236
237 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
238 self.close()
239
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    driver_column_names: bool = False,
    **opt: Any,
) -> Connection: ...

@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called. Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`. The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object. This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself. If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations. The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`. These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible. For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor. Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`. If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`. Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors. The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`. Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once. Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`. Number of rows to format into an
      INSERT statement when the statement uses "insertmanyvalues" mode,
      which is a paged form of bulk insert that is used for many backends
      when using :term:`executemany` execution typically in conjunction
      with RETURNING. Defaults to 1000. May also be modified on a
      per-engine basis using the
      :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

      .. versionadded:: 2.0

      .. seealso::

        :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements. Using this option, the DBAPI's rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements. See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    :param driver_column_names: When True, the returned
      :class:`_engine.CursorResult` will use the column names as written in
      ``cursor.description`` to set up the keys for the result set,
      including the names of columns for the :class:`_engine.Row` object as
      well as the dictionary keys when using :attr:`_engine.Row._mapping`.
      On backends that use "name normalization" such as Oracle Database to
      correct for lower case names being converted to all uppercase, this
      behavior is turned off and the raw UPPERCASE names in
      cursor.description will be present.

      .. versionadded:: 2.1

    :return: this same :class:`_engine.Connection` object.

    """  # noqa
    # The three steps below run in a fixed order: event listeners see
    # the incoming options first, then the options are merged, then the
    # dialect is notified.
    if self._has_events or self.engine._has_events:
        self.dispatch.set_connection_execution_options(self, opt)
    # the attribute is rebound to the result of union() with the new
    # options
    self._execution_options = self._execution_options.union(opt)
    # the dialect hook receives only the options passed in this call
    self.dialect.set_connection_execution_options(self, opt)
    return self
535
536 def get_execution_options(self) -> _ExecuteOptions:
537 """Get the non-SQL options which will take effect during execution.
538
539 .. seealso::
540
541 :meth:`_engine.Connection.execution_options`
542 """
543 return self._execution_options
544
545 @property
546 def _still_open_and_dbapi_connection_is_valid(self) -> bool:
547 pool_proxied_connection = self._dbapi_connection
548 return (
549 pool_proxied_connection is not None
550 and pool_proxied_connection.is_valid
551 )
552
553 @property
554 def closed(self) -> bool:
555 """Return True if this connection is closed."""
556
557 return self._dbapi_connection is None and not self.__can_reconnect
558
559 @property
560 def invalidated(self) -> bool:
561 """Return True if this connection was invalidated.
562
563 This does not indicate whether or not the connection was
564 invalidated at the pool level, however
565
566 """
567
568 # prior to 1.4, "invalid" was stored as a state independent of
569 # "closed", meaning an invalidated connection could be "closed",
570 # the _dbapi_connection would be None and closed=True, yet the
571 # "invalid" flag would stay True. This meant that there were
572 # three separate states (open/valid, closed/valid, closed/invalid)
573 # when there is really no reason for that; a connection that's
574 # "closed" does not need to be "invalid". So the state is now
575 # represented by the two facts alone.
576
577 pool_proxied_connection = self._dbapi_connection
578 return pool_proxied_connection is None and self.__can_reconnect
579
580 @property
581 def connection(self) -> PoolProxiedConnection:
582 """The underlying DB-API connection managed by this Connection.
583
584 This is a SQLAlchemy connection-pool proxied connection
585 which then has the attribute
586 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
587 actual driver connection.
588
589 .. seealso::
590
591
592 :ref:`dbapi_connections`
593
594 """
595
596 if self._dbapi_connection is None:
597 try:
598 return self._revalidate_connection()
599 except (exc.PendingRollbackError, exc.ResourceClosedError):
600 raise
601 except BaseException as e:
602 self._handle_dbapi_exception(e, None, None, None, None)
603 else:
604 return self._dbapi_connection
605
606 def get_isolation_level(self) -> IsolationLevel:
607 """Return the current **actual** isolation level that's present on
608 the database within the scope of this connection.
609
610 This attribute will perform a live SQL operation against the database
611 in order to procure the current isolation level, so the value returned
612 is the actual level on the underlying DBAPI connection regardless of
613 how this state was set. This will be one of the four actual isolation
614 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
615 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
616 level setting. Third party dialects may also feature additional
617 isolation level settings.
618
619 .. note:: This method **will not report** on the ``AUTOCOMMIT``
620 isolation level, which is a separate :term:`dbapi` setting that's
621 independent of **actual** isolation level. When ``AUTOCOMMIT`` is
622 in use, the database connection still has a "traditional" isolation
623 mode in effect, that is typically one of the four values
624 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
625 ``SERIALIZABLE``.
626
627 Compare to the :attr:`_engine.Connection.default_isolation_level`
628 accessor which returns the isolation level that is present on the
629 database at initial connection time.
630
631 .. seealso::
632
633 :attr:`_engine.Connection.default_isolation_level`
634 - view default level
635
636 :paramref:`_sa.create_engine.isolation_level`
637 - set per :class:`_engine.Engine` isolation level
638
639 :paramref:`.Connection.execution_options.isolation_level`
640 - set per :class:`_engine.Connection` isolation level
641
642 """
643 dbapi_connection = self.connection.dbapi_connection
644 assert dbapi_connection is not None
645 try:
646 return self.dialect.get_isolation_level(dbapi_connection)
647 except BaseException as e:
648 self._handle_dbapi_exception(e, None, None, None, None)
649
650 @property
651 def default_isolation_level(self) -> Optional[IsolationLevel]:
652 """The initial-connection time isolation level associated with the
653 :class:`_engine.Dialect` in use.
654
655 This value is independent of the
656 :paramref:`.Connection.execution_options.isolation_level` and
657 :paramref:`.Engine.execution_options.isolation_level` execution
658 options, and is determined by the :class:`_engine.Dialect` when the
659 first connection is created, by performing a SQL query against the
660 database for the current isolation level before any additional commands
661 have been emitted.
662
663 Calling this accessor does not invoke any new SQL queries.
664
665 .. seealso::
666
667 :meth:`_engine.Connection.get_isolation_level`
668 - view current actual isolation level
669
670 :paramref:`_sa.create_engine.isolation_level`
671 - set per :class:`_engine.Engine` isolation level
672
673 :paramref:`.Connection.execution_options.isolation_level`
674 - set per :class:`_engine.Connection` isolation level
675
676 """
677 return self.dialect.default_isolation_level
678
def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`~sqlalchemy.exc.PendingRollbackError` because a
    transaction must be rolled back before reconnecting.

    The message says "savepoint transaction" when a nested transaction
    is present and plain "transaction" otherwise.
    """
    qualifier = "savepoint " if self._nested_transaction is not None else ""
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (qualifier,),
        code="8s2b",
    )
686
687 def _revalidate_connection(self) -> PoolProxiedConnection:
688 if self.__can_reconnect and self.invalidated:
689 if self._transaction is not None:
690 self._invalid_transaction()
691 self._dbapi_connection = self.engine.raw_connection()
692 return self._dbapi_connection
693 raise exc.ResourceClosedError("This Connection is closed")
694
695 @property
696 def info(self) -> _InfoType:
697 """Info dictionary associated with the underlying DBAPI connection
698 referred to by this :class:`_engine.Connection`, allowing user-defined
699 data to be associated with the connection.
700
701 The data here will follow along with the DBAPI connection including
702 after it is returned to the connection pool and used again
703 in subsequent instances of :class:`_engine.Connection`.
704
705 """
706
707 return self.connection.info
708
709 def invalidate(self, exception: Optional[BaseException] = None) -> None:
710 """Invalidate the underlying DBAPI connection associated with
711 this :class:`_engine.Connection`.
712
713 An attempt will be made to close the underlying DBAPI connection
714 immediately; however if this operation fails, the error is logged
715 but not raised. The connection is then discarded whether or not
716 close() succeeded.
717
718 Upon the next use (where "use" typically means using the
719 :meth:`_engine.Connection.execute` method or similar),
720 this :class:`_engine.Connection` will attempt to
721 procure a new DBAPI connection using the services of the
722 :class:`_pool.Pool` as a source of connectivity (e.g.
723 a "reconnection").
724
725 If a transaction was in progress (e.g. the
726 :meth:`_engine.Connection.begin` method has been called) when
727 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
728 level all state associated with this transaction is lost, as
729 the DBAPI connection is closed. The :class:`_engine.Connection`
730 will not allow a reconnection to proceed until the
731 :class:`.Transaction` object is ended, by calling the
732 :meth:`.Transaction.rollback` method; until that point, any attempt at
733 continuing to use the :class:`_engine.Connection` will raise an
734 :class:`~sqlalchemy.exc.InvalidRequestError`.
735 This is to prevent applications from accidentally
736 continuing an ongoing transactional operations despite the
737 fact that the transaction has been lost due to an
738 invalidation.
739
740 The :meth:`_engine.Connection.invalidate` method,
741 just like auto-invalidation,
742 will at the connection pool level invoke the
743 :meth:`_events.PoolEvents.invalidate` event.
744
745 :param exception: an optional ``Exception`` instance that's the
746 reason for the invalidation. is passed along to event handlers
747 and logging functions.
748
749 .. seealso::
750
751 :ref:`pool_connection_invalidation`
752
753 """
754
755 if self.invalidated:
756 return
757
758 if self.closed:
759 raise exc.ResourceClosedError("This Connection is closed")
760
761 if self._still_open_and_dbapi_connection_is_valid:
762 pool_proxied_connection = self._dbapi_connection
763 assert pool_proxied_connection is not None
764 pool_proxied_connection.invalidate(exception)
765
766 self._dbapi_connection = None
767
768 def detach(self) -> None:
769 """Detach the underlying DB-API connection from its connection pool.
770
771 E.g.::
772
773 with engine.connect() as conn:
774 conn.detach()
775 conn.execute(text("SET search_path TO schema1, schema2"))
776
777 # work with connection
778
779 # connection is fully closed (since we used "with:", can
780 # also call .close())
781
782 This :class:`_engine.Connection` instance will remain usable.
783 When closed
784 (or exited from a context manager context as above),
785 the DB-API connection will be literally closed and not
786 returned to its originating pool.
787
788 This method can be used to insulate the rest of an application
789 from a modified state on a connection (such as a transaction
790 isolation level or similar).
791
792 """
793
794 if self.closed:
795 raise exc.ResourceClosedError("This Connection is closed")
796
797 pool_proxied_connection = self._dbapi_connection
798 if pool_proxied_connection is None:
799 raise exc.InvalidRequestError(
800 "Can't detach an invalidated Connection"
801 )
802 pool_proxied_connection.detach()
803
804 def _autobegin(self) -> None:
805 if self._allow_autobegin and not self.__in_begin:
806 self.begin()
807
def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    A :class:`_engine.RootTransaction` is returned. That object
    represents the "scope" of the transaction, which completes when
    either the :meth:`_engine.Transaction.rollback`
    or :meth:`_engine.Transaction.commit` method is called; it also
    works as a context manager, as illustrated above.

    The :meth:`_engine.Connection.begin` method begins a
    transaction that normally will be begun in any case when the connection
    is first used to execute a statement. The reason this method might be
    used would be to invoke the :meth:`_events.ConnectionEvents.begin`
    event at a specific time, or to organize code within the scope of a
    connection checkout in terms of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    The above code is not fundamentally any different in its behavior than
    the following code which does not use
    :meth:`_engine.Connection.begin`; the below style is known
    as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, the :meth:`_engine.Connection.begin`
    method does not emit any SQL or change the state of the underlying
    DBAPI connection in any way; the Python DBAPI does not have any
    concept of explicit transaction begin.

    :raises InvalidRequestError: if a transaction object is already
      present on this connection.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction
882
883 def begin_nested(self) -> NestedTransaction:
884 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
885 handle that controls the scope of the SAVEPOINT.
886
887 E.g.::
888
889 with engine.begin() as connection:
890 with connection.begin_nested():
891 connection.execute(table.insert(), {"username": "sandy"})
892
893 The returned object is an instance of
894 :class:`_engine.NestedTransaction`, which includes transactional
895 methods :meth:`_engine.NestedTransaction.commit` and
896 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
897 these methods correspond to the operations "RELEASE SAVEPOINT <name>"
898 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
899 to the :class:`_engine.NestedTransaction` object and is generated
900 automatically. Like any other :class:`_engine.Transaction`, the
901 :class:`_engine.NestedTransaction` may be used as a context manager as
902 illustrated above which will "release" or "rollback" corresponding to
903 if the operation within the block were successful or raised an
904 exception.
905
906 Nested transactions require SAVEPOINT support in the underlying
907 database, else the behavior is undefined. SAVEPOINT is commonly used to
908 run operations within a transaction that may fail, while continuing the
909 outer transaction. E.g.::
910
911 from sqlalchemy import exc
912
913 with engine.begin() as connection:
914 trans = connection.begin_nested()
915 try:
916 connection.execute(table.insert(), {"username": "sandy"})
917 trans.commit()
918 except exc.IntegrityError: # catch for duplicate username
919 trans.rollback() # rollback to savepoint
920
921 # outer transaction continues
922 connection.execute(...)
923
924 If :meth:`_engine.Connection.begin_nested` is called without first
925 calling :meth:`_engine.Connection.begin` or
926 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
927 will "autobegin" the outer transaction first. This outer transaction
928 may be committed using "commit-as-you-go" style, e.g.::
929
930 with engine.connect() as connection: # begin() wasn't called
931
932 with connection.begin_nested(): # will auto-"begin()" first
933 connection.execute(...)
934 # savepoint is released
935
936 connection.execute(...)
937
938 # explicitly commit outer transaction
939 connection.commit()
940
941 # can continue working with connection here
942
943 .. versionchanged:: 2.0
944
945 :meth:`_engine.Connection.begin_nested` will now participate
946 in the connection "autobegin" behavior that is new as of
947 2.0 / "future" style connections in 1.4.
948
949 .. seealso::
950
951 :meth:`_engine.Connection.begin`
952
953 :ref:`session_begin_nested` - ORM support for SAVEPOINT
954
955 """
956 if self._transaction is None:
957 self._autobegin()
958
959 return NestedTransaction(self)
960
961 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
962 """Begin a two-phase or XA transaction and return a transaction
963 handle.
964
965 The returned object is an instance of :class:`.TwoPhaseTransaction`,
966 which in addition to the methods provided by
967 :class:`.Transaction`, also provides a
968 :meth:`~.TwoPhaseTransaction.prepare` method.
969
970 :param xid: the two phase transaction id. If not supplied, a
971 random id will be generated. The accepted type and value depends on
972 the driver in use.
973
974 .. seealso::
975
976 :meth:`_engine.Connection.begin`
977
978 :meth:`_engine.Connection.begin_twophase`
979
980 """
981
982 if self._transaction is not None:
983 raise exc.InvalidRequestError(
984 "Cannot start a two phase transaction when a transaction "
985 "is already in progress."
986 )
987 if xid is None:
988 xid = self.engine.dialect.create_xid()
989 return TwoPhaseTransaction(self, xid)
990
991 def commit(self) -> None:
992 """Commit the transaction that is currently in progress.
993
994 This method commits the current transaction if one has been started.
995 If no transaction was started, the method has no effect, assuming
996 the connection is in a non-invalidated state.
997
998 A transaction is begun on a :class:`_engine.Connection` automatically
999 whenever a statement is first executed, or when the
1000 :meth:`_engine.Connection.begin` method is called.
1001
1002 .. note:: The :meth:`_engine.Connection.commit` method only acts upon
1003 the primary database transaction that is linked to the
1004 :class:`_engine.Connection` object. It does not operate upon a
1005 SAVEPOINT that would have been invoked from the
1006 :meth:`_engine.Connection.begin_nested` method; for control of a
1007 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
1008 :class:`_engine.NestedTransaction` that is returned by the
1009 :meth:`_engine.Connection.begin_nested` method itself.
1010
1011
1012 """
1013 if self._transaction:
1014 self._transaction.commit()
1015
1016 def rollback(self) -> None:
1017 """Roll back the transaction that is currently in progress.
1018
1019 This method rolls back the current transaction if one has been started.
1020 If no transaction was started, the method has no effect. If a
1021 transaction was started and the connection is in an invalidated state,
1022 the transaction is cleared using this method.
1023
1024 A transaction is begun on a :class:`_engine.Connection` automatically
1025 whenever a statement is first executed, or when the
1026 :meth:`_engine.Connection.begin` method is called.
1027
1028 .. note:: The :meth:`_engine.Connection.rollback` method only acts
1029 upon the primary database transaction that is linked to the
1030 :class:`_engine.Connection` object. It does not operate upon a
1031 SAVEPOINT that would have been invoked from the
1032 :meth:`_engine.Connection.begin_nested` method; for control of a
1033 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
1034 :class:`_engine.NestedTransaction` that is returned by the
1035 :meth:`_engine.Connection.begin_nested` method itself.
1036
1037
1038 """
1039 if self._transaction:
1040 self._transaction.rollback()
1041
1042 def recover_twophase(self) -> List[Any]:
1043 return self.engine.dialect.do_recover_twophase(self)
1044
1045 def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
1046 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
1047
1048 def commit_prepared(self, xid: Any, recover: bool = False) -> None:
1049 self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
1050
1051 def in_transaction(self) -> bool:
1052 """Return True if a transaction is in progress."""
1053 return self._transaction is not None and self._transaction.is_active
1054
1055 def in_nested_transaction(self) -> bool:
1056 """Return True if a transaction is in progress."""
1057 return (
1058 self._nested_transaction is not None
1059 and self._nested_transaction.is_active
1060 )
1061
1062 def _is_autocommit_isolation(self) -> bool:
1063 opt_iso = self._execution_options.get("isolation_level", None)
1064 return bool(
1065 opt_iso == "AUTOCOMMIT"
1066 or (
1067 opt_iso is None
1068 and self.engine.dialect._on_connect_isolation_level
1069 == "AUTOCOMMIT"
1070 )
1071 )
1072
1073 def _get_required_transaction(self) -> RootTransaction:
1074 trans = self._transaction
1075 if trans is None:
1076 raise exc.InvalidRequestError("connection is not in a transaction")
1077 return trans
1078
1079 def _get_required_nested_transaction(self) -> NestedTransaction:
1080 trans = self._nested_transaction
1081 if trans is None:
1082 raise exc.InvalidRequestError(
1083 "connection is not in a nested transaction"
1084 )
1085 return trans
1086
1087 def get_transaction(self) -> Optional[RootTransaction]:
1088 """Return the current root transaction in progress, if any.
1089
1090 .. versionadded:: 1.4
1091
1092 """
1093
1094 return self._transaction
1095
1096 def get_nested_transaction(self) -> Optional[NestedTransaction]:
1097 """Return the current nested transaction in progress, if any.
1098
1099 .. versionadded:: 1.4
1100
1101 """
1102 return self._nested_transaction
1103
1104 def _begin_impl(self, transaction: RootTransaction) -> None:
1105 if self._echo:
1106 if self._is_autocommit_isolation():
1107 self._log_info(
1108 "BEGIN (implicit; DBAPI should not BEGIN due to "
1109 "autocommit mode)"
1110 )
1111 else:
1112 self._log_info("BEGIN (implicit)")
1113
1114 self.__in_begin = True
1115
1116 if self._has_events or self.engine._has_events:
1117 self.dispatch.begin(self)
1118
1119 try:
1120 self.engine.dialect.do_begin(self.connection)
1121 except BaseException as e:
1122 self._handle_dbapi_exception(e, None, None, None, None)
1123 finally:
1124 self.__in_begin = False
1125
1126 def _rollback_impl(self) -> None:
1127 if self._has_events or self.engine._has_events:
1128 self.dispatch.rollback(self)
1129
1130 if self._still_open_and_dbapi_connection_is_valid:
1131 if self._echo:
1132 if self._is_autocommit_isolation():
1133 if self.dialect.skip_autocommit_rollback:
1134 self._log_info(
1135 "ROLLBACK will be skipped by "
1136 "skip_autocommit_rollback"
1137 )
1138 else:
1139 self._log_info(
1140 "ROLLBACK using DBAPI connection.rollback(); "
1141 "set skip_autocommit_rollback to prevent fully"
1142 )
1143 else:
1144 self._log_info("ROLLBACK")
1145 try:
1146 self.engine.dialect.do_rollback(self.connection)
1147 except BaseException as e:
1148 self._handle_dbapi_exception(e, None, None, None, None)
1149
1150 def _commit_impl(self) -> None:
1151 if self._has_events or self.engine._has_events:
1152 self.dispatch.commit(self)
1153
1154 if self._echo:
1155 if self._is_autocommit_isolation():
1156 self._log_info(
1157 "COMMIT using DBAPI connection.commit(), "
1158 "has no effect due to autocommit mode"
1159 )
1160 else:
1161 self._log_info("COMMIT")
1162 try:
1163 self.engine.dialect.do_commit(self.connection)
1164 except BaseException as e:
1165 self._handle_dbapi_exception(e, None, None, None, None)
1166
1167 def _savepoint_impl(self, name: Optional[str] = None) -> str:
1168 if self._has_events or self.engine._has_events:
1169 self.dispatch.savepoint(self, name)
1170
1171 if name is None:
1172 self.__savepoint_seq += 1
1173 name = "sa_savepoint_%s" % self.__savepoint_seq
1174 self.engine.dialect.do_savepoint(self, name)
1175 return name
1176
1177 def _rollback_to_savepoint_impl(self, name: str) -> None:
1178 if self._has_events or self.engine._has_events:
1179 self.dispatch.rollback_savepoint(self, name, None)
1180
1181 if self._still_open_and_dbapi_connection_is_valid:
1182 self.engine.dialect.do_rollback_to_savepoint(self, name)
1183
1184 def _release_savepoint_impl(self, name: str) -> None:
1185 if self._has_events or self.engine._has_events:
1186 self.dispatch.release_savepoint(self, name, None)
1187
1188 self.engine.dialect.do_release_savepoint(self, name)
1189
1190 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
1191 if self._echo:
1192 self._log_info("BEGIN TWOPHASE (implicit)")
1193 if self._has_events or self.engine._has_events:
1194 self.dispatch.begin_twophase(self, transaction.xid)
1195
1196 self.__in_begin = True
1197 try:
1198 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1199 except BaseException as e:
1200 self._handle_dbapi_exception(e, None, None, None, None)
1201 finally:
1202 self.__in_begin = False
1203
1204 def _prepare_twophase_impl(self, xid: Any) -> None:
1205 if self._has_events or self.engine._has_events:
1206 self.dispatch.prepare_twophase(self, xid)
1207
1208 assert isinstance(self._transaction, TwoPhaseTransaction)
1209 try:
1210 self.engine.dialect.do_prepare_twophase(self, xid)
1211 except BaseException as e:
1212 self._handle_dbapi_exception(e, None, None, None, None)
1213
1214 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1215 if self._has_events or self.engine._has_events:
1216 self.dispatch.rollback_twophase(self, xid, is_prepared)
1217
1218 if self._still_open_and_dbapi_connection_is_valid:
1219 assert isinstance(self._transaction, TwoPhaseTransaction)
1220 try:
1221 self.engine.dialect.do_rollback_twophase(
1222 self, xid, is_prepared
1223 )
1224 except BaseException as e:
1225 self._handle_dbapi_exception(e, None, None, None, None)
1226
1227 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1228 if self._has_events or self.engine._has_events:
1229 self.dispatch.commit_twophase(self, xid, is_prepared)
1230
1231 assert isinstance(self._transaction, TwoPhaseTransaction)
1232 try:
1233 self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
1234 except BaseException as e:
1235 self._handle_dbapi_exception(e, None, None, None, None)
1236
1237 def close(self) -> None:
1238 """Close this :class:`_engine.Connection`.
1239
1240 This results in a release of the underlying database
1241 resources, that is, the DBAPI connection referenced
1242 internally. The DBAPI connection is typically restored
1243 back to the connection-holding :class:`_pool.Pool` referenced
1244 by the :class:`_engine.Engine` that produced this
1245 :class:`_engine.Connection`. Any transactional state present on
1246 the DBAPI connection is also unconditionally released via
1247 the DBAPI connection's ``rollback()`` method, regardless
1248 of any :class:`.Transaction` object that may be
1249 outstanding with regards to this :class:`_engine.Connection`.
1250
1251 This has the effect of also calling :meth:`_engine.Connection.rollback`
1252 if any transaction is in place.
1253
1254 After :meth:`_engine.Connection.close` is called, the
1255 :class:`_engine.Connection` is permanently in a closed state,
1256 and will allow no further operations.
1257
1258 """
1259
1260 if self._transaction:
1261 self._transaction.close()
1262 skip_reset = True
1263 else:
1264 skip_reset = False
1265
1266 if self._dbapi_connection is not None:
1267 conn = self._dbapi_connection
1268
1269 # as we just closed the transaction, close the connection
1270 # pool connection without doing an additional reset
1271 if skip_reset:
1272 cast("_ConnectionFairy", conn)._close_special(
1273 transaction_reset=True
1274 )
1275 else:
1276 conn.close()
1277
1278 # There is a slight chance that conn.close() may have
1279 # triggered an invalidation here in which case
1280 # _dbapi_connection would already be None, however usually
1281 # it will be non-None here and in a "closed" state.
1282 self._dbapi_connection = None
1283 self.__can_reconnect = False
1284
1285 # special case to handle mypy issue:
1286 # https://github.com/python/mypy/issues/20651
1287 @overload
1288 def scalar(
1289 self,
1290 statement: TypedReturnsRows[Never],
1291 parameters: Optional[_CoreSingleExecuteParams] = None,
1292 *,
1293 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1294 ) -> Optional[Any]: ...
1295
1296 @overload
1297 def scalar(
1298 self,
1299 statement: TypedReturnsRows[_T],
1300 parameters: Optional[_CoreSingleExecuteParams] = None,
1301 *,
1302 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1303 ) -> Optional[_T]: ...
1304
1305 @overload
1306 def scalar(
1307 self,
1308 statement: Executable,
1309 parameters: Optional[_CoreSingleExecuteParams] = None,
1310 *,
1311 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1312 ) -> Any: ...
1313
1314 def scalar(
1315 self,
1316 statement: Executable,
1317 parameters: Optional[_CoreSingleExecuteParams] = None,
1318 *,
1319 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1320 ) -> Any:
1321 r"""Executes a SQL statement construct and returns a scalar object.
1322
1323 This method is shorthand for invoking the
1324 :meth:`_engine.Result.scalar` method after invoking the
1325 :meth:`_engine.Connection.execute` method. Parameters are equivalent.
1326
1327 :return: a scalar Python value representing the first column of the
1328 first row returned.
1329
1330 """
1331 distilled_parameters = _distill_params_20(parameters)
1332 try:
1333 meth = statement._execute_on_scalar
1334 except AttributeError as err:
1335 raise exc.ObjectNotExecutableError(statement) from err
1336 else:
1337 return meth(
1338 self,
1339 distilled_parameters,
1340 execution_options or NO_OPTIONS,
1341 )
1342
1343 @overload
1344 def scalars(
1345 self,
1346 statement: TypedReturnsRows[_T],
1347 parameters: Optional[_CoreAnyExecuteParams] = None,
1348 *,
1349 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1350 ) -> ScalarResult[_T]: ...
1351
1352 @overload
1353 def scalars(
1354 self,
1355 statement: Executable,
1356 parameters: Optional[_CoreAnyExecuteParams] = None,
1357 *,
1358 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1359 ) -> ScalarResult[Any]: ...
1360
1361 def scalars(
1362 self,
1363 statement: Executable,
1364 parameters: Optional[_CoreAnyExecuteParams] = None,
1365 *,
1366 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1367 ) -> ScalarResult[Any]:
1368 """Executes and returns a scalar result set, which yields scalar values
1369 from the first column of each row.
1370
1371 This method is equivalent to calling :meth:`_engine.Connection.execute`
1372 to receive a :class:`_result.Result` object, then invoking the
1373 :meth:`_result.Result.scalars` method to produce a
1374 :class:`_result.ScalarResult` instance.
1375
1376 :return: a :class:`_result.ScalarResult`
1377
1378 .. versionadded:: 1.4.24
1379
1380 """
1381
1382 return self.execute(
1383 statement, parameters, execution_options=execution_options
1384 ).scalars()
1385
1386 @overload
1387 def execute(
1388 self,
1389 statement: TypedReturnsRows[Unpack[_Ts]],
1390 parameters: Optional[_CoreAnyExecuteParams] = None,
1391 *,
1392 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1393 ) -> CursorResult[Unpack[_Ts]]: ...
1394
1395 @overload
1396 def execute(
1397 self,
1398 statement: Executable,
1399 parameters: Optional[_CoreAnyExecuteParams] = None,
1400 *,
1401 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1402 ) -> CursorResult[Unpack[TupleAny]]: ...
1403
1404 def execute(
1405 self,
1406 statement: Executable,
1407 parameters: Optional[_CoreAnyExecuteParams] = None,
1408 *,
1409 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1410 ) -> CursorResult[Unpack[TupleAny]]:
1411 r"""Executes a SQL statement construct and returns a
1412 :class:`_engine.CursorResult`.
1413
1414 :param statement: The statement to be executed. This is always
1415 an object that is in both the :class:`_expression.ClauseElement` and
1416 :class:`_expression.Executable` hierarchies, including:
1417
1418 * :class:`_expression.Select`
1419 * :class:`_expression.Insert`, :class:`_expression.Update`,
1420 :class:`_expression.Delete`
1421 * :class:`_expression.TextClause` and
1422 :class:`_expression.TextualSelect`
1423 * :class:`_schema.DDL` and objects which inherit from
1424 :class:`_schema.ExecutableDDLElement`
1425
1426 :param parameters: parameters which will be bound into the statement.
1427 This may be either a dictionary of parameter names to values,
1428 or a mutable sequence (e.g. a list) of dictionaries. When a
1429 list of dictionaries is passed, the underlying statement execution
1430 will make use of the DBAPI ``cursor.executemany()`` method.
1431 When a single dictionary is passed, the DBAPI ``cursor.execute()``
1432 method will be used.
1433
1434 :param execution_options: optional dictionary of execution options,
1435 which will be associated with the statement execution. This
1436 dictionary can provide a subset of the options that are accepted
1437 by :meth:`_engine.Connection.execution_options`.
1438
1439 :return: a :class:`_engine.Result` object.
1440
1441 """
1442 distilled_parameters = _distill_params_20(parameters)
1443 try:
1444 meth = statement._execute_on_connection
1445 except AttributeError as err:
1446 raise exc.ObjectNotExecutableError(statement) from err
1447 else:
1448 return meth(
1449 self,
1450 distilled_parameters,
1451 execution_options or NO_OPTIONS,
1452 )
1453
1454 def _execute_function(
1455 self,
1456 func: FunctionElement[Any],
1457 distilled_parameters: _CoreMultiExecuteParams,
1458 execution_options: CoreExecuteOptionsParameter,
1459 ) -> CursorResult[Unpack[TupleAny]]:
1460 """Execute a sql.FunctionElement object."""
1461
1462 return self._execute_clauseelement(
1463 func.select(), distilled_parameters, execution_options
1464 )
1465
1466 def _execute_default(
1467 self,
1468 default: DefaultGenerator,
1469 distilled_parameters: _CoreMultiExecuteParams,
1470 execution_options: CoreExecuteOptionsParameter,
1471 ) -> Any:
1472 """Execute a schema.ColumnDefault object."""
1473
1474 exec_opts = self._execution_options.merge_with(execution_options)
1475
1476 event_multiparams: Optional[_CoreMultiExecuteParams]
1477 event_params: Optional[_CoreAnyExecuteParams]
1478
1479 # note for event handlers, the "distilled parameters" which is always
1480 # a list of dicts is broken out into separate "multiparams" and
1481 # "params" collections, which allows the handler to distinguish
1482 # between an executemany and execute style set of parameters.
1483 if self._has_events or self.engine._has_events:
1484 (
1485 default,
1486 distilled_parameters,
1487 event_multiparams,
1488 event_params,
1489 ) = self._invoke_before_exec_event(
1490 default, distilled_parameters, exec_opts
1491 )
1492 else:
1493 event_multiparams = event_params = None
1494
1495 try:
1496 conn = self._dbapi_connection
1497 if conn is None:
1498 conn = self._revalidate_connection()
1499
1500 dialect = self.dialect
1501 ctx = dialect.execution_ctx_cls._init_default(
1502 dialect, self, conn, exec_opts
1503 )
1504 except (exc.PendingRollbackError, exc.ResourceClosedError):
1505 raise
1506 except BaseException as e:
1507 self._handle_dbapi_exception(e, None, None, None, None)
1508
1509 ret = ctx._exec_default(None, default, None)
1510
1511 if self._has_events or self.engine._has_events:
1512 self.dispatch.after_execute(
1513 self,
1514 default,
1515 event_multiparams,
1516 event_params,
1517 exec_opts,
1518 ret,
1519 )
1520
1521 return ret
1522
1523 def _execute_ddl(
1524 self,
1525 ddl: ExecutableDDLElement,
1526 distilled_parameters: _CoreMultiExecuteParams,
1527 execution_options: CoreExecuteOptionsParameter,
1528 ) -> CursorResult[Unpack[TupleAny]]:
1529 """Execute a schema.DDL object."""
1530
1531 exec_opts = ddl._execution_options.merge_with(
1532 self._execution_options, execution_options
1533 )
1534
1535 event_multiparams: Optional[_CoreMultiExecuteParams]
1536 event_params: Optional[_CoreSingleExecuteParams]
1537
1538 if self._has_events or self.engine._has_events:
1539 (
1540 ddl,
1541 distilled_parameters,
1542 event_multiparams,
1543 event_params,
1544 ) = self._invoke_before_exec_event(
1545 ddl, distilled_parameters, exec_opts
1546 )
1547 else:
1548 event_multiparams = event_params = None
1549
1550 schema_translate_map = exec_opts.get("schema_translate_map", None)
1551
1552 dialect = self.dialect
1553
1554 compiled = ddl.compile(
1555 dialect=dialect, schema_translate_map=schema_translate_map
1556 )
1557 ret = self._execute_context(
1558 dialect,
1559 dialect.execution_ctx_cls._init_ddl,
1560 compiled,
1561 None,
1562 exec_opts,
1563 compiled,
1564 )
1565 if self._has_events or self.engine._has_events:
1566 self.dispatch.after_execute(
1567 self,
1568 ddl,
1569 event_multiparams,
1570 event_params,
1571 exec_opts,
1572 ret,
1573 )
1574 return ret
1575
1576 def _invoke_before_exec_event(
1577 self,
1578 elem: Any,
1579 distilled_params: _CoreMultiExecuteParams,
1580 execution_options: _ExecuteOptions,
1581 ) -> Tuple[
1582 Any,
1583 _CoreMultiExecuteParams,
1584 _CoreMultiExecuteParams,
1585 _CoreSingleExecuteParams,
1586 ]:
1587 event_multiparams: _CoreMultiExecuteParams
1588 event_params: _CoreSingleExecuteParams
1589
1590 if len(distilled_params) == 1:
1591 event_multiparams, event_params = [], distilled_params[0]
1592 else:
1593 event_multiparams, event_params = distilled_params, {}
1594
1595 for fn in self.dispatch.before_execute:
1596 elem, event_multiparams, event_params = fn(
1597 self,
1598 elem,
1599 event_multiparams,
1600 event_params,
1601 execution_options,
1602 )
1603
1604 if event_multiparams:
1605 distilled_params = list(event_multiparams)
1606 if event_params:
1607 raise exc.InvalidRequestError(
1608 "Event handler can't return non-empty multiparams "
1609 "and params at the same time"
1610 )
1611 elif event_params:
1612 distilled_params = [event_params]
1613 else:
1614 distilled_params = []
1615
1616 return elem, distilled_params, event_multiparams, event_params
1617
1618 def _execute_clauseelement(
1619 self,
1620 elem: Executable,
1621 distilled_parameters: _CoreMultiExecuteParams,
1622 execution_options: CoreExecuteOptionsParameter,
1623 ) -> CursorResult[Unpack[TupleAny]]:
1624 """Execute a sql.ClauseElement object."""
1625
1626 exec_opts = elem._execution_options.merge_with(
1627 self._execution_options, execution_options
1628 )
1629
1630 has_events = self._has_events or self.engine._has_events
1631 if has_events:
1632 (
1633 elem,
1634 distilled_parameters,
1635 event_multiparams,
1636 event_params,
1637 ) = self._invoke_before_exec_event(
1638 elem, distilled_parameters, exec_opts
1639 )
1640
1641 if distilled_parameters:
1642 # ensure we don't retain a link to the view object for keys()
1643 # which links to the values, which we don't want to cache
1644 keys = sorted(distilled_parameters[0])
1645 for_executemany = len(distilled_parameters) > 1
1646 else:
1647 keys = []
1648 for_executemany = False
1649
1650 dialect = self.dialect
1651
1652 schema_translate_map = exec_opts.get("schema_translate_map", None)
1653
1654 compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
1655 "compiled_cache", self.engine._compiled_cache
1656 )
1657
1658 compiled_sql, extracted_params, param_dict, cache_hit = (
1659 elem._compile_w_cache(
1660 dialect=dialect,
1661 compiled_cache=compiled_cache,
1662 column_keys=keys,
1663 for_executemany=for_executemany,
1664 schema_translate_map=schema_translate_map,
1665 linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
1666 )
1667 )
1668 ret = self._execute_context(
1669 dialect,
1670 dialect.execution_ctx_cls._init_compiled,
1671 compiled_sql,
1672 distilled_parameters,
1673 exec_opts,
1674 compiled_sql,
1675 distilled_parameters,
1676 elem,
1677 extracted_params,
1678 cache_hit=cache_hit,
1679 param_dict=param_dict,
1680 )
1681 if has_events:
1682 self.dispatch.after_execute(
1683 self,
1684 elem,
1685 event_multiparams,
1686 event_params,
1687 exec_opts,
1688 ret,
1689 )
1690 return ret
1691
1692 def exec_driver_sql(
1693 self,
1694 statement: str,
1695 parameters: Optional[_DBAPIAnyExecuteParams] = None,
1696 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1697 ) -> CursorResult[Unpack[TupleAny]]:
1698 r"""Executes a string SQL statement on the DBAPI cursor directly,
1699 without any SQL compilation steps.
1700
1701 This can be used to pass any string directly to the
1702 ``cursor.execute()`` method of the DBAPI in use.
1703
1704 :param statement: The statement str to be executed. Bound parameters
1705 must use the underlying DBAPI's paramstyle, such as "qmark",
1706 "pyformat", "format", etc.
1707
1708 :param parameters: represent bound parameter values to be used in the
1709 execution. The format is one of: a dictionary of named parameters,
1710 a tuple of positional parameters, or a list containing either
1711 dictionaries or tuples for multiple-execute support.
1712
1713 :return: a :class:`_engine.CursorResult`.
1714
1715 E.g. multiple dictionaries::
1716
1717
1718 conn.exec_driver_sql(
1719 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1720 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
1721 )
1722
1723 Single dictionary::
1724
1725 conn.exec_driver_sql(
1726 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1727 dict(id=1, value="v1"),
1728 )
1729
1730 Single tuple::
1731
1732 conn.exec_driver_sql(
1733 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
1734 )
1735
1736 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
1737 not participate in the
1738 :meth:`_events.ConnectionEvents.before_execute` and
1739 :meth:`_events.ConnectionEvents.after_execute` events. To
1740 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
1741 :meth:`_events.ConnectionEvents.before_cursor_execute` and
1742 :meth:`_events.ConnectionEvents.after_cursor_execute`.
1743
1744 .. seealso::
1745
1746 :pep:`249`
1747
1748 """
1749
1750 distilled_parameters = _distill_raw_params(parameters)
1751
1752 exec_opts = self._execution_options.merge_with(execution_options)
1753
1754 dialect = self.dialect
1755 ret = self._execute_context(
1756 dialect,
1757 dialect.execution_ctx_cls._init_statement,
1758 statement,
1759 None,
1760 exec_opts,
1761 statement,
1762 distilled_parameters,
1763 )
1764
1765 return ret
1766
    def _execute_context(
        self,
        dialect: Dialect,
        constructor: Callable[..., ExecutionContext],
        statement: Union[str, Compiled],
        parameters: Optional[_AnyMultiExecuteParams],
        execution_options: _ExecuteOptions,
        *args: Any,
        **kw: Any,
    ) -> CursorResult[Unpack[TupleAny]]:
        """Create an :class:`.ExecutionContext` and execute, returning
        a :class:`_engine.CursorResult`.

        :param dialect: the dialect, passed to ``constructor`` and on to the
         ``_exec_*_context()`` method that completes the execution.
        :param constructor: callable that builds the execution context; it
         is invoked as ``constructor(dialect, self, dbapi_connection,
         execution_options, *args, **kw)``.
        :param statement: string or compiled statement; in this method it
         is used for error reporting and passed along to
         ``_exec_single_context()``.
        :param parameters: parameters used for error reporting and passed
         along to ``_exec_single_context()``.
        :param execution_options: execution options in effect; a truthy
         ``yield_per`` option additionally switches on ``stream_results``
         and sets ``max_row_buffer`` to the same value.

        """

        if execution_options:
            yp = execution_options.get("yield_per", None)
            if yp:
                # yield_per implies streaming, buffering at most "yp" rows
                execution_options = execution_options.union(
                    {"stream_results": True, "max_row_buffer": yp}
                )
        try:
            conn = self._dbapi_connection
            if conn is None:
                # no DBAPI connection at hand; try to get one
                conn = self._revalidate_connection()

            context = constructor(
                dialect, self, conn, execution_options, *args, **kw
            )
        except (exc.PendingRollbackError, exc.ResourceClosedError):
            # these two are passed along untouched
            raise
        except BaseException as e:
            # NOTE(review): "context" is used unconditionally below, so
            # _handle_dbapi_exception() presumably always raises - confirm
            self._handle_dbapi_exception(
                e, str(statement), parameters, None, None
            )

        # a root or nested transaction object that is present but no longer
        # active means the transaction is in an invalid state
        if (
            self._transaction
            and not self._transaction.is_active
            or (
                self._nested_transaction
                and not self._nested_transaction.is_active
            )
        ):
            self._invalid_transaction()

        elif self._trans_context_manager:
            TransactionalContext._trans_ctx_check(self)

        # autobegin only after the state checks above have passed
        if self._transaction is None:
            self._autobegin()

        context.pre_exec()

        # "insertmanyvalues" has its own execution path; everything else
        # goes through the single-context path
        if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
            return self._exec_insertmany_context(dialect, context)
        else:
            return self._exec_single_context(
                dialect, context, statement, parameters
            )
1825
def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Unpack[TupleAny]]:
    """Finish :meth:`._execute_context` for the case where a single DBAPI
    ``cursor.execute()`` or ``cursor.executemany()`` call is made.

    :param dialect: dialect consulted for ``bind_typing`` and used to set
     input sizes.
    :param context: the already pre-executed execution context; its
     ``cursor``, ``statement`` and ``parameters`` drive the execution.
    :param statement: original statement, used only for error reporting
     if setting input sizes fails.
    :param parameters: original parameters, used only for error
     reporting if setting input sizes fails.
    :return: the result built by ``context._setup_result_proxy()``.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        input_sizes = context._prepare_set_input_sizes()
        if input_sizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, input_sizes, context
                )
            except BaseException as err:
                self._handle_dbapi_exception(
                    err, str(statement), parameters, None, context
                )

    # from here on, work with what the context prepared
    cursor = context.cursor
    str_statement = context.statement
    parameters = context.parameters

    # a non-executemany call uses just the first parameter set
    effective_parameters: Optional[_AnyExecuteParams] = (
        parameters if context.executemany else parameters[0]
    )

    if self._has_events or self.engine._has_events:
        # each hook may replace the statement and the parameters
        for before_hook in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = before_hook(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        cache_stats = context._get_cache_stats()

        if self.engine.hide_parameters:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                cache_stats,
            )
        else:
            self._log_info(
                "[%s] %r",
                cache_stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )

    hooks: Iterable[Any]
    try:
        # in each branch, dialect-level event hooks get the first chance
        # to run the statement; the dialect's own method runs only when
        # no hook returned a true value (the ``else`` of the loop)
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            hooks = (
                self.dialect.dispatch.do_executemany
                if self.dialect._has_events
                else ()
            )
            for hook in hooks:
                if hook(
                    cursor, str_statement, effective_parameters, context
                ):
                    break
            else:
                self.dialect.do_executemany(
                    cursor, str_statement, effective_parameters, context
                )
        elif not effective_parameters and context.no_parameters:
            hooks = (
                self.dialect.dispatch.do_execute_no_params
                if self.dialect._has_events
                else ()
            )
            for hook in hooks:
                if hook(cursor, str_statement, context):
                    break
            else:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            hooks = (
                self.dialect.dispatch.do_execute
                if self.dialect._has_events
                else ()
            )
            for hook in hooks:
                if hook(
                    cursor, str_statement, effective_parameters, context
                ):
                    break
            else:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as err:
        self._handle_dbapi_exception(
            err, str_statement, effective_parameters, cursor, context
        )

    return result
1967
def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    The batches are produced by
    ``dialect._deliver_insertmanyvalues_batches()``; each batch carries
    its own replaced statement and parameters, which are what gets
    logged, passed to the event hooks and executed.

    :param dialect: dialect that delivers the batches, sets input sizes
     and executes each batch.
    :param context: execution context supplying the cursor, statement,
     parameters and execution options.
    :return: the result built by ``context._setup_result_proxy()`` after
     all batches have run.

    """

    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # event flags and the dialect-level do_execute hooks are resolved
    # once, before the batch loop
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # "stats" exists only when echo is on; every later use of it is
    # inside an ``if self._echo:`` block as well
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            # each hook may replace the per-batch statement / parameters
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            # renders as " <batchnum>/<total> (ordered|unordered"
            # plus "; batch not supported" for downgraded batches, ")"
            imv_stats = f""" {imv_batch.batchnum}/{
                imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch appends to the cache stats computed before
            # the loop; other batches replace the whole label
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # dialect-level hooks get the first chance to execute; the
            # dialect's do_execute() runs only if no hook returned a
            # true value
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )
        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                sub_stmt,
                sub_params,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        # with "preserve_rowcount", report the sum of all batch sizes
        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result
2137
def _cursor_execute(
    self,
    cursor: DBAPICursor,
    statement: str,
    parameters: _DBAPISingleExecuteParams,
    context: Optional[ExecutionContext] = None,
) -> None:
    """Execute a statement + params on the given cursor.

    Adds appropriate logging and exception handling.

    This method is used by DefaultDialect for special-case
    executions, such as for sequences and column defaults.
    The path of statement execution in the majority of cases
    terminates at _execute_context().

    :param cursor: DBAPI cursor to execute on.
    :param statement: SQL string; ``before_cursor_execute`` hooks may
     replace it.
    :param parameters: parameters for the statement;
     ``before_cursor_execute`` hooks may replace them.
    :param context: optional execution context passed through to the
     event hooks, the dialect and the error handler.

    """
    if self._has_events or self.engine._has_events:
        for fn in self.dispatch.before_cursor_execute:
            statement, parameters = fn(
                self, cursor, statement, parameters, context, False
            )

    if self._echo:
        self._log_info(statement)
        # honor hide_parameters here just like the other execution
        # paths do, so that bound values never reach the log when the
        # engine was asked to hide them
        if not self.engine.hide_parameters:
            self._log_info("[raw sql] %r", parameters)
        else:
            self._log_info(
                "[raw sql] [SQL parameters hidden due to "
                "hide_parameters=True]"
            )
    try:
        # dialect-level hooks get the first chance to execute; the
        # dialect's do_execute() runs only if no hook returned a true
        # value
        for fn in (
            ()
            if not self.dialect._has_events
            else self.dialect.dispatch.do_execute
        ):
            if fn(cursor, statement, parameters, context):
                break
        else:
            self.dialect.do_execute(cursor, statement, parameters, context)
    except BaseException as e:
        self._handle_dbapi_exception(
            e, statement, parameters, cursor, context
        )

    if self._has_events or self.engine._has_events:
        self.dispatch.after_cursor_execute(
            self, cursor, statement, parameters, context, False
        )
2183
def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
    """Close ``cursor`` on a best-effort basis.

    Any :class:`Exception` raised by ``cursor.close()`` is not
    propagated; it is reported as an error, with traceback, on the
    logger of the engine's connection pool.

    """
    try:
        cursor.close()
    except Exception:
        # report through the connection pool's logger rather than
        # letting the failure escape.
        pool_logger = self.engine.pool.logger
        pool_logger.error("Error closing cursor", exc_info=True)
2196
# class-level defaults for two flags that _handle_dbapi_exception()
# manages per instance: it sets them on the instance while an error is
# being handled and ``del``s the instance values when done, which makes
# these ``False`` defaults visible again.
_reentrant_error = False
_is_disconnect = False
2199
def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised during execution and re-raise it.

    This method never returns normally.  What is raised is, in order of
    precedence: an exception returned or raised by a ``handle_error``
    event handler, a :class:`.DBAPIError` wrapping ``e``, or the
    exception currently being handled as reported by ``sys.exc_info()``.

    Along the way the method works out whether the error is a
    "disconnect", closes the cursor and rolls back when it is not, and
    invalidates the connection (and optionally the pool) when it is.

    :param e: the exception being processed.
    :param statement: SQL string associated with the error, if any.
    :param parameters: parameters associated with the error, if any.
    :param cursor: DBAPI cursor in use, if any; closed here when the
     error is not a disconnect.
    :param context: execution context in progress, if any.
    :param is_sub_exec: when True, the wrapped exception is always
     created with ``ismulti=False`` regardless of
     ``context.executemany``.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    # a disconnect is either a DBAPI error that the dialect identifies
    # as one, or an "exit" exception, in both cases only while this
    # connection is not closed
    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    # if this method is entered again while it is already handling an
    # error, raise the wrapped error right away without running event
    # handlers or cleanup
    if self._reentrant_error:
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        newraise = None

        # handle_error handlers run unless the "skip_user_error_events"
        # execution option is set
        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # a handler may have changed the disconnect determination
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # drop the instance-level flags so that the class-level defaults
        # apply again, then invalidate on a disconnect
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)
2344
@classmethod
def _handle_dbapi_exception_noconnection(
    cls,
    e: BaseException,
    dialect: Dialect,
    engine: Optional[Engine] = None,
    is_disconnect: Optional[bool] = None,
    invalidate_pool_on_disconnect: bool = True,
    is_pre_ping: bool = False,
) -> NoReturn:
    """Process and re-raise an exception when no
    :class:`_engine.Connection` is available.

    This never returns normally.  What is raised is, in order of
    precedence: an exception returned or raised by a ``handle_error``
    event handler, a :class:`.DBAPIError` wrapping ``e`` when ``e`` is
    an instance of the dialect's DBAPI ``Error`` class, or the exception
    currently being handled as reported by ``sys.exc_info()``.

    :param e: the exception being processed.
    :param dialect: dialect used to classify the error and to dispatch
     ``handle_error`` events.
    :param engine: optional engine; supplies ``hide_parameters`` and is
     passed to the event context.
    :param is_disconnect: whether the error is a disconnect; when
     ``None`` it is worked out from the dialect.
    :param invalidate_pool_on_disconnect: passed to the event context.
    :param is_pre_ping: passed to the event context.

    """
    exc_info = sys.exc_info()

    dbapi_error_cls = dialect.loaded_dbapi.Error

    # only errors coming from the DBAPI itself get wrapped
    should_wrap = isinstance(e, dbapi_error_cls)

    if is_disconnect is None:
        is_disconnect = should_wrap and dialect.is_disconnect(
            e, None, None
        )

    sqlalchemy_exception = None
    if should_wrap:
        hide_parameters = (
            False if engine is None else engine.hide_parameters
        )
        sqlalchemy_exception = exc.DBAPIError.instance(
            None,
            None,
            cast(Exception, e),
            dbapi_error_cls,
            hide_parameters=hide_parameters,
            connection_invalidated=is_disconnect,
            dialect=dialect,
        )

    newraise = None

    if dialect._has_events:
        ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            dialect,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            invalidate_pool_on_disconnect,
            is_pre_ping,
        )
        for handler in dialect.dispatch.handle_error:
            try:
                # a handler that returns an exception replaces what
                # will be raised; later handlers still get to run
                replacement = handler(ctx)
                if replacement is not None:
                    ctx.chained_exception = newraise = replacement
            except Exception as handler_error:
                # a handler that raises ends the chain
                newraise = handler_error
                break

        # a handler may have changed the disconnect determination
        if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
            sqlalchemy_exception.connection_invalidated = ctx.is_disconnect

    if newraise:
        raise newraise.with_traceback(exc_info[2]) from e
    if should_wrap:
        assert sqlalchemy_exception is not None
        raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
    assert exc_info[1] is not None
    raise exc_info[1].with_traceback(exc_info[2])
2419
def _run_ddl_visitor(
    self,
    visitorcallable: Type[InvokeDDLBase],
    element: SchemaVisitable,
    **kwargs: Any,
) -> None:
    """Instantiate a DDL visitor and have it traverse ``element``.

    The visitor is created with this connection and its dialect plus
    any extra keyword arguments.  The method exists as a hook so that
    the MockConnection can change the options given to the visitor so
    that "checkfirst" is skipped.

    """
    visitor = visitorcallable(
        dialect=self.dialect, connection=self, **kwargs
    )
    visitor.traverse_single(element)
2435
2436
class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    Instances are plain value holders: every constructor argument is
    stored on the attribute of the corresponding name (``exception`` as
    ``original_exception``, ``context`` as ``execution_context``), and
    ``chained_exception`` starts out as ``None``.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # every name in __slots__ must be assigned here; a slot that is
        # never assigned raises AttributeError when read
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # no handler has supplied a replacement exception yet
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping
2482
2483
class Transaction(TransactionalContext):
    """A database transaction in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback` and
    :meth:`.commit` methods.  The object is also a context manager, so
    the Python ``with`` statement can be used together with
    :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # construction is up to the concrete subclasses
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """Whether this transaction has been fully detached from the
        connection, so that it can no longer affect its state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        # hook behind close(); supplied by subclasses
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        # hook behind rollback(); supplied by subclasses
        raise NotImplementedError()

    def _do_commit(self) -> None:
        # hook behind commit(); supplied by subclasses
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        """True while the transaction is active and its connection has
        not been invalidated."""
        active = self.is_active
        return active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this transaction is the base transaction in a begin/commit
        nesting, it is rolled back; otherwise the method just returns.

        Use this to cancel a Transaction without affecting the scope of
        an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # whatever happened, the transaction must now be inactive
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the kind of transaction:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` emits a "ROLLBACK TO SAVEPOINT"
          operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific methods
          for two phase transactions.

        """
        try:
            self._do_rollback()
        finally:
            # whatever happened, the transaction must now be inactive
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the kind of transaction:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` emits a "RELEASE SAVEPOINT"
          operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific methods
          for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            # whatever happened, the transaction must now be inactive
            assert not self.is_active

    def _get_subject(self) -> Connection:
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        return not self._deactivated_from_connection

    def _rollback_can_be_called(self) -> bool:
        # RootTransaction / NestedTransaction allow rollback() to be
        # called on a transaction that is already inactive without any
        # warning being emitted.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True
2626
2627
class RootTransaction(Transaction):
    """The "root" transaction of a :class:`_engine.Connection`.

    It stands for the "BEGIN/COMMIT/ROLLBACK" currently in progress on
    the :class:`_engine.Connection`.  A :class:`_engine.RootTransaction`
    is created by the :meth:`_engine.Connection.begin` method and stays
    associated with the :class:`_engine.Connection` for as long as it is
    active.  The one currently in use is available from the
    :meth:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also has
    "autobegin" behavior, which creates a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI
    connection.  In 2.0 style use the scope of the
    :class:`_engine.RootTransaction` is controlled with the
    :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.

    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # begin on the connection first; only then register and activate
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        if self.is_active:
            assert self.connection._transaction is self
            self.is_active = False
            return

        if self.connection._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        connection = self.connection
        try:
            if self.is_active:
                self._connection_rollback_impl()

            # any savepoints go away together with the root transaction
            if connection._nested_transaction:
                connection._nested_transaction._cancel()
        finally:
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if connection._transaction is self:
                connection._transaction = None

        assert not self.is_active
        assert connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        if not self.is_active:
            if self.connection._transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")
        else:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # regardless of whether the commit worked: cancel any
                # nested transactions, make this transaction "inactive"
                # and remove it as a reset agent
                if self.connection._nested_transaction:
                    self.connection._nested_transaction._cancel()

                self._deactivate_from_connection()

            # reached only when the commit succeeded; after a failed
            # commit this object stays on as the connection's current
            # transaction so that a rollback needs to occur.
            self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self
2734
2735
class NestedTransaction(Transaction):
    """A 'nested', or SAVEPOINT, transaction.

    A :class:`.NestedTransaction` is created by the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    With :class:`.NestedTransaction`, "begin" / "commit" / "rollback"
    have the following meaning:

    * "begin" corresponds to the "BEGIN SAVEPOINT" command; the
      savepoint gets an explicit name that is kept as part of the state
      of this object.

    * :meth:`.NestedTransaction.commit` corresponds to a
      "RELEASE SAVEPOINT" operation for the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    * :meth:`.NestedTransaction.rollback` corresponds to a
      "ROLLBACK TO SAVEPOINT" operation for the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    The semantics of an outer transaction are mimicked in terms of
    savepoints so that code may deal with a "savepoint" transaction and
    an "outer" transaction in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    _savepoint: str

    def __init__(self, connection: Connection):
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = self.connection._savepoint_impl()
        self.is_active = True
        # remember the enclosing savepoint so that it can be restored
        # when this one is deassociated
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        connection = self.connection
        if connection._nested_transaction is self:
            connection._nested_transaction = self._previous_nested
            return

        if warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._nested_transaction is not self

    def _cancel(self) -> None:
        # RootTransaction calls this when the outer transaction is
        # committed, rolled back, or closed; all savepoints are
        # cancelled without any action being taken
        self.is_active = False
        self._deactivate_from_connection()
        enclosing = self._previous_nested
        if enclosing:
            enclosing._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        try:
            # roll back to the savepoint only while both this savepoint
            # and the connection's current transaction are active
            outer = self.connection._transaction if self.is_active else None
            if outer and outer.is_active:
                self.connection._rollback_to_savepoint_impl(self._savepoint)
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        self._close_impl(True, False)

    def _do_rollback(self) -> None:
        self._close_impl(True, True)

    def _do_commit(self) -> None:
        if not self.is_active:
            if self.connection._nested_transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
        else:
            try:
                self.connection._release_savepoint_impl(self._savepoint)
            finally:
                # a failed release also makes the nested trans inactive,
                # unconditionally, which keeps it from trying to emit
                # SQL when it rolls back.
                self.is_active = False

            # de-association happens only after a successful release
            self._deactivate_from_connection()
2848
2849
class TwoPhaseTransaction(RootTransaction):
    """A two-phase transaction.

    A new :class:`.TwoPhaseTransaction` object is obtained from the
    :meth:`_engine.Connection.begin_twophase` method.

    It offers the same interface as :class:`.Transaction`, plus the
    :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both attributes have to exist before the base constructor
        # runs, since that is where the begin takes place
        self._is_prepared = False
        self.xid = xid
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        Once PREPARE has been done, the transaction can be committed.

        :raises InvalidRequestError: if the transaction is not active.

        """
        if not self.is_active:
            raise exc.InvalidRequestError("This transaction is inactive")
        self.connection._prepare_twophase_impl(self.xid)
        self._is_prepared = True

    def _connection_begin_impl(self) -> None:
        self.connection._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_twophase_impl(self.xid, self._is_prepared)

    def _connection_commit_impl(self) -> None:
        self.connection._commit_twophase_impl(self.xid, self._is_prepared)
2889
2890
2891class Engine(
2892 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
2893):
2894 """
2895 Connects a :class:`~sqlalchemy.pool.Pool` and
2896 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
2897 source of database connectivity and behavior.
2898
2899 An :class:`_engine.Engine` object is instantiated publicly using the
2900 :func:`~sqlalchemy.create_engine` function.
2901
2902 .. seealso::
2903
2904 :doc:`/core/engines`
2905
2906 :ref:`connections_toplevel`
2907
2908 """
2909
2910 dispatch: dispatcher[ConnectionEventsTarget]
2911
2912 _compiled_cache: Optional[CompiledCacheType]
2913
2914 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
2915 _has_events: bool = False
2916 _connection_cls: Type[Connection] = Connection
2917 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
2918 _is_future: bool = False
2919
2920 _schema_translate_map: Optional[SchemaTranslateMapType] = None
2921 _option_cls: Type[OptionEngine]
2922
2923 dialect: Dialect
2924 pool: Pool
2925 url: URL
2926 hide_parameters: bool
2927
def __init__(
    self,
    pool: Pool,
    dialect: Dialect,
    url: URL,
    logging_name: Optional[str] = None,
    echo: Optional[_EchoFlagType] = None,
    query_cache_size: int = 500,
    execution_options: Optional[Mapping[str, Any]] = None,
    hide_parameters: bool = False,
):
    """Set up the engine around a pool and a dialect.

    :param pool: stored as ``self.pool``.
    :param dialect: stored as ``self.dialect``.
    :param url: stored as ``self.url``.
    :param logging_name: when given and non-empty, stored as
     ``self.logging_name``.
    :param echo: stored as ``self.echo`` and passed to
     ``log.instance_logger()`` as the echo flag.
    :param query_cache_size: size handed to the built-in
     ``util.LRUCache`` for compiled statements; ``0`` means that no
     cache is created.
    :param execution_options: when given and non-empty, applied through
     :meth:`.update_execution_options`.
    :param hide_parameters: stored as ``self.hide_parameters``.

    """
    self.pool = pool
    self.url = url
    self.dialect = dialect
    if logging_name:
        self.logging_name = logging_name
    self.echo = echo
    self.hide_parameters = hide_parameters
    self._compiled_cache = (
        None
        if query_cache_size == 0
        else util.LRUCache(query_cache_size, size_alert=self._lru_size_alert)
    )
    log.instance_logger(self, echoflag=echo)
    if execution_options:
        self.update_execution_options(**execution_options)
2955
2956 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
2957 if self._should_log_info():
2958 self.logger.info(
2959 "Compiled cache size pruning from %d items to %d. "
2960 "Increase cache size to reduce the frequency of pruning.",
2961 len(cache),
2962 cache.capacity,
2963 )
2964
2965 @property
2966 def engine(self) -> Engine:
2967 """Returns this :class:`.Engine`.
2968
2969 Used for legacy schemes that accept :class:`.Connection` /
2970 :class:`.Engine` objects within the same variable.
2971
2972 """
2973 return self
2974
2975 def clear_compiled_cache(self) -> None:
2976 """Clear the compiled cache associated with the dialect.
2977
2978 This applies **only** to the built-in cache that is established
2979 via the :paramref:`_engine.create_engine.query_cache_size` parameter.
2980 It will not impact any dictionary caches that were passed via the
2981 :paramref:`.Connection.execution_options.compiled_cache` parameter.
2982
2983 .. versionadded:: 1.4
2984
2985 """
2986 if self._compiled_cache:
2987 self._compiled_cache.clear()
2988
2989 def update_execution_options(self, **opt: Any) -> None:
2990 r"""Update the default execution_options dictionary
2991 of this :class:`_engine.Engine`.
2992
2993 The given keys/values in \**opt are added to the
2994 default execution options that will be used for
2995 all connections. The initial contents of this dictionary
2996 can be sent via the ``execution_options`` parameter
2997 to :func:`_sa.create_engine`.
2998
2999 .. seealso::
3000
3001 :meth:`_engine.Connection.execution_options`
3002
3003 :meth:`_engine.Engine.execution_options`
3004
3005 """
3006 self.dispatch.set_engine_execution_options(self, opt)
3007 self._execution_options = self._execution_options.union(opt)
3008 self.dialect.set_engine_execution_options(self, opt)
3009
3010 @overload
3011 def execution_options(
3012 self,
3013 *,
3014 compiled_cache: Optional[CompiledCacheType] = ...,
3015 logging_token: str = ...,
3016 isolation_level: IsolationLevel = ...,
3017 insertmanyvalues_page_size: int = ...,
3018 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
3019 **opt: Any,
3020 ) -> OptionEngine: ...
3021
3022 @overload
3023 def execution_options(self, **opt: Any) -> OptionEngine: ...
3024
3025 def execution_options(self, **opt: Any) -> OptionEngine:
3026 """Return a new :class:`_engine.Engine` that will provide
3027 :class:`_engine.Connection` objects with the given execution options.
3028
3029 The returned :class:`_engine.Engine` remains related to the original
3030 :class:`_engine.Engine` in that it shares the same connection pool and
3031 other state:
3032
3033 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
3034 is the
3035 same instance. The :meth:`_engine.Engine.dispose`
3036 method will replace
3037 the connection pool instance for the parent engine as well
3038 as this one.
3039 * Event listeners are "cascaded" - meaning, the new
3040 :class:`_engine.Engine`
3041 inherits the events of the parent, and new events can be associated
3042 with the new :class:`_engine.Engine` individually.
3043 * The logging configuration and logging_name is copied from the parent
3044 :class:`_engine.Engine`.
3045
3046 The intent of the :meth:`_engine.Engine.execution_options` method is
3047 to implement schemes where multiple :class:`_engine.Engine`
3048 objects refer to the same connection pool, but are differentiated
3049 by options that affect some execution-level behavior for each
3050 engine. One such example is breaking into separate "reader" and
3051 "writer" :class:`_engine.Engine` instances, where one
3052 :class:`_engine.Engine`
3053 has a lower :term:`isolation level` setting configured or is even
3054 transaction-disabled using "autocommit". An example of this
3055 configuration is at :ref:`dbapi_autocommit_multiple`.
3056
3057 Another example is one that
3058 uses a custom option ``shard_id`` which is consumed by an event
3059 to change the current schema on a database connection::
3060
3061 from sqlalchemy import event
3062 from sqlalchemy.engine import Engine
3063
3064 primary_engine = create_engine("mysql+mysqldb://")
3065 shard1 = primary_engine.execution_options(shard_id="shard1")
3066 shard2 = primary_engine.execution_options(shard_id="shard2")
3067
3068 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
3069
3070
3071 @event.listens_for(Engine, "before_cursor_execute")
3072 def _switch_shard(conn, cursor, stmt, params, context, executemany):
3073 shard_id = conn.get_execution_options().get("shard_id", "default")
3074 current_shard = conn.info.get("current_shard", None)
3075
3076 if current_shard != shard_id:
3077 cursor.execute("use %s" % shards[shard_id])
3078 conn.info["current_shard"] = shard_id
3079
3080 The above recipe illustrates two :class:`_engine.Engine` objects that
3081 will each serve as factories for :class:`_engine.Connection` objects
3082 that have pre-established "shard_id" execution options present. A
3083 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
3084 then interprets this execution option to emit a MySQL ``use`` statement
3085 to switch databases before a statement execution, while at the same
3086 time keeping track of which database we've established using the
3087 :attr:`_engine.Connection.info` dictionary.
3088
3089 .. seealso::
3090
3091 :meth:`_engine.Connection.execution_options`
3092 - update execution options
3093 on a :class:`_engine.Connection` object.
3094
3095 :meth:`_engine.Engine.update_execution_options`
3096 - update the execution
3097 options for a given :class:`_engine.Engine` in place.
3098
3099 :meth:`_engine.Engine.get_execution_options`
3100
3101
3102 """ # noqa: E501
3103 return self._option_cls(self, opt)
3104
3105 def get_execution_options(self) -> _ExecuteOptions:
3106 """Get the non-SQL options which will take effect during execution.
3107
3108 .. seealso::
3109
3110 :meth:`_engine.Engine.execution_options`
3111 """
3112 return self._execution_options
3113
3114 @property
3115 def name(self) -> str:
3116 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3117 in use by this :class:`Engine`.
3118
3119 """
3120
3121 return self.dialect.name
3122
3123 @property
3124 def driver(self) -> str:
3125 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3126 in use by this :class:`Engine`.
3127
3128 """
3129
3130 return self.dialect.driver
3131
3132 echo = log.echo_property()
3133
3134 def __repr__(self) -> str:
3135 return "Engine(%r)" % (self.url,)
3136
3137 def dispose(self, close: bool = True) -> None:
3138 """Dispose of the connection pool used by this
3139 :class:`_engine.Engine`.
3140
3141 A new connection pool is created immediately after the old one has been
3142 disposed. The previous connection pool is disposed either actively, by
3143 closing out all currently checked-in connections in that pool, or
3144 passively, by losing references to it but otherwise not closing any
3145 connections. The latter strategy is more appropriate for an initializer
3146 in a forked Python process.
3147
3148 Event listeners associated with the old pool via :class:`.PoolEvents`
3149 are **transferred to the new pool**; this is to support the pattern
3150 by which :class:`.PoolEvents` are set up in terms of the owning
3151 :class:`.Engine` without the need to refer to the :class:`.Pool`
3152 directly.
3153
3154 :param close: if left at its default of ``True``, has the
3155 effect of fully closing all **currently checked in**
3156 database connections. Connections that are still checked out
3157 will **not** be closed, however they will no longer be associated
3158 with this :class:`_engine.Engine`,
3159 so when they are closed individually, eventually the
3160 :class:`_pool.Pool` which they are associated with will
3161 be garbage collected and they will be closed out fully, if
3162 not already closed on checkin.
3163
3164 If set to ``False``, the previous connection pool is de-referenced,
3165 and otherwise not touched in any way.
3166
3167 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
3168 parameter to allow the replacement of a connection pool in a child
3169 process without interfering with the connections used by the parent
3170 process.
3171
3172
3173 .. seealso::
3174
3175 :ref:`engine_disposal`
3176
3177 :ref:`pooling_multiprocessing`
3178
3179 :meth:`.ConnectionEvents.engine_disposed`
3180
3181 """
3182 if close:
3183 self.pool.dispose()
3184 self.pool = self.pool.recreate()
3185 self.dispatch.engine_disposed(self)
3186
3187 @contextlib.contextmanager
3188 def _optional_conn_ctx_manager(
3189 self, connection: Optional[Connection] = None
3190 ) -> Iterator[Connection]:
3191 if connection is None:
3192 with self.connect() as conn:
3193 yield conn
3194 else:
3195 yield connection
3196
3197 @contextlib.contextmanager
3198 def begin(self) -> Iterator[Connection]:
3199 """Return a context manager delivering a :class:`_engine.Connection`
3200 with a :class:`.Transaction` established.
3201
3202 E.g.::
3203
3204 with engine.begin() as conn:
3205 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
3206 conn.execute(text("my_special_procedure(5)"))
3207
3208 Upon successful operation, the :class:`.Transaction`
3209 is committed. If an error is raised, the :class:`.Transaction`
3210 is rolled back.
3211
3212 .. seealso::
3213
3214 :meth:`_engine.Engine.connect` - procure a
3215 :class:`_engine.Connection` from
3216 an :class:`_engine.Engine`.
3217
3218 :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
3219 for a particular :class:`_engine.Connection`.
3220
3221 """ # noqa: E501
3222 with self.connect() as conn:
3223 with conn.begin():
3224 yield conn
3225
3226 def _run_ddl_visitor(
3227 self,
3228 visitorcallable: Type[InvokeDDLBase],
3229 element: SchemaVisitable,
3230 **kwargs: Any,
3231 ) -> None:
3232 with self.begin() as conn:
3233 conn._run_ddl_visitor(visitorcallable, element, **kwargs)
3234
3235 def connect(self) -> Connection:
3236 """Return a new :class:`_engine.Connection` object.
3237
3238 The :class:`_engine.Connection` acts as a Python context manager, so
3239 the typical use of this method looks like::
3240
3241 with engine.connect() as connection:
3242 connection.execute(text("insert into table values ('foo')"))
3243 connection.commit()
3244
3245 Where above, after the block is completed, the connection is "closed"
3246 and its underlying DBAPI resources are returned to the connection pool.
3247 This also has the effect of rolling back any transaction that
3248 was explicitly begun or was begun via autobegin, and will
3249 emit the :meth:`_events.ConnectionEvents.rollback` event if one was
3250 started and is still in progress.
3251
3252 .. seealso::
3253
3254 :meth:`_engine.Engine.begin`
3255
3256 """
3257
3258 return self._connection_cls(self)
3259
3260 def raw_connection(self) -> PoolProxiedConnection:
3261 """Return a "raw" DBAPI connection from the connection pool.
3262
3263 The returned object is a proxied version of the DBAPI
3264 connection object used by the underlying driver in use.
3265 The object will have all the same behavior as the real DBAPI
3266 connection, except that its ``close()`` method will result in the
3267 connection being returned to the pool, rather than being closed
3268 for real.
3269
3270 This method provides direct DBAPI connection access for
3271 special situations when the API provided by
3272 :class:`_engine.Connection`
3273 is not needed. When a :class:`_engine.Connection` object is already
3274 present, the DBAPI connection is available using
3275 the :attr:`_engine.Connection.connection` accessor.
3276
3277 .. seealso::
3278
3279 :ref:`dbapi_connections`
3280
3281 """
3282 return self.pool.connect()
3283
3284
class OptionEngineMixin(log.Identified):
    """Mixin supplying the proxying behavior of an "option" engine.

    State is copied from, or delegated to, the ``proxied``
    :class:`_engine.Engine` given to the constructor; the concrete class
    must provide ``update_execution_options()``.
    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        self._proxied = proxied

        # plain attribute copies taken from the parent engine
        self.dialect = proxied.dialect
        self.url = proxied.url
        self.hide_parameters = proxied.hide_parameters
        self._compiled_cache = proxied._compiled_cache

        # logging state; the name is assigned ahead of the echo flag and
        # the logger setup, as in the parent's constructor
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        log.instance_logger(self, echoflag=self.echo)

        # note: this will propagate events that are assigned to the parent
        # engine after this OptionEngine is created.   Since we share
        # the events of the parent we also disallow class-level events
        # to apply to the OptionEngine class directly.
        #
        # the other way this can work would be to transfer existing
        # events only, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that might be more appropriate however it would be a behavioral
        # change for logic that assigns events to the parent engine and
        # would like it to take effect for the already-created sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # begin with the parent's options, then layer the new ones on top
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Placeholder; the concrete engine class supplies this."""
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            # always read through to the parent so both see the same pool
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assigning here replaces the pool on the parent engine
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            # the parent's flag wins when truthy; otherwise fall back to a
            # value stored directly in this instance's __dict__
            parent_flag = self._proxied._has_events
            if parent_flag:
                return parent_flag
            return self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            self.__dict__["_has_events"] = value
3348
3349
class OptionEngine(OptionEngineMixin, Engine):
    """Concrete :class:`_engine.Engine` combining
    :class:`OptionEngineMixin` with :class:`_engine.Engine`; this is the
    class assigned to ``Engine._option_cls``.
    """

    def update_execution_options(self, **opt: Any) -> None:
        """Apply ``opt`` using :class:`_engine.Engine`'s implementation."""
        # Engine is named explicitly because OptionEngineMixin comes first
        # in the bases and its update_execution_options() only raises
        # NotImplementedError.
        Engine.update_execution_options(self, **opt)
3353
3354
# Register the class that Engine.execution_options() instantiates.  This is
# done after both class bodies because OptionEngine derives from Engine and
# therefore does not exist yet while Engine's own body is being executed.
Engine._option_cls = OptionEngine