1# engine/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
8from __future__ import annotations
9
10import contextlib
11import sys
12import typing
13from typing import Any
14from typing import Callable
15from typing import cast
16from typing import Iterable
17from typing import Iterator
18from typing import List
19from typing import Mapping
20from typing import NoReturn
21from typing import Optional
22from typing import overload
23from typing import Tuple
24from typing import Type
25from typing import TypeVar
26from typing import Union
27
28from .interfaces import BindTyping
29from .interfaces import ConnectionEventsTarget
30from .interfaces import DBAPICursor
31from .interfaces import ExceptionContext
32from .interfaces import ExecuteStyle
33from .interfaces import ExecutionContext
34from .interfaces import IsolationLevel
35from .util import _distill_params_20
36from .util import _distill_raw_params
37from .util import TransactionalContext
38from .. import exc
39from .. import inspection
40from .. import log
41from .. import util
42from ..sql import compiler
43from ..sql import util as sql_util
44
45if typing.TYPE_CHECKING:
46 from . import CursorResult
47 from . import ScalarResult
48 from .interfaces import _AnyExecuteParams
49 from .interfaces import _AnyMultiExecuteParams
50 from .interfaces import _CoreAnyExecuteParams
51 from .interfaces import _CoreMultiExecuteParams
52 from .interfaces import _CoreSingleExecuteParams
53 from .interfaces import _DBAPIAnyExecuteParams
54 from .interfaces import _DBAPISingleExecuteParams
55 from .interfaces import _ExecuteOptions
56 from .interfaces import CompiledCacheType
57 from .interfaces import CoreExecuteOptionsParameter
58 from .interfaces import Dialect
59 from .interfaces import SchemaTranslateMapType
60 from .reflection import Inspector # noqa
61 from .url import URL
62 from ..event import dispatcher
63 from ..log import _EchoFlagType
64 from ..pool import _ConnectionFairy
65 from ..pool import Pool
66 from ..pool import PoolProxiedConnection
67 from ..sql import Executable
68 from ..sql._typing import _InfoType
69 from ..sql.compiler import Compiled
70 from ..sql.ddl import ExecutableDDLElement
71 from ..sql.ddl import InvokeDDLBase
72 from ..sql.functions import FunctionElement
73 from ..sql.schema import DefaultGenerator
74 from ..sql.schema import HasSchemaAttr
75 from ..sql.schema import SchemaVisitable
76 from ..sql.selectable import TypedReturnsRows
77
78
79_T = TypeVar("_T", bound=Any)
80_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
81NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
82
83
84class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
85 """Provides high-level functionality for a wrapped DB-API connection.
86
87 The :class:`_engine.Connection` object is procured by calling the
88 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
89 object, and provides services for execution of SQL statements as well
90 as transaction control.
91
92 The Connection object is **not** thread-safe. While a Connection can be
93 shared among threads using properly synchronized access, it is still
94 possible that the underlying DBAPI connection may not support shared
95 access between threads. Check the DBAPI documentation for details.
96
97 The Connection object represents a single DBAPI connection checked out
98 from the connection pool. In this state, the connection pool has no
99 affect upon the connection, including its expiration or timeout state.
100 For the connection pool to properly manage connections, connections
101 should be returned to the connection pool (i.e. ``connection.close()``)
102 whenever the connection is not in use.
103
104 .. index::
105 single: thread safety; Connection
106
107 """
108
109 dialect: Dialect
110 dispatch: dispatcher[ConnectionEventsTarget]
111
112 _sqla_logger_namespace = "sqlalchemy.engine.Connection"
113
114 # used by sqlalchemy.engine.util.TransactionalContext
115 _trans_context_manager: Optional[TransactionalContext] = None
116
117 # legacy as of 2.0, should be eventually deprecated and
118 # removed. was used in the "pre_ping" recipe that's been in the docs
119 # a long time
120 should_close_with_result = False
121
122 _dbapi_connection: Optional[PoolProxiedConnection]
123
124 _execution_options: _ExecuteOptions
125
126 _transaction: Optional[RootTransaction]
127 _nested_transaction: Optional[NestedTransaction]
128
129 def __init__(
130 self,
131 engine: Engine,
132 connection: Optional[PoolProxiedConnection] = None,
133 _has_events: Optional[bool] = None,
134 _allow_revalidate: bool = True,
135 _allow_autobegin: bool = True,
136 ):
137 """Construct a new Connection."""
138 self.engine = engine
139 self.dialect = dialect = engine.dialect
140
141 if connection is None:
142 try:
143 self._dbapi_connection = engine.raw_connection()
144 except dialect.loaded_dbapi.Error as err:
145 Connection._handle_dbapi_exception_noconnection(
146 err, dialect, engine
147 )
148 raise
149 else:
150 self._dbapi_connection = connection
151
152 self._transaction = self._nested_transaction = None
153 self.__savepoint_seq = 0
154 self.__in_begin = False
155
156 self.__can_reconnect = _allow_revalidate
157 self._allow_autobegin = _allow_autobegin
158 self._echo = self.engine._should_log_info()
159
160 if _has_events is None:
161 # if _has_events is sent explicitly as False,
162 # then don't join the dispatch of the engine; we don't
163 # want to handle any of the engine's events in that case.
164 self.dispatch = self.dispatch._join(engine.dispatch)
165 self._has_events = _has_events or (
166 _has_events is None and engine._has_events
167 )
168
169 self._execution_options = engine._execution_options
170
171 if self._has_events or self.engine._has_events:
172 self.dispatch.engine_connect(self)
173
174 # this can be assigned differently via
175 # characteristics.LoggingTokenCharacteristic
176 _message_formatter: Any = None
177
178 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
179 fmt = self._message_formatter
180
181 if fmt:
182 message = fmt(message)
183
184 if log.STACKLEVEL:
185 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
186
187 self.engine.logger.info(message, *arg, **kw)
188
189 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
190 fmt = self._message_formatter
191
192 if fmt:
193 message = fmt(message)
194
195 if log.STACKLEVEL:
196 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
197
198 self.engine.logger.debug(message, *arg, **kw)
199
200 @property
201 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
202 schema_translate_map: Optional[SchemaTranslateMapType] = (
203 self._execution_options.get("schema_translate_map", None)
204 )
205
206 return schema_translate_map
207
208 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
209 """Return the schema name for the given schema item taking into
210 account current schema translate map.
211
212 """
213
214 name = obj.schema
215 schema_translate_map: Optional[SchemaTranslateMapType] = (
216 self._execution_options.get("schema_translate_map", None)
217 )
218
219 if (
220 schema_translate_map
221 and name in schema_translate_map
222 and obj._use_schema_map
223 ):
224 return schema_translate_map[name]
225 else:
226 return name
227
228 def __enter__(self) -> Connection:
229 return self
230
231 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
232 self.close()
233
234 @overload
235 def execution_options(
236 self,
237 *,
238 compiled_cache: Optional[CompiledCacheType] = ...,
239 logging_token: str = ...,
240 isolation_level: IsolationLevel = ...,
241 no_parameters: bool = False,
242 stream_results: bool = False,
243 max_row_buffer: int = ...,
244 yield_per: int = ...,
245 insertmanyvalues_page_size: int = ...,
246 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
247 preserve_rowcount: bool = False,
248 **opt: Any,
249 ) -> Connection: ...
250
251 @overload
252 def execution_options(self, **opt: Any) -> Connection: ...
253
254 def execution_options(self, **opt: Any) -> Connection:
255 r"""Set non-SQL options for the connection which take effect
256 during execution.
257
258 This method modifies this :class:`_engine.Connection` **in-place**;
259 the return value is the same :class:`_engine.Connection` object
260 upon which the method is called. Note that this is in contrast
261 to the behavior of the ``execution_options`` methods on other
262 objects such as :meth:`_engine.Engine.execution_options` and
263 :meth:`_sql.Executable.execution_options`. The rationale is that many
264 such execution options necessarily modify the state of the base
265 DBAPI connection in any case so there is no feasible means of
266 keeping the effect of such an option localized to a "sub" connection.
267
268 .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options`
269 method, in contrast to other objects with this method, modifies
270 the connection in-place without creating copy of it.
271
272 As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
273 method accepts any arbitrary parameters including user defined names.
274 All parameters given are consumable in a number of ways including
275 by using the :meth:`_engine.Connection.get_execution_options` method.
276 See the examples at :meth:`_sql.Executable.execution_options`
277 and :meth:`_engine.Engine.execution_options`.
278
279 The keywords that are currently recognized by SQLAlchemy itself
280 include all those listed under :meth:`.Executable.execution_options`,
281 as well as others that are specific to :class:`_engine.Connection`.
282
283 :param compiled_cache: Available on: :class:`_engine.Connection`,
284 :class:`_engine.Engine`.
285
286 A dictionary where :class:`.Compiled` objects
287 will be cached when the :class:`_engine.Connection`
288 compiles a clause
289 expression into a :class:`.Compiled` object. This dictionary will
290 supersede the statement cache that may be configured on the
291 :class:`_engine.Engine` itself. If set to None, caching
292 is disabled, even if the engine has a configured cache size.
293
294 Note that the ORM makes use of its own "compiled" caches for
295 some operations, including flush operations. The caching
296 used by the ORM internally supersedes a cache dictionary
297 specified here.
298
299 :param logging_token: Available on: :class:`_engine.Connection`,
300 :class:`_engine.Engine`, :class:`_sql.Executable`.
301
302 Adds the specified string token surrounded by brackets in log
303 messages logged by the connection, i.e. the logging that's enabled
304 either via the :paramref:`_sa.create_engine.echo` flag or via the
305 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
306 per-connection or per-sub-engine token to be available which is
307 useful for debugging concurrent connection scenarios.
308
309 .. versionadded:: 1.4.0b2
310
311 .. seealso::
312
313 :ref:`dbengine_logging_tokens` - usage example
314
315 :paramref:`_sa.create_engine.logging_name` - adds a name to the
316 name used by the Python logger object itself.
317
318 :param isolation_level: Available on: :class:`_engine.Connection`,
319 :class:`_engine.Engine`.
320
321 Set the transaction isolation level for the lifespan of this
322 :class:`_engine.Connection` object.
323 Valid values include those string
324 values accepted by the :paramref:`_sa.create_engine.isolation_level`
325 parameter passed to :func:`_sa.create_engine`. These levels are
326 semi-database specific; see individual dialect documentation for
327 valid levels.
328
329 The isolation level option applies the isolation level by emitting
330 statements on the DBAPI connection, and **necessarily affects the
331 original Connection object overall**. The isolation level will remain
332 at the given setting until explicitly changed, or when the DBAPI
333 connection itself is :term:`released` to the connection pool, i.e. the
334 :meth:`_engine.Connection.close` method is called, at which time an
335 event handler will emit additional statements on the DBAPI connection
336 in order to revert the isolation level change.
337
338 .. note:: The ``isolation_level`` execution option may only be
339 established before the :meth:`_engine.Connection.begin` method is
340 called, as well as before any SQL statements are emitted which
341 would otherwise trigger "autobegin", or directly after a call to
342 :meth:`_engine.Connection.commit` or
343 :meth:`_engine.Connection.rollback`. A database cannot change the
344 isolation level on a transaction in progress.
345
346 .. note:: The ``isolation_level`` execution option is implicitly
347 reset if the :class:`_engine.Connection` is invalidated, e.g. via
348 the :meth:`_engine.Connection.invalidate` method, or if a
349 disconnection error occurs. The new connection produced after the
350 invalidation will **not** have the selected isolation level
351 re-applied to it automatically.
352
353 .. seealso::
354
355 :ref:`dbapi_autocommit`
356
357 :meth:`_engine.Connection.get_isolation_level`
358 - view current actual level
359
360 :param no_parameters: Available on: :class:`_engine.Connection`,
361 :class:`_sql.Executable`.
362
363 When ``True``, if the final parameter
364 list or dictionary is totally empty, will invoke the
365 statement on the cursor as ``cursor.execute(statement)``,
366 not passing the parameter collection at all.
367 Some DBAPIs such as psycopg2 and mysql-python consider
368 percent signs as significant only when parameters are
369 present; this option allows code to generate SQL
370 containing percent signs (and possibly other characters)
371 that is neutral regarding whether it's executed by the DBAPI
372 or piped into a script that's later invoked by
373 command line tools.
374
375 :param stream_results: Available on: :class:`_engine.Connection`,
376 :class:`_sql.Executable`.
377
378 Indicate to the dialect that results should be "streamed" and not
379 pre-buffered, if possible. For backends such as PostgreSQL, MySQL
380 and MariaDB, this indicates the use of a "server side cursor" as
381 opposed to a client side cursor. Other backends such as that of
382 Oracle Database may already use server side cursors by default.
383
384 The usage of
385 :paramref:`_engine.Connection.execution_options.stream_results` is
386 usually combined with setting a fixed number of rows to to be fetched
387 in batches, to allow for efficient iteration of database rows while
388 at the same time not loading all result rows into memory at once;
389 this can be configured on a :class:`_engine.Result` object using the
390 :meth:`_engine.Result.yield_per` method, after execution has
391 returned a new :class:`_engine.Result`. If
392 :meth:`_engine.Result.yield_per` is not used,
393 the :paramref:`_engine.Connection.execution_options.stream_results`
394 mode of operation will instead use a dynamically sized buffer
395 which buffers sets of rows at a time, growing on each batch
396 based on a fixed growth size up until a limit which may
397 be configured using the
398 :paramref:`_engine.Connection.execution_options.max_row_buffer`
399 parameter.
400
401 When using the ORM to fetch ORM mapped objects from a result,
402 :meth:`_engine.Result.yield_per` should always be used with
403 :paramref:`_engine.Connection.execution_options.stream_results`,
404 so that the ORM does not fetch all rows into new ORM objects at once.
405
406 For typical use, the
407 :paramref:`_engine.Connection.execution_options.yield_per` execution
408 option should be preferred, which sets up both
409 :paramref:`_engine.Connection.execution_options.stream_results` and
410 :meth:`_engine.Result.yield_per` at once. This option is supported
411 both at a core level by :class:`_engine.Connection` as well as by the
412 ORM :class:`_engine.Session`; the latter is described at
413 :ref:`orm_queryguide_yield_per`.
414
415 .. seealso::
416
417 :ref:`engine_stream_results` - background on
418 :paramref:`_engine.Connection.execution_options.stream_results`
419
420 :paramref:`_engine.Connection.execution_options.max_row_buffer`
421
422 :paramref:`_engine.Connection.execution_options.yield_per`
423
424 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
425 describing the ORM version of ``yield_per``
426
427 :param max_row_buffer: Available on: :class:`_engine.Connection`,
428 :class:`_sql.Executable`. Sets a maximum
429 buffer size to use when the
430 :paramref:`_engine.Connection.execution_options.stream_results`
431 execution option is used on a backend that supports server side
432 cursors. The default value if not specified is 1000.
433
434 .. seealso::
435
436 :paramref:`_engine.Connection.execution_options.stream_results`
437
438 :ref:`engine_stream_results`
439
440
441 :param yield_per: Available on: :class:`_engine.Connection`,
442 :class:`_sql.Executable`. Integer value applied which will
443 set the :paramref:`_engine.Connection.execution_options.stream_results`
444 execution option and invoke :meth:`_engine.Result.yield_per`
445 automatically at once. Allows equivalent functionality as
446 is present when using this parameter with the ORM.
447
448 .. versionadded:: 1.4.40
449
450 .. seealso::
451
452 :ref:`engine_stream_results` - background and examples
453 on using server side cursors with Core.
454
455 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
456 describing the ORM version of ``yield_per``
457
458 :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
459 :class:`_engine.Engine`. Number of rows to format into an
460 INSERT statement when the statement uses "insertmanyvalues" mode,
461 which is a paged form of bulk insert that is used for many backends
462 when using :term:`executemany` execution typically in conjunction
463 with RETURNING. Defaults to 1000. May also be modified on a
464 per-engine basis using the
465 :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.
466
467 .. versionadded:: 2.0
468
469 .. seealso::
470
471 :ref:`engine_insertmanyvalues`
472
473 :param schema_translate_map: Available on: :class:`_engine.Connection`,
474 :class:`_engine.Engine`, :class:`_sql.Executable`.
475
476 A dictionary mapping schema names to schema names, that will be
477 applied to the :paramref:`_schema.Table.schema` element of each
478 :class:`_schema.Table`
479 encountered when SQL or DDL expression elements
480 are compiled into strings; the resulting schema name will be
481 converted based on presence in the map of the original name.
482
483 .. seealso::
484
485 :ref:`schema_translating`
486
487 :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
488 attribute will be unconditionally memoized within the result and
489 made available via the :attr:`.CursorResult.rowcount` attribute.
490 Normally, this attribute is only preserved for UPDATE and DELETE
491 statements. Using this option, the DBAPIs rowcount value can
492 be accessed for other kinds of statements such as INSERT and SELECT,
493 to the degree that the DBAPI supports these statements. See
494 :attr:`.CursorResult.rowcount` for notes regarding the behavior
495 of this attribute.
496
497 .. versionadded:: 2.0.28
498
499 .. seealso::
500
501 :meth:`_engine.Engine.execution_options`
502
503 :meth:`.Executable.execution_options`
504
505 :meth:`_engine.Connection.get_execution_options`
506
507 :ref:`orm_queryguide_execution_options` - documentation on all
508 ORM-specific execution options
509
510 """ # noqa
511 if self._has_events or self.engine._has_events:
512 self.dispatch.set_connection_execution_options(self, opt)
513 self._execution_options = self._execution_options.union(opt)
514 self.dialect.set_connection_execution_options(self, opt)
515 return self
516
517 def get_execution_options(self) -> _ExecuteOptions:
518 """Get the non-SQL options which will take effect during execution.
519
520 .. versionadded:: 1.3
521
522 .. seealso::
523
524 :meth:`_engine.Connection.execution_options`
525 """
526 return self._execution_options
527
528 @property
529 def _still_open_and_dbapi_connection_is_valid(self) -> bool:
530 pool_proxied_connection = self._dbapi_connection
531 return (
532 pool_proxied_connection is not None
533 and pool_proxied_connection.is_valid
534 )
535
536 @property
537 def closed(self) -> bool:
538 """Return True if this connection is closed."""
539
540 return self._dbapi_connection is None and not self.__can_reconnect
541
542 @property
543 def invalidated(self) -> bool:
544 """Return True if this connection was invalidated.
545
546 This does not indicate whether or not the connection was
547 invalidated at the pool level, however
548
549 """
550
551 # prior to 1.4, "invalid" was stored as a state independent of
552 # "closed", meaning an invalidated connection could be "closed",
553 # the _dbapi_connection would be None and closed=True, yet the
554 # "invalid" flag would stay True. This meant that there were
555 # three separate states (open/valid, closed/valid, closed/invalid)
556 # when there is really no reason for that; a connection that's
557 # "closed" does not need to be "invalid". So the state is now
558 # represented by the two facts alone.
559
560 pool_proxied_connection = self._dbapi_connection
561 return pool_proxied_connection is None and self.__can_reconnect
562
563 @property
564 def connection(self) -> PoolProxiedConnection:
565 """The underlying DB-API connection managed by this Connection.
566
567 This is a SQLAlchemy connection-pool proxied connection
568 which then has the attribute
569 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
570 actual driver connection.
571
572 .. seealso::
573
574
575 :ref:`dbapi_connections`
576
577 """
578
579 if self._dbapi_connection is None:
580 try:
581 return self._revalidate_connection()
582 except (exc.PendingRollbackError, exc.ResourceClosedError):
583 raise
584 except BaseException as e:
585 self._handle_dbapi_exception(e, None, None, None, None)
586 else:
587 return self._dbapi_connection
588
589 def get_isolation_level(self) -> IsolationLevel:
590 """Return the current **actual** isolation level that's present on
591 the database within the scope of this connection.
592
593 This attribute will perform a live SQL operation against the database
594 in order to procure the current isolation level, so the value returned
595 is the actual level on the underlying DBAPI connection regardless of
596 how this state was set. This will be one of the four actual isolation
597 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
598 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
599 level setting. Third party dialects may also feature additional
600 isolation level settings.
601
602 .. note:: This method **will not report** on the ``AUTOCOMMIT``
603 isolation level, which is a separate :term:`dbapi` setting that's
604 independent of **actual** isolation level. When ``AUTOCOMMIT`` is
605 in use, the database connection still has a "traditional" isolation
606 mode in effect, that is typically one of the four values
607 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
608 ``SERIALIZABLE``.
609
610 Compare to the :attr:`_engine.Connection.default_isolation_level`
611 accessor which returns the isolation level that is present on the
612 database at initial connection time.
613
614 .. seealso::
615
616 :attr:`_engine.Connection.default_isolation_level`
617 - view default level
618
619 :paramref:`_sa.create_engine.isolation_level`
620 - set per :class:`_engine.Engine` isolation level
621
622 :paramref:`.Connection.execution_options.isolation_level`
623 - set per :class:`_engine.Connection` isolation level
624
625 """
626 dbapi_connection = self.connection.dbapi_connection
627 assert dbapi_connection is not None
628 try:
629 return self.dialect.get_isolation_level(dbapi_connection)
630 except BaseException as e:
631 self._handle_dbapi_exception(e, None, None, None, None)
632
633 @property
634 def default_isolation_level(self) -> Optional[IsolationLevel]:
635 """The initial-connection time isolation level associated with the
636 :class:`_engine.Dialect` in use.
637
638 This value is independent of the
639 :paramref:`.Connection.execution_options.isolation_level` and
640 :paramref:`.Engine.execution_options.isolation_level` execution
641 options, and is determined by the :class:`_engine.Dialect` when the
642 first connection is created, by performing a SQL query against the
643 database for the current isolation level before any additional commands
644 have been emitted.
645
646 Calling this accessor does not invoke any new SQL queries.
647
648 .. seealso::
649
650 :meth:`_engine.Connection.get_isolation_level`
651 - view current actual isolation level
652
653 :paramref:`_sa.create_engine.isolation_level`
654 - set per :class:`_engine.Engine` isolation level
655
656 :paramref:`.Connection.execution_options.isolation_level`
657 - set per :class:`_engine.Connection` isolation level
658
659 """
660 return self.dialect.default_isolation_level
661
662 def _invalid_transaction(self) -> NoReturn:
663 raise exc.PendingRollbackError(
664 "Can't reconnect until invalid %stransaction is rolled "
665 "back. Please rollback() fully before proceeding"
666 % ("savepoint " if self._nested_transaction is not None else ""),
667 code="8s2b",
668 )
669
670 def _revalidate_connection(self) -> PoolProxiedConnection:
671 if self.__can_reconnect and self.invalidated:
672 if self._transaction is not None:
673 self._invalid_transaction()
674 self._dbapi_connection = self.engine.raw_connection()
675 return self._dbapi_connection
676 raise exc.ResourceClosedError("This Connection is closed")
677
678 @property
679 def info(self) -> _InfoType:
680 """Info dictionary associated with the underlying DBAPI connection
681 referred to by this :class:`_engine.Connection`, allowing user-defined
682 data to be associated with the connection.
683
684 The data here will follow along with the DBAPI connection including
685 after it is returned to the connection pool and used again
686 in subsequent instances of :class:`_engine.Connection`.
687
688 """
689
690 return self.connection.info
691
692 def invalidate(self, exception: Optional[BaseException] = None) -> None:
693 """Invalidate the underlying DBAPI connection associated with
694 this :class:`_engine.Connection`.
695
696 An attempt will be made to close the underlying DBAPI connection
697 immediately; however if this operation fails, the error is logged
698 but not raised. The connection is then discarded whether or not
699 close() succeeded.
700
701 Upon the next use (where "use" typically means using the
702 :meth:`_engine.Connection.execute` method or similar),
703 this :class:`_engine.Connection` will attempt to
704 procure a new DBAPI connection using the services of the
705 :class:`_pool.Pool` as a source of connectivity (e.g.
706 a "reconnection").
707
708 If a transaction was in progress (e.g. the
709 :meth:`_engine.Connection.begin` method has been called) when
710 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
711 level all state associated with this transaction is lost, as
712 the DBAPI connection is closed. The :class:`_engine.Connection`
713 will not allow a reconnection to proceed until the
714 :class:`.Transaction` object is ended, by calling the
715 :meth:`.Transaction.rollback` method; until that point, any attempt at
716 continuing to use the :class:`_engine.Connection` will raise an
717 :class:`~sqlalchemy.exc.InvalidRequestError`.
718 This is to prevent applications from accidentally
719 continuing an ongoing transactional operations despite the
720 fact that the transaction has been lost due to an
721 invalidation.
722
723 The :meth:`_engine.Connection.invalidate` method,
724 just like auto-invalidation,
725 will at the connection pool level invoke the
726 :meth:`_events.PoolEvents.invalidate` event.
727
728 :param exception: an optional ``Exception`` instance that's the
729 reason for the invalidation. is passed along to event handlers
730 and logging functions.
731
732 .. seealso::
733
734 :ref:`pool_connection_invalidation`
735
736 """
737
738 if self.invalidated:
739 return
740
741 if self.closed:
742 raise exc.ResourceClosedError("This Connection is closed")
743
744 if self._still_open_and_dbapi_connection_is_valid:
745 pool_proxied_connection = self._dbapi_connection
746 assert pool_proxied_connection is not None
747 pool_proxied_connection.invalidate(exception)
748
749 self._dbapi_connection = None
750
751 def detach(self) -> None:
752 """Detach the underlying DB-API connection from its connection pool.
753
754 E.g.::
755
756 with engine.connect() as conn:
757 conn.detach()
758 conn.execute(text("SET search_path TO schema1, schema2"))
759
760 # work with connection
761
762 # connection is fully closed (since we used "with:", can
763 # also call .close())
764
765 This :class:`_engine.Connection` instance will remain usable.
766 When closed
767 (or exited from a context manager context as above),
768 the DB-API connection will be literally closed and not
769 returned to its originating pool.
770
771 This method can be used to insulate the rest of an application
772 from a modified state on a connection (such as a transaction
773 isolation level or similar).
774
775 """
776
777 if self.closed:
778 raise exc.ResourceClosedError("This Connection is closed")
779
780 pool_proxied_connection = self._dbapi_connection
781 if pool_proxied_connection is None:
782 raise exc.InvalidRequestError(
783 "Can't detach an invalidated Connection"
784 )
785 pool_proxied_connection.detach()
786
787 def _autobegin(self) -> None:
788 if self._allow_autobegin and not self.__in_begin:
789 self.begin()
790
791 def begin(self) -> RootTransaction:
792 """Begin a transaction prior to autobegin occurring.
793
794 E.g.::
795
796 with engine.connect() as conn:
797 with conn.begin() as trans:
798 conn.execute(table.insert(), {"username": "sandy"})
799
800 The returned object is an instance of :class:`_engine.RootTransaction`.
801 This object represents the "scope" of the transaction,
802 which completes when either the :meth:`_engine.Transaction.rollback`
803 or :meth:`_engine.Transaction.commit` method is called; the object
804 also works as a context manager as illustrated above.
805
806 The :meth:`_engine.Connection.begin` method begins a
807 transaction that normally will be begun in any case when the connection
808 is first used to execute a statement. The reason this method might be
809 used would be to invoke the :meth:`_events.ConnectionEvents.begin`
810 event at a specific time, or to organize code within the scope of a
811 connection checkout in terms of context managed blocks, such as::
812
813 with engine.connect() as conn:
814 with conn.begin():
815 conn.execute(...)
816 conn.execute(...)
817
818 with conn.begin():
819 conn.execute(...)
820 conn.execute(...)
821
822 The above code is not fundamentally any different in its behavior than
823 the following code which does not use
824 :meth:`_engine.Connection.begin`; the below style is known
825 as "commit as you go" style::
826
827 with engine.connect() as conn:
828 conn.execute(...)
829 conn.execute(...)
830 conn.commit()
831
832 conn.execute(...)
833 conn.execute(...)
834 conn.commit()
835
836 From a database point of view, the :meth:`_engine.Connection.begin`
837 method does not emit any SQL or change the state of the underlying
838 DBAPI connection in any way; the Python DBAPI does not have any
839 concept of explicit transaction begin.
840
841 .. seealso::
842
843 :ref:`tutorial_working_with_transactions` - in the
844 :ref:`unified_tutorial`
845
846 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT
847
848 :meth:`_engine.Connection.begin_twophase` -
849 use a two phase /XID transaction
850
851 :meth:`_engine.Engine.begin` - context manager available from
852 :class:`_engine.Engine`
853
854 """
855 if self._transaction is None:
856 self._transaction = RootTransaction(self)
857 return self._transaction
858 else:
859 raise exc.InvalidRequestError(
860 "This connection has already initialized a SQLAlchemy "
861 "Transaction() object via begin() or autobegin; can't "
862 "call begin() here unless rollback() or commit() "
863 "is called first."
864 )
865
866 def begin_nested(self) -> NestedTransaction:
867 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
868 handle that controls the scope of the SAVEPOINT.
869
870 E.g.::
871
872 with engine.begin() as connection:
873 with connection.begin_nested():
874 connection.execute(table.insert(), {"username": "sandy"})
875
876 The returned object is an instance of
877 :class:`_engine.NestedTransaction`, which includes transactional
878 methods :meth:`_engine.NestedTransaction.commit` and
879 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
880 these methods correspond to the operations "RELEASE SAVEPOINT <name>"
881 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
882 to the :class:`_engine.NestedTransaction` object and is generated
883 automatically. Like any other :class:`_engine.Transaction`, the
884 :class:`_engine.NestedTransaction` may be used as a context manager as
885 illustrated above which will "release" or "rollback" corresponding to
886 if the operation within the block were successful or raised an
887 exception.
888
889 Nested transactions require SAVEPOINT support in the underlying
890 database, else the behavior is undefined. SAVEPOINT is commonly used to
891 run operations within a transaction that may fail, while continuing the
892 outer transaction. E.g.::
893
894 from sqlalchemy import exc
895
896 with engine.begin() as connection:
897 trans = connection.begin_nested()
898 try:
899 connection.execute(table.insert(), {"username": "sandy"})
900 trans.commit()
901 except exc.IntegrityError: # catch for duplicate username
902 trans.rollback() # rollback to savepoint
903
904 # outer transaction continues
905 connection.execute(...)
906
907 If :meth:`_engine.Connection.begin_nested` is called without first
908 calling :meth:`_engine.Connection.begin` or
909 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
910 will "autobegin" the outer transaction first. This outer transaction
911 may be committed using "commit-as-you-go" style, e.g.::
912
913 with engine.connect() as connection: # begin() wasn't called
914
915 with connection.begin_nested(): # will auto-"begin()" first
916 connection.execute(...)
917 # savepoint is released
918
919 connection.execute(...)
920
921 # explicitly commit outer transaction
922 connection.commit()
923
924 # can continue working with connection here
925
926 .. versionchanged:: 2.0
927
928 :meth:`_engine.Connection.begin_nested` will now participate
929 in the connection "autobegin" behavior that is new as of
930 2.0 / "future" style connections in 1.4.
931
932 .. seealso::
933
934 :meth:`_engine.Connection.begin`
935
936 :ref:`session_begin_nested` - ORM support for SAVEPOINT
937
938 """
939 if self._transaction is None:
940 self._autobegin()
941
942 return NestedTransaction(self)
943
944 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
945 """Begin a two-phase or XA transaction and return a transaction
946 handle.
947
948 The returned object is an instance of :class:`.TwoPhaseTransaction`,
949 which in addition to the methods provided by
950 :class:`.Transaction`, also provides a
951 :meth:`~.TwoPhaseTransaction.prepare` method.
952
953 :param xid: the two phase transaction id. If not supplied, a
954 random id will be generated.
955
956 .. seealso::
957
958 :meth:`_engine.Connection.begin`
959
960 :meth:`_engine.Connection.begin_twophase`
961
962 """
963
964 if self._transaction is not None:
965 raise exc.InvalidRequestError(
966 "Cannot start a two phase transaction when a transaction "
967 "is already in progress."
968 )
969 if xid is None:
970 xid = self.engine.dialect.create_xid()
971 return TwoPhaseTransaction(self, xid)
972
973 def commit(self) -> None:
974 """Commit the transaction that is currently in progress.
975
976 This method commits the current transaction if one has been started.
977 If no transaction was started, the method has no effect, assuming
978 the connection is in a non-invalidated state.
979
980 A transaction is begun on a :class:`_engine.Connection` automatically
981 whenever a statement is first executed, or when the
982 :meth:`_engine.Connection.begin` method is called.
983
984 .. note:: The :meth:`_engine.Connection.commit` method only acts upon
985 the primary database transaction that is linked to the
986 :class:`_engine.Connection` object. It does not operate upon a
987 SAVEPOINT that would have been invoked from the
988 :meth:`_engine.Connection.begin_nested` method; for control of a
989 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
990 :class:`_engine.NestedTransaction` that is returned by the
991 :meth:`_engine.Connection.begin_nested` method itself.
992
993
994 """
995 if self._transaction:
996 self._transaction.commit()
997
998 def rollback(self) -> None:
999 """Roll back the transaction that is currently in progress.
1000
1001 This method rolls back the current transaction if one has been started.
1002 If no transaction was started, the method has no effect. If a
1003 transaction was started and the connection is in an invalidated state,
1004 the transaction is cleared using this method.
1005
1006 A transaction is begun on a :class:`_engine.Connection` automatically
1007 whenever a statement is first executed, or when the
1008 :meth:`_engine.Connection.begin` method is called.
1009
1010 .. note:: The :meth:`_engine.Connection.rollback` method only acts
1011 upon the primary database transaction that is linked to the
1012 :class:`_engine.Connection` object. It does not operate upon a
1013 SAVEPOINT that would have been invoked from the
1014 :meth:`_engine.Connection.begin_nested` method; for control of a
1015 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
1016 :class:`_engine.NestedTransaction` that is returned by the
1017 :meth:`_engine.Connection.begin_nested` method itself.
1018
1019
1020 """
1021 if self._transaction:
1022 self._transaction.rollback()
1023
1024 def recover_twophase(self) -> List[Any]:
1025 return self.engine.dialect.do_recover_twophase(self)
1026
1027 def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
1028 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
1029
1030 def commit_prepared(self, xid: Any, recover: bool = False) -> None:
1031 self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
1032
1033 def in_transaction(self) -> bool:
1034 """Return True if a transaction is in progress."""
1035 return self._transaction is not None and self._transaction.is_active
1036
1037 def in_nested_transaction(self) -> bool:
1038 """Return True if a transaction is in progress."""
1039 return (
1040 self._nested_transaction is not None
1041 and self._nested_transaction.is_active
1042 )
1043
1044 def _is_autocommit_isolation(self) -> bool:
1045 opt_iso = self._execution_options.get("isolation_level", None)
1046 return bool(
1047 opt_iso == "AUTOCOMMIT"
1048 or (
1049 opt_iso is None
1050 and self.engine.dialect._on_connect_isolation_level
1051 == "AUTOCOMMIT"
1052 )
1053 )
1054
1055 def _get_required_transaction(self) -> RootTransaction:
1056 trans = self._transaction
1057 if trans is None:
1058 raise exc.InvalidRequestError("connection is not in a transaction")
1059 return trans
1060
1061 def _get_required_nested_transaction(self) -> NestedTransaction:
1062 trans = self._nested_transaction
1063 if trans is None:
1064 raise exc.InvalidRequestError(
1065 "connection is not in a nested transaction"
1066 )
1067 return trans
1068
1069 def get_transaction(self) -> Optional[RootTransaction]:
1070 """Return the current root transaction in progress, if any.
1071
1072 .. versionadded:: 1.4
1073
1074 """
1075
1076 return self._transaction
1077
1078 def get_nested_transaction(self) -> Optional[NestedTransaction]:
1079 """Return the current nested transaction in progress, if any.
1080
1081 .. versionadded:: 1.4
1082
1083 """
1084 return self._nested_transaction
1085
1086 def _begin_impl(self, transaction: RootTransaction) -> None:
1087 if self._echo:
1088 if self._is_autocommit_isolation():
1089 self._log_info(
1090 "BEGIN (implicit; DBAPI should not BEGIN due to "
1091 "autocommit mode)"
1092 )
1093 else:
1094 self._log_info("BEGIN (implicit)")
1095
1096 self.__in_begin = True
1097
1098 if self._has_events or self.engine._has_events:
1099 self.dispatch.begin(self)
1100
1101 try:
1102 self.engine.dialect.do_begin(self.connection)
1103 except BaseException as e:
1104 self._handle_dbapi_exception(e, None, None, None, None)
1105 finally:
1106 self.__in_begin = False
1107
1108 def _rollback_impl(self) -> None:
1109 if self._has_events or self.engine._has_events:
1110 self.dispatch.rollback(self)
1111
1112 if self._still_open_and_dbapi_connection_is_valid:
1113 if self._echo:
1114 if self._is_autocommit_isolation():
1115 if self.dialect.skip_autocommit_rollback:
1116 self._log_info(
1117 "ROLLBACK will be skipped by "
1118 "skip_autocommit_rollback"
1119 )
1120 else:
1121 self._log_info(
1122 "ROLLBACK using DBAPI connection.rollback(); "
1123 "set skip_autocommit_rollback to prevent fully"
1124 )
1125 else:
1126 self._log_info("ROLLBACK")
1127 try:
1128 self.engine.dialect.do_rollback(self.connection)
1129 except BaseException as e:
1130 self._handle_dbapi_exception(e, None, None, None, None)
1131
1132 def _commit_impl(self) -> None:
1133 if self._has_events or self.engine._has_events:
1134 self.dispatch.commit(self)
1135
1136 if self._echo:
1137 if self._is_autocommit_isolation():
1138 self._log_info(
1139 "COMMIT using DBAPI connection.commit(), "
1140 "has no effect due to autocommit mode"
1141 )
1142 else:
1143 self._log_info("COMMIT")
1144 try:
1145 self.engine.dialect.do_commit(self.connection)
1146 except BaseException as e:
1147 self._handle_dbapi_exception(e, None, None, None, None)
1148
1149 def _savepoint_impl(self, name: Optional[str] = None) -> str:
1150 if self._has_events or self.engine._has_events:
1151 self.dispatch.savepoint(self, name)
1152
1153 if name is None:
1154 self.__savepoint_seq += 1
1155 name = "sa_savepoint_%s" % self.__savepoint_seq
1156 self.engine.dialect.do_savepoint(self, name)
1157 return name
1158
1159 def _rollback_to_savepoint_impl(self, name: str) -> None:
1160 if self._has_events or self.engine._has_events:
1161 self.dispatch.rollback_savepoint(self, name, None)
1162
1163 if self._still_open_and_dbapi_connection_is_valid:
1164 self.engine.dialect.do_rollback_to_savepoint(self, name)
1165
1166 def _release_savepoint_impl(self, name: str) -> None:
1167 if self._has_events or self.engine._has_events:
1168 self.dispatch.release_savepoint(self, name, None)
1169
1170 self.engine.dialect.do_release_savepoint(self, name)
1171
1172 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
1173 if self._echo:
1174 self._log_info("BEGIN TWOPHASE (implicit)")
1175 if self._has_events or self.engine._has_events:
1176 self.dispatch.begin_twophase(self, transaction.xid)
1177
1178 self.__in_begin = True
1179 try:
1180 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1181 except BaseException as e:
1182 self._handle_dbapi_exception(e, None, None, None, None)
1183 finally:
1184 self.__in_begin = False
1185
1186 def _prepare_twophase_impl(self, xid: Any) -> None:
1187 if self._has_events or self.engine._has_events:
1188 self.dispatch.prepare_twophase(self, xid)
1189
1190 assert isinstance(self._transaction, TwoPhaseTransaction)
1191 try:
1192 self.engine.dialect.do_prepare_twophase(self, xid)
1193 except BaseException as e:
1194 self._handle_dbapi_exception(e, None, None, None, None)
1195
1196 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1197 if self._has_events or self.engine._has_events:
1198 self.dispatch.rollback_twophase(self, xid, is_prepared)
1199
1200 if self._still_open_and_dbapi_connection_is_valid:
1201 assert isinstance(self._transaction, TwoPhaseTransaction)
1202 try:
1203 self.engine.dialect.do_rollback_twophase(
1204 self, xid, is_prepared
1205 )
1206 except BaseException as e:
1207 self._handle_dbapi_exception(e, None, None, None, None)
1208
1209 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1210 if self._has_events or self.engine._has_events:
1211 self.dispatch.commit_twophase(self, xid, is_prepared)
1212
1213 assert isinstance(self._transaction, TwoPhaseTransaction)
1214 try:
1215 self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
1216 except BaseException as e:
1217 self._handle_dbapi_exception(e, None, None, None, None)
1218
1219 def close(self) -> None:
1220 """Close this :class:`_engine.Connection`.
1221
1222 This results in a release of the underlying database
1223 resources, that is, the DBAPI connection referenced
1224 internally. The DBAPI connection is typically restored
1225 back to the connection-holding :class:`_pool.Pool` referenced
1226 by the :class:`_engine.Engine` that produced this
1227 :class:`_engine.Connection`. Any transactional state present on
1228 the DBAPI connection is also unconditionally released via
1229 the DBAPI connection's ``rollback()`` method, regardless
1230 of any :class:`.Transaction` object that may be
1231 outstanding with regards to this :class:`_engine.Connection`.
1232
1233 This has the effect of also calling :meth:`_engine.Connection.rollback`
1234 if any transaction is in place.
1235
1236 After :meth:`_engine.Connection.close` is called, the
1237 :class:`_engine.Connection` is permanently in a closed state,
1238 and will allow no further operations.
1239
1240 """
1241
1242 if self._transaction:
1243 self._transaction.close()
1244 skip_reset = True
1245 else:
1246 skip_reset = False
1247
1248 if self._dbapi_connection is not None:
1249 conn = self._dbapi_connection
1250
1251 # as we just closed the transaction, close the connection
1252 # pool connection without doing an additional reset
1253 if skip_reset:
1254 cast("_ConnectionFairy", conn)._close_special(
1255 transaction_reset=True
1256 )
1257 else:
1258 conn.close()
1259
1260 # There is a slight chance that conn.close() may have
1261 # triggered an invalidation here in which case
1262 # _dbapi_connection would already be None, however usually
1263 # it will be non-None here and in a "closed" state.
1264 self._dbapi_connection = None
1265 self.__can_reconnect = False
1266
1267 @overload
1268 def scalar(
1269 self,
1270 statement: TypedReturnsRows[Tuple[_T]],
1271 parameters: Optional[_CoreSingleExecuteParams] = None,
1272 *,
1273 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1274 ) -> Optional[_T]: ...
1275
1276 @overload
1277 def scalar(
1278 self,
1279 statement: Executable,
1280 parameters: Optional[_CoreSingleExecuteParams] = None,
1281 *,
1282 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1283 ) -> Any: ...
1284
1285 def scalar(
1286 self,
1287 statement: Executable,
1288 parameters: Optional[_CoreSingleExecuteParams] = None,
1289 *,
1290 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1291 ) -> Any:
1292 r"""Executes a SQL statement construct and returns a scalar object.
1293
1294 This method is shorthand for invoking the
1295 :meth:`_engine.Result.scalar` method after invoking the
1296 :meth:`_engine.Connection.execute` method. Parameters are equivalent.
1297
1298 :return: a scalar Python value representing the first column of the
1299 first row returned.
1300
1301 """
1302 distilled_parameters = _distill_params_20(parameters)
1303 try:
1304 meth = statement._execute_on_scalar
1305 except AttributeError as err:
1306 raise exc.ObjectNotExecutableError(statement) from err
1307 else:
1308 return meth(
1309 self,
1310 distilled_parameters,
1311 execution_options or NO_OPTIONS,
1312 )
1313
1314 @overload
1315 def scalars(
1316 self,
1317 statement: TypedReturnsRows[Tuple[_T]],
1318 parameters: Optional[_CoreAnyExecuteParams] = None,
1319 *,
1320 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1321 ) -> ScalarResult[_T]: ...
1322
1323 @overload
1324 def scalars(
1325 self,
1326 statement: Executable,
1327 parameters: Optional[_CoreAnyExecuteParams] = None,
1328 *,
1329 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1330 ) -> ScalarResult[Any]: ...
1331
1332 def scalars(
1333 self,
1334 statement: Executable,
1335 parameters: Optional[_CoreAnyExecuteParams] = None,
1336 *,
1337 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1338 ) -> ScalarResult[Any]:
1339 """Executes and returns a scalar result set, which yields scalar values
1340 from the first column of each row.
1341
1342 This method is equivalent to calling :meth:`_engine.Connection.execute`
1343 to receive a :class:`_result.Result` object, then invoking the
1344 :meth:`_result.Result.scalars` method to produce a
1345 :class:`_result.ScalarResult` instance.
1346
1347 :return: a :class:`_result.ScalarResult`
1348
1349 .. versionadded:: 1.4.24
1350
1351 """
1352
1353 return self.execute(
1354 statement, parameters, execution_options=execution_options
1355 ).scalars()
1356
1357 @overload
1358 def execute(
1359 self,
1360 statement: TypedReturnsRows[_T],
1361 parameters: Optional[_CoreAnyExecuteParams] = None,
1362 *,
1363 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1364 ) -> CursorResult[_T]: ...
1365
1366 @overload
1367 def execute(
1368 self,
1369 statement: Executable,
1370 parameters: Optional[_CoreAnyExecuteParams] = None,
1371 *,
1372 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1373 ) -> CursorResult[Any]: ...
1374
1375 def execute(
1376 self,
1377 statement: Executable,
1378 parameters: Optional[_CoreAnyExecuteParams] = None,
1379 *,
1380 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1381 ) -> CursorResult[Any]:
1382 r"""Executes a SQL statement construct and returns a
1383 :class:`_engine.CursorResult`.
1384
1385 :param statement: The statement to be executed. This is always
1386 an object that is in both the :class:`_expression.ClauseElement` and
1387 :class:`_expression.Executable` hierarchies, including:
1388
1389 * :class:`_expression.Select`
1390 * :class:`_expression.Insert`, :class:`_expression.Update`,
1391 :class:`_expression.Delete`
1392 * :class:`_expression.TextClause` and
1393 :class:`_expression.TextualSelect`
1394 * :class:`_schema.DDL` and objects which inherit from
1395 :class:`_schema.ExecutableDDLElement`
1396
1397 :param parameters: parameters which will be bound into the statement.
1398 This may be either a dictionary of parameter names to values,
1399 or a mutable sequence (e.g. a list) of dictionaries. When a
1400 list of dictionaries is passed, the underlying statement execution
1401 will make use of the DBAPI ``cursor.executemany()`` method.
1402 When a single dictionary is passed, the DBAPI ``cursor.execute()``
1403 method will be used.
1404
1405 :param execution_options: optional dictionary of execution options,
1406 which will be associated with the statement execution. This
1407 dictionary can provide a subset of the options that are accepted
1408 by :meth:`_engine.Connection.execution_options`.
1409
1410 :return: a :class:`_engine.Result` object.
1411
1412 """
1413 distilled_parameters = _distill_params_20(parameters)
1414 try:
1415 meth = statement._execute_on_connection
1416 except AttributeError as err:
1417 raise exc.ObjectNotExecutableError(statement) from err
1418 else:
1419 return meth(
1420 self,
1421 distilled_parameters,
1422 execution_options or NO_OPTIONS,
1423 )
1424
1425 def _execute_function(
1426 self,
1427 func: FunctionElement[Any],
1428 distilled_parameters: _CoreMultiExecuteParams,
1429 execution_options: CoreExecuteOptionsParameter,
1430 ) -> CursorResult[Any]:
1431 """Execute a sql.FunctionElement object."""
1432
1433 return self._execute_clauseelement(
1434 func.select(), distilled_parameters, execution_options
1435 )
1436
1437 def _execute_default(
1438 self,
1439 default: DefaultGenerator,
1440 distilled_parameters: _CoreMultiExecuteParams,
1441 execution_options: CoreExecuteOptionsParameter,
1442 ) -> Any:
1443 """Execute a schema.ColumnDefault object."""
1444
1445 execution_options = self._execution_options.merge_with(
1446 execution_options
1447 )
1448
1449 event_multiparams: Optional[_CoreMultiExecuteParams]
1450 event_params: Optional[_CoreAnyExecuteParams]
1451
1452 # note for event handlers, the "distilled parameters" which is always
1453 # a list of dicts is broken out into separate "multiparams" and
1454 # "params" collections, which allows the handler to distinguish
1455 # between an executemany and execute style set of parameters.
1456 if self._has_events or self.engine._has_events:
1457 (
1458 default,
1459 distilled_parameters,
1460 event_multiparams,
1461 event_params,
1462 ) = self._invoke_before_exec_event(
1463 default, distilled_parameters, execution_options
1464 )
1465 else:
1466 event_multiparams = event_params = None
1467
1468 try:
1469 conn = self._dbapi_connection
1470 if conn is None:
1471 conn = self._revalidate_connection()
1472
1473 dialect = self.dialect
1474 ctx = dialect.execution_ctx_cls._init_default(
1475 dialect, self, conn, execution_options
1476 )
1477 except (exc.PendingRollbackError, exc.ResourceClosedError):
1478 raise
1479 except BaseException as e:
1480 self._handle_dbapi_exception(e, None, None, None, None)
1481
1482 ret = ctx._exec_default(None, default, None)
1483
1484 if self._has_events or self.engine._has_events:
1485 self.dispatch.after_execute(
1486 self,
1487 default,
1488 event_multiparams,
1489 event_params,
1490 execution_options,
1491 ret,
1492 )
1493
1494 return ret
1495
1496 def _execute_ddl(
1497 self,
1498 ddl: ExecutableDDLElement,
1499 distilled_parameters: _CoreMultiExecuteParams,
1500 execution_options: CoreExecuteOptionsParameter,
1501 ) -> CursorResult[Any]:
1502 """Execute a schema.DDL object."""
1503
1504 exec_opts = ddl._execution_options.merge_with(
1505 self._execution_options, execution_options
1506 )
1507
1508 event_multiparams: Optional[_CoreMultiExecuteParams]
1509 event_params: Optional[_CoreSingleExecuteParams]
1510
1511 if self._has_events or self.engine._has_events:
1512 (
1513 ddl,
1514 distilled_parameters,
1515 event_multiparams,
1516 event_params,
1517 ) = self._invoke_before_exec_event(
1518 ddl, distilled_parameters, exec_opts
1519 )
1520 else:
1521 event_multiparams = event_params = None
1522
1523 schema_translate_map = exec_opts.get("schema_translate_map", None)
1524
1525 dialect = self.dialect
1526
1527 compiled = ddl.compile(
1528 dialect=dialect, schema_translate_map=schema_translate_map
1529 )
1530 ret = self._execute_context(
1531 dialect,
1532 dialect.execution_ctx_cls._init_ddl,
1533 compiled,
1534 None,
1535 exec_opts,
1536 compiled,
1537 )
1538 if self._has_events or self.engine._has_events:
1539 self.dispatch.after_execute(
1540 self,
1541 ddl,
1542 event_multiparams,
1543 event_params,
1544 exec_opts,
1545 ret,
1546 )
1547 return ret
1548
1549 def _invoke_before_exec_event(
1550 self,
1551 elem: Any,
1552 distilled_params: _CoreMultiExecuteParams,
1553 execution_options: _ExecuteOptions,
1554 ) -> Tuple[
1555 Any,
1556 _CoreMultiExecuteParams,
1557 _CoreMultiExecuteParams,
1558 _CoreSingleExecuteParams,
1559 ]:
1560 event_multiparams: _CoreMultiExecuteParams
1561 event_params: _CoreSingleExecuteParams
1562
1563 if len(distilled_params) == 1:
1564 event_multiparams, event_params = [], distilled_params[0]
1565 else:
1566 event_multiparams, event_params = distilled_params, {}
1567
1568 for fn in self.dispatch.before_execute:
1569 elem, event_multiparams, event_params = fn(
1570 self,
1571 elem,
1572 event_multiparams,
1573 event_params,
1574 execution_options,
1575 )
1576
1577 if event_multiparams:
1578 distilled_params = list(event_multiparams)
1579 if event_params:
1580 raise exc.InvalidRequestError(
1581 "Event handler can't return non-empty multiparams "
1582 "and params at the same time"
1583 )
1584 elif event_params:
1585 distilled_params = [event_params]
1586 else:
1587 distilled_params = []
1588
1589 return elem, distilled_params, event_multiparams, event_params
1590
1591 def _execute_clauseelement(
1592 self,
1593 elem: Executable,
1594 distilled_parameters: _CoreMultiExecuteParams,
1595 execution_options: CoreExecuteOptionsParameter,
1596 ) -> CursorResult[Any]:
1597 """Execute a sql.ClauseElement object."""
1598
1599 execution_options = elem._execution_options.merge_with(
1600 self._execution_options, execution_options
1601 )
1602
1603 has_events = self._has_events or self.engine._has_events
1604 if has_events:
1605 (
1606 elem,
1607 distilled_parameters,
1608 event_multiparams,
1609 event_params,
1610 ) = self._invoke_before_exec_event(
1611 elem, distilled_parameters, execution_options
1612 )
1613
1614 if distilled_parameters:
1615 # ensure we don't retain a link to the view object for keys()
1616 # which links to the values, which we don't want to cache
1617 keys = sorted(distilled_parameters[0])
1618 for_executemany = len(distilled_parameters) > 1
1619 else:
1620 keys = []
1621 for_executemany = False
1622
1623 dialect = self.dialect
1624
1625 schema_translate_map = execution_options.get(
1626 "schema_translate_map", None
1627 )
1628
1629 compiled_cache: Optional[CompiledCacheType] = execution_options.get(
1630 "compiled_cache", self.engine._compiled_cache
1631 )
1632
1633 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
1634 dialect=dialect,
1635 compiled_cache=compiled_cache,
1636 column_keys=keys,
1637 for_executemany=for_executemany,
1638 schema_translate_map=schema_translate_map,
1639 linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
1640 )
1641 ret = self._execute_context(
1642 dialect,
1643 dialect.execution_ctx_cls._init_compiled,
1644 compiled_sql,
1645 distilled_parameters,
1646 execution_options,
1647 compiled_sql,
1648 distilled_parameters,
1649 elem,
1650 extracted_params,
1651 cache_hit=cache_hit,
1652 )
1653 if has_events:
1654 self.dispatch.after_execute(
1655 self,
1656 elem,
1657 event_multiparams,
1658 event_params,
1659 execution_options,
1660 ret,
1661 )
1662 return ret
1663
1664 def _execute_compiled(
1665 self,
1666 compiled: Compiled,
1667 distilled_parameters: _CoreMultiExecuteParams,
1668 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
1669 ) -> CursorResult[Any]:
1670 """Execute a sql.Compiled object.
1671
1672 TODO: why do we have this? likely deprecate or remove
1673
1674 """
1675
1676 execution_options = compiled.execution_options.merge_with(
1677 self._execution_options, execution_options
1678 )
1679
1680 if self._has_events or self.engine._has_events:
1681 (
1682 compiled,
1683 distilled_parameters,
1684 event_multiparams,
1685 event_params,
1686 ) = self._invoke_before_exec_event(
1687 compiled, distilled_parameters, execution_options
1688 )
1689
1690 dialect = self.dialect
1691
1692 ret = self._execute_context(
1693 dialect,
1694 dialect.execution_ctx_cls._init_compiled,
1695 compiled,
1696 distilled_parameters,
1697 execution_options,
1698 compiled,
1699 distilled_parameters,
1700 None,
1701 None,
1702 )
1703 if self._has_events or self.engine._has_events:
1704 self.dispatch.after_execute(
1705 self,
1706 compiled,
1707 event_multiparams,
1708 event_params,
1709 execution_options,
1710 ret,
1711 )
1712 return ret
1713
1714 def exec_driver_sql(
1715 self,
1716 statement: str,
1717 parameters: Optional[_DBAPIAnyExecuteParams] = None,
1718 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1719 ) -> CursorResult[Any]:
1720 r"""Executes a string SQL statement on the DBAPI cursor directly,
1721 without any SQL compilation steps.
1722
1723 This can be used to pass any string directly to the
1724 ``cursor.execute()`` method of the DBAPI in use.
1725
1726 :param statement: The statement str to be executed. Bound parameters
1727 must use the underlying DBAPI's paramstyle, such as "qmark",
1728 "pyformat", "format", etc.
1729
1730 :param parameters: represent bound parameter values to be used in the
1731 execution. The format is one of: a dictionary of named parameters,
1732 a tuple of positional parameters, or a list containing either
1733 dictionaries or tuples for multiple-execute support.
1734
1735 :return: a :class:`_engine.CursorResult`.
1736
1737 E.g. multiple dictionaries::
1738
1739
1740 conn.exec_driver_sql(
1741 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1742 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
1743 )
1744
1745 Single dictionary::
1746
1747 conn.exec_driver_sql(
1748 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1749 dict(id=1, value="v1"),
1750 )
1751
1752 Single tuple::
1753
1754 conn.exec_driver_sql(
1755 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
1756 )
1757
1758 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
1759 not participate in the
1760 :meth:`_events.ConnectionEvents.before_execute` and
1761 :meth:`_events.ConnectionEvents.after_execute` events. To
1762 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
1763 :meth:`_events.ConnectionEvents.before_cursor_execute` and
1764 :meth:`_events.ConnectionEvents.after_cursor_execute`.
1765
1766 .. seealso::
1767
1768 :pep:`249`
1769
1770 """
1771
1772 distilled_parameters = _distill_raw_params(parameters)
1773
1774 execution_options = self._execution_options.merge_with(
1775 execution_options
1776 )
1777
1778 dialect = self.dialect
1779 ret = self._execute_context(
1780 dialect,
1781 dialect.execution_ctx_cls._init_statement,
1782 statement,
1783 None,
1784 execution_options,
1785 statement,
1786 distilled_parameters,
1787 )
1788
1789 return ret
1790
1791 def _execute_context(
1792 self,
1793 dialect: Dialect,
1794 constructor: Callable[..., ExecutionContext],
1795 statement: Union[str, Compiled],
1796 parameters: Optional[_AnyMultiExecuteParams],
1797 execution_options: _ExecuteOptions,
1798 *args: Any,
1799 **kw: Any,
1800 ) -> CursorResult[Any]:
1801 """Create an :class:`.ExecutionContext` and execute, returning
1802 a :class:`_engine.CursorResult`."""
1803
1804 if execution_options:
1805 yp = execution_options.get("yield_per", None)
1806 if yp:
1807 execution_options = execution_options.union(
1808 {"stream_results": True, "max_row_buffer": yp}
1809 )
1810 try:
1811 conn = self._dbapi_connection
1812 if conn is None:
1813 conn = self._revalidate_connection()
1814
1815 context = constructor(
1816 dialect, self, conn, execution_options, *args, **kw
1817 )
1818 except (exc.PendingRollbackError, exc.ResourceClosedError):
1819 raise
1820 except BaseException as e:
1821 self._handle_dbapi_exception(
1822 e, str(statement), parameters, None, None
1823 )
1824
1825 if (
1826 self._transaction
1827 and not self._transaction.is_active
1828 or (
1829 self._nested_transaction
1830 and not self._nested_transaction.is_active
1831 )
1832 ):
1833 self._invalid_transaction()
1834
1835 elif self._trans_context_manager:
1836 TransactionalContext._trans_ctx_check(self)
1837
1838 if self._transaction is None:
1839 self._autobegin()
1840
1841 context.pre_exec()
1842
1843 if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
1844 return self._exec_insertmany_context(dialect, context)
1845 else:
1846 return self._exec_single_context(
1847 dialect, context, statement, parameters
1848 )
1849
1850 def _exec_single_context(
1851 self,
1852 dialect: Dialect,
1853 context: ExecutionContext,
1854 statement: Union[str, Compiled],
1855 parameters: Optional[_AnyMultiExecuteParams],
1856 ) -> CursorResult[Any]:
1857 """continue the _execute_context() method for a single DBAPI
1858 cursor.execute() or cursor.executemany() call.
1859
1860 """
1861 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
1862 generic_setinputsizes = context._prepare_set_input_sizes()
1863
1864 if generic_setinputsizes:
1865 try:
1866 dialect.do_set_input_sizes(
1867 context.cursor, generic_setinputsizes, context
1868 )
1869 except BaseException as e:
1870 self._handle_dbapi_exception(
1871 e, str(statement), parameters, None, context
1872 )
1873
1874 cursor, str_statement, parameters = (
1875 context.cursor,
1876 context.statement,
1877 context.parameters,
1878 )
1879
1880 effective_parameters: Optional[_AnyExecuteParams]
1881
1882 if not context.executemany:
1883 effective_parameters = parameters[0]
1884 else:
1885 effective_parameters = parameters
1886
1887 if self._has_events or self.engine._has_events:
1888 for fn in self.dispatch.before_cursor_execute:
1889 str_statement, effective_parameters = fn(
1890 self,
1891 cursor,
1892 str_statement,
1893 effective_parameters,
1894 context,
1895 context.executemany,
1896 )
1897
1898 if self._echo:
1899 self._log_info(str_statement)
1900
1901 stats = context._get_cache_stats()
1902
1903 if not self.engine.hide_parameters:
1904 self._log_info(
1905 "[%s] %r",
1906 stats,
1907 sql_util._repr_params(
1908 effective_parameters,
1909 batches=10,
1910 ismulti=context.executemany,
1911 ),
1912 )
1913 else:
1914 self._log_info(
1915 "[%s] [SQL parameters hidden due to hide_parameters=True]",
1916 stats,
1917 )
1918
1919 evt_handled: bool = False
1920 try:
1921 if context.execute_style is ExecuteStyle.EXECUTEMANY:
1922 effective_parameters = cast(
1923 "_CoreMultiExecuteParams", effective_parameters
1924 )
1925 if self.dialect._has_events:
1926 for fn in self.dialect.dispatch.do_executemany:
1927 if fn(
1928 cursor,
1929 str_statement,
1930 effective_parameters,
1931 context,
1932 ):
1933 evt_handled = True
1934 break
1935 if not evt_handled:
1936 self.dialect.do_executemany(
1937 cursor,
1938 str_statement,
1939 effective_parameters,
1940 context,
1941 )
1942 elif not effective_parameters and context.no_parameters:
1943 if self.dialect._has_events:
1944 for fn in self.dialect.dispatch.do_execute_no_params:
1945 if fn(cursor, str_statement, context):
1946 evt_handled = True
1947 break
1948 if not evt_handled:
1949 self.dialect.do_execute_no_params(
1950 cursor, str_statement, context
1951 )
1952 else:
1953 effective_parameters = cast(
1954 "_CoreSingleExecuteParams", effective_parameters
1955 )
1956 if self.dialect._has_events:
1957 for fn in self.dialect.dispatch.do_execute:
1958 if fn(
1959 cursor,
1960 str_statement,
1961 effective_parameters,
1962 context,
1963 ):
1964 evt_handled = True
1965 break
1966 if not evt_handled:
1967 self.dialect.do_execute(
1968 cursor, str_statement, effective_parameters, context
1969 )
1970
1971 if self._has_events or self.engine._has_events:
1972 self.dispatch.after_cursor_execute(
1973 self,
1974 cursor,
1975 str_statement,
1976 effective_parameters,
1977 context,
1978 context.executemany,
1979 )
1980
1981 context.post_exec()
1982
1983 result = context._setup_result_proxy()
1984
1985 except BaseException as e:
1986 self._handle_dbapi_exception(
1987 e, str_statement, effective_parameters, cursor, context
1988 )
1989
1990 return result
1991
1992 def _exec_insertmany_context(
1993 self,
1994 dialect: Dialect,
1995 context: ExecutionContext,
1996 ) -> CursorResult[Any]:
1997 """continue the _execute_context() method for an "insertmanyvalues"
1998 operation, which will invoke DBAPI
1999 cursor.execute() one or more times with individual log and
2000 event hook calls.
2001
2002 """
2003
2004 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
2005 generic_setinputsizes = context._prepare_set_input_sizes()
2006 else:
2007 generic_setinputsizes = None
2008
2009 cursor, str_statement, parameters = (
2010 context.cursor,
2011 context.statement,
2012 context.parameters,
2013 )
2014
2015 effective_parameters = parameters
2016
2017 engine_events = self._has_events or self.engine._has_events
2018 if self.dialect._has_events:
2019 do_execute_dispatch: Iterable[Any] = (
2020 self.dialect.dispatch.do_execute
2021 )
2022 else:
2023 do_execute_dispatch = ()
2024
2025 if engine_events:
2026 _WORKAROUND_ISSUE_13018 = getattr(
2027 self, "_WORKAROUND_ISSUE_13018", False
2028 )
2029 else:
2030 _WORKAROUND_ISSUE_13018 = False
2031
2032 if self._echo:
2033 stats = context._get_cache_stats() + " (insertmanyvalues)"
2034
2035 preserve_rowcount = context.execution_options.get(
2036 "preserve_rowcount", False
2037 )
2038 rowcount = 0
2039
2040 for imv_batch in dialect._deliver_insertmanyvalues_batches(
2041 self,
2042 cursor,
2043 str_statement,
2044 effective_parameters,
2045 generic_setinputsizes,
2046 context,
2047 ):
2048 if imv_batch.processed_setinputsizes:
2049 try:
2050 dialect.do_set_input_sizes(
2051 context.cursor,
2052 imv_batch.processed_setinputsizes,
2053 context,
2054 )
2055 except BaseException as e:
2056 self._handle_dbapi_exception(
2057 e,
2058 sql_util._long_statement(imv_batch.replaced_statement),
2059 imv_batch.replaced_parameters,
2060 None,
2061 context,
2062 is_sub_exec=True,
2063 )
2064
2065 sub_stmt = imv_batch.replaced_statement
2066 sub_params = imv_batch.replaced_parameters
2067
2068 if engine_events:
2069 for fn in self.dispatch.before_cursor_execute:
2070 sub_stmt, sub_params = fn(
2071 self,
2072 cursor,
2073 sub_stmt,
2074 sub_params,
2075 context,
2076 True,
2077 )
2078
2079 if self._echo:
2080 self._log_info(sql_util._long_statement(sub_stmt))
2081
2082 imv_stats = f""" {imv_batch.batchnum}/{
2083 imv_batch.total_batches
2084 } ({
2085 'ordered'
2086 if imv_batch.rows_sorted else 'unordered'
2087 }{
2088 '; batch not supported'
2089 if imv_batch.is_downgraded
2090 else ''
2091 })"""
2092
2093 if imv_batch.batchnum == 1:
2094 stats += imv_stats
2095 else:
2096 stats = f"insertmanyvalues{imv_stats}"
2097
2098 if not self.engine.hide_parameters:
2099 self._log_info(
2100 "[%s] %r",
2101 stats,
2102 sql_util._repr_params(
2103 sub_params,
2104 batches=10,
2105 ismulti=False,
2106 ),
2107 )
2108 else:
2109 self._log_info(
2110 "[%s] [SQL parameters hidden due to "
2111 "hide_parameters=True]",
2112 stats,
2113 )
2114
2115 try:
2116 for fn in do_execute_dispatch:
2117 if fn(
2118 cursor,
2119 sub_stmt,
2120 sub_params,
2121 context,
2122 ):
2123 break
2124 else:
2125 dialect.do_execute(
2126 cursor,
2127 sub_stmt,
2128 sub_params,
2129 context,
2130 )
2131
2132 except BaseException as e:
2133 self._handle_dbapi_exception(
2134 e,
2135 sql_util._long_statement(sub_stmt),
2136 sub_params,
2137 cursor,
2138 context,
2139 is_sub_exec=True,
2140 )
2141
2142 if engine_events:
2143 self.dispatch.after_cursor_execute(
2144 self,
2145 cursor,
2146 # TODO: this will be fixed by #13018
2147 sub_stmt if _WORKAROUND_ISSUE_13018 else str_statement,
2148 sub_params if _WORKAROUND_ISSUE_13018 else parameters,
2149 context,
2150 context.executemany,
2151 )
2152
2153 if preserve_rowcount:
2154 rowcount += imv_batch.current_batch_size
2155
2156 try:
2157 context.post_exec()
2158
2159 if preserve_rowcount:
2160 context._rowcount = rowcount # type: ignore[attr-defined]
2161
2162 result = context._setup_result_proxy()
2163
2164 except BaseException as e:
2165 self._handle_dbapi_exception(
2166 e, str_statement, effective_parameters, cursor, context
2167 )
2168
2169 return result
2170
2171 def _cursor_execute(
2172 self,
2173 cursor: DBAPICursor,
2174 statement: str,
2175 parameters: _DBAPISingleExecuteParams,
2176 context: Optional[ExecutionContext] = None,
2177 ) -> None:
2178 """Execute a statement + params on the given cursor.
2179
2180 Adds appropriate logging and exception handling.
2181
2182 This method is used by DefaultDialect for special-case
2183 executions, such as for sequences and column defaults.
2184 The path of statement execution in the majority of cases
2185 terminates at _execute_context().
2186
2187 """
2188 if self._has_events or self.engine._has_events:
2189 for fn in self.dispatch.before_cursor_execute:
2190 statement, parameters = fn(
2191 self, cursor, statement, parameters, context, False
2192 )
2193
2194 if self._echo:
2195 self._log_info(statement)
2196 self._log_info("[raw sql] %r", parameters)
2197 try:
2198 for fn in (
2199 ()
2200 if not self.dialect._has_events
2201 else self.dialect.dispatch.do_execute
2202 ):
2203 if fn(cursor, statement, parameters, context):
2204 break
2205 else:
2206 self.dialect.do_execute(cursor, statement, parameters, context)
2207 except BaseException as e:
2208 self._handle_dbapi_exception(
2209 e, statement, parameters, cursor, context
2210 )
2211
2212 if self._has_events or self.engine._has_events:
2213 self.dispatch.after_cursor_execute(
2214 self, cursor, statement, parameters, context, False
2215 )
2216
2217 def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
2218 """Close the given cursor, catching exceptions
2219 and turning into log warnings.
2220
2221 """
2222 try:
2223 cursor.close()
2224 except Exception:
2225 # log the error through the connection pool's logger.
2226 self.engine.pool.logger.error(
2227 "Error closing cursor", exc_info=True
2228 )
2229
2230 _reentrant_error = False
2231 _is_disconnect = False
2232
2233 def _handle_dbapi_exception(
2234 self,
2235 e: BaseException,
2236 statement: Optional[str],
2237 parameters: Optional[_AnyExecuteParams],
2238 cursor: Optional[DBAPICursor],
2239 context: Optional[ExecutionContext],
2240 is_sub_exec: bool = False,
2241 ) -> NoReturn:
2242 exc_info = sys.exc_info()
2243
2244 is_exit_exception = util.is_exit_exception(e)
2245
2246 if not self._is_disconnect:
2247 self._is_disconnect = (
2248 isinstance(e, self.dialect.loaded_dbapi.Error)
2249 and not self.closed
2250 and self.dialect.is_disconnect(
2251 e,
2252 self._dbapi_connection if not self.invalidated else None,
2253 cursor,
2254 )
2255 ) or (is_exit_exception and not self.closed)
2256
2257 invalidate_pool_on_disconnect = not is_exit_exception
2258
2259 ismulti: bool = (
2260 not is_sub_exec and context.executemany
2261 if context is not None
2262 else False
2263 )
2264 if self._reentrant_error:
2265 raise exc.DBAPIError.instance(
2266 statement,
2267 parameters,
2268 e,
2269 self.dialect.loaded_dbapi.Error,
2270 hide_parameters=self.engine.hide_parameters,
2271 dialect=self.dialect,
2272 ismulti=ismulti,
2273 ).with_traceback(exc_info[2]) from e
2274 self._reentrant_error = True
2275 try:
2276 # non-DBAPI error - if we already got a context,
2277 # or there's no string statement, don't wrap it
2278 should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
2279 statement is not None
2280 and context is None
2281 and not is_exit_exception
2282 )
2283
2284 if should_wrap:
2285 sqlalchemy_exception = exc.DBAPIError.instance(
2286 statement,
2287 parameters,
2288 cast(Exception, e),
2289 self.dialect.loaded_dbapi.Error,
2290 hide_parameters=self.engine.hide_parameters,
2291 connection_invalidated=self._is_disconnect,
2292 dialect=self.dialect,
2293 ismulti=ismulti,
2294 )
2295 else:
2296 sqlalchemy_exception = None
2297
2298 newraise = None
2299
2300 if (self.dialect._has_events) and not self._execution_options.get(
2301 "skip_user_error_events", False
2302 ):
2303 ctx = ExceptionContextImpl(
2304 e,
2305 sqlalchemy_exception,
2306 self.engine,
2307 self.dialect,
2308 self,
2309 cursor,
2310 statement,
2311 parameters,
2312 context,
2313 self._is_disconnect,
2314 invalidate_pool_on_disconnect,
2315 False,
2316 )
2317
2318 for fn in self.dialect.dispatch.handle_error:
2319 try:
2320 # handler returns an exception;
2321 # call next handler in a chain
2322 per_fn = fn(ctx)
2323 if per_fn is not None:
2324 ctx.chained_exception = newraise = per_fn
2325 except Exception as _raised:
2326 # handler raises an exception - stop processing
2327 newraise = _raised
2328 break
2329
2330 if self._is_disconnect != ctx.is_disconnect:
2331 self._is_disconnect = ctx.is_disconnect
2332 if sqlalchemy_exception:
2333 sqlalchemy_exception.connection_invalidated = (
2334 ctx.is_disconnect
2335 )
2336
2337 # set up potentially user-defined value for
2338 # invalidate pool.
2339 invalidate_pool_on_disconnect = (
2340 ctx.invalidate_pool_on_disconnect
2341 )
2342
2343 if should_wrap and context:
2344 context.handle_dbapi_exception(e)
2345
2346 if not self._is_disconnect:
2347 if cursor:
2348 self._safe_close_cursor(cursor)
2349 # "autorollback" was mostly relevant in 1.x series.
2350 # It's very unlikely to reach here, as the connection
2351 # does autobegin so when we are here, we are usually
2352 # in an explicit / semi-explicit transaction.
2353 # however we have a test which manufactures this
2354 # scenario in any case using an event handler.
2355 # test/engine/test_execute.py-> test_actual_autorollback
2356 if not self.in_transaction():
2357 self._rollback_impl()
2358
2359 if newraise:
2360 raise newraise.with_traceback(exc_info[2]) from e
2361 elif should_wrap:
2362 assert sqlalchemy_exception is not None
2363 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2364 else:
2365 assert exc_info[1] is not None
2366 raise exc_info[1].with_traceback(exc_info[2])
2367 finally:
2368 del self._reentrant_error
2369 if self._is_disconnect:
2370 del self._is_disconnect
2371 if not self.invalidated:
2372 dbapi_conn_wrapper = self._dbapi_connection
2373 assert dbapi_conn_wrapper is not None
2374 if invalidate_pool_on_disconnect:
2375 self.engine.pool._invalidate(dbapi_conn_wrapper, e)
2376 self.invalidate(e)
2377
2378 @classmethod
2379 def _handle_dbapi_exception_noconnection(
2380 cls,
2381 e: BaseException,
2382 dialect: Dialect,
2383 engine: Optional[Engine] = None,
2384 is_disconnect: Optional[bool] = None,
2385 invalidate_pool_on_disconnect: bool = True,
2386 is_pre_ping: bool = False,
2387 ) -> NoReturn:
2388 exc_info = sys.exc_info()
2389
2390 if is_disconnect is None:
2391 is_disconnect = isinstance(
2392 e, dialect.loaded_dbapi.Error
2393 ) and dialect.is_disconnect(e, None, None)
2394
2395 should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
2396
2397 if should_wrap:
2398 sqlalchemy_exception = exc.DBAPIError.instance(
2399 None,
2400 None,
2401 cast(Exception, e),
2402 dialect.loaded_dbapi.Error,
2403 hide_parameters=(
2404 engine.hide_parameters if engine is not None else False
2405 ),
2406 connection_invalidated=is_disconnect,
2407 dialect=dialect,
2408 )
2409 else:
2410 sqlalchemy_exception = None
2411
2412 newraise = None
2413
2414 if dialect._has_events:
2415 ctx = ExceptionContextImpl(
2416 e,
2417 sqlalchemy_exception,
2418 engine,
2419 dialect,
2420 None,
2421 None,
2422 None,
2423 None,
2424 None,
2425 is_disconnect,
2426 invalidate_pool_on_disconnect,
2427 is_pre_ping,
2428 )
2429 for fn in dialect.dispatch.handle_error:
2430 try:
2431 # handler returns an exception;
2432 # call next handler in a chain
2433 per_fn = fn(ctx)
2434 if per_fn is not None:
2435 ctx.chained_exception = newraise = per_fn
2436 except Exception as _raised:
2437 # handler raises an exception - stop processing
2438 newraise = _raised
2439 break
2440
2441 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
2442 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect
2443
2444 if newraise:
2445 raise newraise.with_traceback(exc_info[2]) from e
2446 elif should_wrap:
2447 assert sqlalchemy_exception is not None
2448 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2449 else:
2450 assert exc_info[1] is not None
2451 raise exc_info[1].with_traceback(exc_info[2])
2452
2453 def _run_ddl_visitor(
2454 self,
2455 visitorcallable: Type[InvokeDDLBase],
2456 element: SchemaVisitable,
2457 **kwargs: Any,
2458 ) -> None:
2459 """run a DDL visitor.
2460
2461 This method is only here so that the MockConnection can change the
2462 options given to the visitor so that "checkfirst" is skipped.
2463
2464 """
2465 visitorcallable(
2466 dialect=self.dialect, connection=self, **kwargs
2467 ).traverse_single(element)
2468
2469
2470class ExceptionContextImpl(ExceptionContext):
2471 """Implement the :class:`.ExceptionContext` interface."""
2472
2473 __slots__ = (
2474 "connection",
2475 "engine",
2476 "dialect",
2477 "cursor",
2478 "statement",
2479 "parameters",
2480 "original_exception",
2481 "sqlalchemy_exception",
2482 "chained_exception",
2483 "execution_context",
2484 "is_disconnect",
2485 "invalidate_pool_on_disconnect",
2486 "is_pre_ping",
2487 )
2488
2489 def __init__(
2490 self,
2491 exception: BaseException,
2492 sqlalchemy_exception: Optional[exc.StatementError],
2493 engine: Optional[Engine],
2494 dialect: Dialect,
2495 connection: Optional[Connection],
2496 cursor: Optional[DBAPICursor],
2497 statement: Optional[str],
2498 parameters: Optional[_DBAPIAnyExecuteParams],
2499 context: Optional[ExecutionContext],
2500 is_disconnect: bool,
2501 invalidate_pool_on_disconnect: bool,
2502 is_pre_ping: bool,
2503 ):
2504 self.engine = engine
2505 self.dialect = dialect
2506 self.connection = connection
2507 self.sqlalchemy_exception = sqlalchemy_exception
2508 self.original_exception = exception
2509 self.execution_context = context
2510 self.statement = statement
2511 self.parameters = parameters
2512 self.is_disconnect = is_disconnect
2513 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
2514 self.is_pre_ping = is_pre_ping
2515
2516
2517class Transaction(TransactionalContext):
2518 """Represent a database transaction in progress.
2519
2520 The :class:`.Transaction` object is procured by
2521 calling the :meth:`_engine.Connection.begin` method of
2522 :class:`_engine.Connection`::
2523
2524 from sqlalchemy import create_engine
2525
2526 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
2527 connection = engine.connect()
2528 trans = connection.begin()
2529 connection.execute(text("insert into x (a, b) values (1, 2)"))
2530 trans.commit()
2531
2532 The object provides :meth:`.rollback` and :meth:`.commit`
2533 methods in order to control transaction boundaries. It
2534 also implements a context manager interface so that
2535 the Python ``with`` statement can be used with the
2536 :meth:`_engine.Connection.begin` method::
2537
2538 with connection.begin():
2539 connection.execute(text("insert into x (a, b) values (1, 2)"))
2540
2541 The Transaction object is **not** threadsafe.
2542
2543 .. seealso::
2544
2545 :meth:`_engine.Connection.begin`
2546
2547 :meth:`_engine.Connection.begin_twophase`
2548
2549 :meth:`_engine.Connection.begin_nested`
2550
2551 .. index::
2552 single: thread safety; Transaction
2553 """ # noqa
2554
2555 __slots__ = ()
2556
2557 _is_root: bool = False
2558 is_active: bool
2559 connection: Connection
2560
2561 def __init__(self, connection: Connection):
2562 raise NotImplementedError()
2563
2564 @property
2565 def _deactivated_from_connection(self) -> bool:
2566 """True if this transaction is totally deactivated from the connection
2567 and therefore can no longer affect its state.
2568
2569 """
2570 raise NotImplementedError()
2571
2572 def _do_close(self) -> None:
2573 raise NotImplementedError()
2574
2575 def _do_rollback(self) -> None:
2576 raise NotImplementedError()
2577
2578 def _do_commit(self) -> None:
2579 raise NotImplementedError()
2580
2581 @property
2582 def is_valid(self) -> bool:
2583 return self.is_active and not self.connection.invalidated
2584
2585 def close(self) -> None:
2586 """Close this :class:`.Transaction`.
2587
2588 If this transaction is the base transaction in a begin/commit
2589 nesting, the transaction will rollback(). Otherwise, the
2590 method returns.
2591
2592 This is used to cancel a Transaction without affecting the scope of
2593 an enclosing transaction.
2594
2595 """
2596 try:
2597 self._do_close()
2598 finally:
2599 assert not self.is_active
2600
2601 def rollback(self) -> None:
2602 """Roll back this :class:`.Transaction`.
2603
2604 The implementation of this may vary based on the type of transaction in
2605 use:
2606
2607 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2608 it corresponds to a ROLLBACK.
2609
2610 * For a :class:`.NestedTransaction`, it corresponds to a
2611 "ROLLBACK TO SAVEPOINT" operation.
2612
2613 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2614 phase transactions may be used.
2615
2616
2617 """
2618 try:
2619 self._do_rollback()
2620 finally:
2621 assert not self.is_active
2622
2623 def commit(self) -> None:
2624 """Commit this :class:`.Transaction`.
2625
2626 The implementation of this may vary based on the type of transaction in
2627 use:
2628
2629 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2630 it corresponds to a COMMIT.
2631
2632 * For a :class:`.NestedTransaction`, it corresponds to a
2633 "RELEASE SAVEPOINT" operation.
2634
2635 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2636 phase transactions may be used.
2637
2638 """
2639 try:
2640 self._do_commit()
2641 finally:
2642 assert not self.is_active
2643
2644 def _get_subject(self) -> Connection:
2645 return self.connection
2646
2647 def _transaction_is_active(self) -> bool:
2648 return self.is_active
2649
2650 def _transaction_is_closed(self) -> bool:
2651 return not self._deactivated_from_connection
2652
2653 def _rollback_can_be_called(self) -> bool:
2654 # for RootTransaction / NestedTransaction, it's safe to call
2655 # rollback() even if the transaction is deactive and no warnings
2656 # will be emitted. tested in
2657 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
2658 return True
2659
2660
2661class RootTransaction(Transaction):
2662 """Represent the "root" transaction on a :class:`_engine.Connection`.
2663
2664 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
2665 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
2666 is created by calling upon the :meth:`_engine.Connection.begin` method, and
2667 remains associated with the :class:`_engine.Connection` throughout its
2668 active span. The current :class:`_engine.RootTransaction` in use is
2669 accessible via the :attr:`_engine.Connection.get_transaction` method of
2670 :class:`_engine.Connection`.
2671
2672 In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
2673 "autobegin" behavior that will create a new
2674 :class:`_engine.RootTransaction` whenever a connection in a
2675 non-transactional state is used to emit commands on the DBAPI connection.
2676 The scope of the :class:`_engine.RootTransaction` in 2.0 style
2677 use can be controlled using the :meth:`_engine.Connection.commit` and
2678 :meth:`_engine.Connection.rollback` methods.
2679
2680
2681 """
2682
2683 _is_root = True
2684
2685 __slots__ = ("connection", "is_active")
2686
2687 def __init__(self, connection: Connection):
2688 assert connection._transaction is None
2689 if connection._trans_context_manager:
2690 TransactionalContext._trans_ctx_check(connection)
2691 self.connection = connection
2692 self._connection_begin_impl()
2693 connection._transaction = self
2694
2695 self.is_active = True
2696
2697 def _deactivate_from_connection(self) -> None:
2698 if self.is_active:
2699 assert self.connection._transaction is self
2700 self.is_active = False
2701
2702 elif self.connection._transaction is not self:
2703 util.warn("transaction already deassociated from connection")
2704
2705 @property
2706 def _deactivated_from_connection(self) -> bool:
2707 return self.connection._transaction is not self
2708
2709 def _connection_begin_impl(self) -> None:
2710 self.connection._begin_impl(self)
2711
2712 def _connection_rollback_impl(self) -> None:
2713 self.connection._rollback_impl()
2714
2715 def _connection_commit_impl(self) -> None:
2716 self.connection._commit_impl()
2717
2718 def _close_impl(self, try_deactivate: bool = False) -> None:
2719 try:
2720 if self.is_active:
2721 self._connection_rollback_impl()
2722
2723 if self.connection._nested_transaction:
2724 self.connection._nested_transaction._cancel()
2725 finally:
2726 if self.is_active or try_deactivate:
2727 self._deactivate_from_connection()
2728 if self.connection._transaction is self:
2729 self.connection._transaction = None
2730
2731 assert not self.is_active
2732 assert self.connection._transaction is not self
2733
2734 def _do_close(self) -> None:
2735 self._close_impl()
2736
2737 def _do_rollback(self) -> None:
2738 self._close_impl(try_deactivate=True)
2739
2740 def _do_commit(self) -> None:
2741 if self.is_active:
2742 assert self.connection._transaction is self
2743
2744 try:
2745 self._connection_commit_impl()
2746 finally:
2747 # whether or not commit succeeds, cancel any
2748 # nested transactions, make this transaction "inactive"
2749 # and remove it as a reset agent
2750 if self.connection._nested_transaction:
2751 self.connection._nested_transaction._cancel()
2752
2753 self._deactivate_from_connection()
2754
2755 # ...however only remove as the connection's current transaction
2756 # if commit succeeded. otherwise it stays on so that a rollback
2757 # needs to occur.
2758 self.connection._transaction = None
2759 else:
2760 if self.connection._transaction is self:
2761 self.connection._invalid_transaction()
2762 else:
2763 raise exc.InvalidRequestError("This transaction is inactive")
2764
2765 assert not self.is_active
2766 assert self.connection._transaction is not self
2767
2768
2769class NestedTransaction(Transaction):
2770 """Represent a 'nested', or SAVEPOINT transaction.
2771
2772 The :class:`.NestedTransaction` object is created by calling the
2773 :meth:`_engine.Connection.begin_nested` method of
2774 :class:`_engine.Connection`.
2775
2776 When using :class:`.NestedTransaction`, the semantics of "begin" /
2777 "commit" / "rollback" are as follows:
2778
2779 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
2780 the savepoint is given an explicit name that is part of the state
2781 of this object.
2782
2783 * The :meth:`.NestedTransaction.commit` method corresponds to a
2784 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
2785 with this :class:`.NestedTransaction`.
2786
2787 * The :meth:`.NestedTransaction.rollback` method corresponds to a
2788 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
2789 associated with this :class:`.NestedTransaction`.
2790
2791 The rationale for mimicking the semantics of an outer transaction in
2792 terms of savepoints so that code may deal with a "savepoint" transaction
2793 and an "outer" transaction in an agnostic way.
2794
2795 .. seealso::
2796
2797 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.
2798
2799 """
2800
2801 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
2802
2803 _savepoint: str
2804
2805 def __init__(self, connection: Connection):
2806 assert connection._transaction is not None
2807 if connection._trans_context_manager:
2808 TransactionalContext._trans_ctx_check(connection)
2809 self.connection = connection
2810 self._savepoint = self.connection._savepoint_impl()
2811 self.is_active = True
2812 self._previous_nested = connection._nested_transaction
2813 connection._nested_transaction = self
2814
2815 def _deactivate_from_connection(self, warn: bool = True) -> None:
2816 if self.connection._nested_transaction is self:
2817 self.connection._nested_transaction = self._previous_nested
2818 elif warn:
2819 util.warn(
2820 "nested transaction already deassociated from connection"
2821 )
2822
2823 @property
2824 def _deactivated_from_connection(self) -> bool:
2825 return self.connection._nested_transaction is not self
2826
2827 def _cancel(self) -> None:
2828 # called by RootTransaction when the outer transaction is
2829 # committed, rolled back, or closed to cancel all savepoints
2830 # without any action being taken
2831 self.is_active = False
2832 self._deactivate_from_connection()
2833 if self._previous_nested:
2834 self._previous_nested._cancel()
2835
2836 def _close_impl(
2837 self, deactivate_from_connection: bool, warn_already_deactive: bool
2838 ) -> None:
2839 try:
2840 if (
2841 self.is_active
2842 and self.connection._transaction
2843 and self.connection._transaction.is_active
2844 ):
2845 self.connection._rollback_to_savepoint_impl(self._savepoint)
2846 finally:
2847 self.is_active = False
2848
2849 if deactivate_from_connection:
2850 self._deactivate_from_connection(warn=warn_already_deactive)
2851
2852 assert not self.is_active
2853 if deactivate_from_connection:
2854 assert self.connection._nested_transaction is not self
2855
2856 def _do_close(self) -> None:
2857 self._close_impl(True, False)
2858
2859 def _do_rollback(self) -> None:
2860 self._close_impl(True, True)
2861
2862 def _do_commit(self) -> None:
2863 if self.is_active:
2864 try:
2865 self.connection._release_savepoint_impl(self._savepoint)
2866 finally:
2867 # nested trans becomes inactive on failed release
2868 # unconditionally. this prevents it from trying to
2869 # emit SQL when it rolls back.
2870 self.is_active = False
2871
2872 # but only de-associate from connection if it succeeded
2873 self._deactivate_from_connection()
2874 else:
2875 if self.connection._nested_transaction is self:
2876 self.connection._invalid_transaction()
2877 else:
2878 raise exc.InvalidRequestError(
2879 "This nested transaction is inactive"
2880 )
2881
2882
2883class TwoPhaseTransaction(RootTransaction):
2884 """Represent a two-phase transaction.
2885
2886 A new :class:`.TwoPhaseTransaction` object may be procured
2887 using the :meth:`_engine.Connection.begin_twophase` method.
2888
2889 The interface is the same as that of :class:`.Transaction`
2890 with the addition of the :meth:`prepare` method.
2891
2892 """
2893
2894 __slots__ = ("xid", "_is_prepared")
2895
2896 xid: Any
2897
2898 def __init__(self, connection: Connection, xid: Any):
2899 self._is_prepared = False
2900 self.xid = xid
2901 super().__init__(connection)
2902
2903 def prepare(self) -> None:
2904 """Prepare this :class:`.TwoPhaseTransaction`.
2905
2906 After a PREPARE, the transaction can be committed.
2907
2908 """
2909 if not self.is_active:
2910 raise exc.InvalidRequestError("This transaction is inactive")
2911 self.connection._prepare_twophase_impl(self.xid)
2912 self._is_prepared = True
2913
2914 def _connection_begin_impl(self) -> None:
2915 self.connection._begin_twophase_impl(self)
2916
2917 def _connection_rollback_impl(self) -> None:
2918 self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
2919
2920 def _connection_commit_impl(self) -> None:
2921 self.connection._commit_twophase_impl(self.xid, self._is_prepared)
2922
2923
2924class Engine(
2925 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
2926):
2927 """
2928 Connects a :class:`~sqlalchemy.pool.Pool` and
2929 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
2930 source of database connectivity and behavior.
2931
2932 An :class:`_engine.Engine` object is instantiated publicly using the
2933 :func:`~sqlalchemy.create_engine` function.
2934
2935 .. seealso::
2936
2937 :doc:`/core/engines`
2938
2939 :ref:`connections_toplevel`
2940
2941 """
2942
2943 dispatch: dispatcher[ConnectionEventsTarget]
2944
2945 _compiled_cache: Optional[CompiledCacheType]
2946
2947 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
2948 _has_events: bool = False
2949 _connection_cls: Type[Connection] = Connection
2950 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
2951 _is_future: bool = False
2952
2953 _schema_translate_map: Optional[SchemaTranslateMapType] = None
2954 _option_cls: Type[OptionEngine]
2955
2956 dialect: Dialect
2957 pool: Pool
2958 url: URL
2959 hide_parameters: bool
2960
2961 def __init__(
2962 self,
2963 pool: Pool,
2964 dialect: Dialect,
2965 url: URL,
2966 logging_name: Optional[str] = None,
2967 echo: Optional[_EchoFlagType] = None,
2968 query_cache_size: int = 500,
2969 execution_options: Optional[Mapping[str, Any]] = None,
2970 hide_parameters: bool = False,
2971 ):
2972 self.pool = pool
2973 self.url = url
2974 self.dialect = dialect
2975 if logging_name:
2976 self.logging_name = logging_name
2977 self.echo = echo
2978 self.hide_parameters = hide_parameters
2979 if query_cache_size != 0:
2980 self._compiled_cache = util.LRUCache(
2981 query_cache_size, size_alert=self._lru_size_alert
2982 )
2983 else:
2984 self._compiled_cache = None
2985 log.instance_logger(self, echoflag=echo)
2986 if execution_options:
2987 self.update_execution_options(**execution_options)
2988
2989 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
2990 if self._should_log_info():
2991 self.logger.info(
2992 "Compiled cache size pruning from %d items to %d. "
2993 "Increase cache size to reduce the frequency of pruning.",
2994 len(cache),
2995 cache.capacity,
2996 )
2997
2998 @property
2999 def engine(self) -> Engine:
3000 """Returns this :class:`.Engine`.
3001
3002 Used for legacy schemes that accept :class:`.Connection` /
3003 :class:`.Engine` objects within the same variable.
3004
3005 """
3006 return self
3007
3008 def clear_compiled_cache(self) -> None:
3009 """Clear the compiled cache associated with the dialect.
3010
3011 This applies **only** to the built-in cache that is established
3012 via the :paramref:`_engine.create_engine.query_cache_size` parameter.
3013 It will not impact any dictionary caches that were passed via the
3014 :paramref:`.Connection.execution_options.compiled_cache` parameter.
3015
3016 .. versionadded:: 1.4
3017
3018 """
3019 if self._compiled_cache:
3020 self._compiled_cache.clear()
3021
3022 def update_execution_options(self, **opt: Any) -> None:
3023 r"""Update the default execution_options dictionary
3024 of this :class:`_engine.Engine`.
3025
3026 The given keys/values in \**opt are added to the
3027 default execution options that will be used for
3028 all connections. The initial contents of this dictionary
3029 can be sent via the ``execution_options`` parameter
3030 to :func:`_sa.create_engine`.
3031
3032 .. seealso::
3033
3034 :meth:`_engine.Connection.execution_options`
3035
3036 :meth:`_engine.Engine.execution_options`
3037
3038 """
3039 self.dispatch.set_engine_execution_options(self, opt)
3040 self._execution_options = self._execution_options.union(opt)
3041 self.dialect.set_engine_execution_options(self, opt)
3042
3043 @overload
3044 def execution_options(
3045 self,
3046 *,
3047 compiled_cache: Optional[CompiledCacheType] = ...,
3048 logging_token: str = ...,
3049 isolation_level: IsolationLevel = ...,
3050 insertmanyvalues_page_size: int = ...,
3051 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
3052 **opt: Any,
3053 ) -> OptionEngine: ...
3054
3055 @overload
3056 def execution_options(self, **opt: Any) -> OptionEngine: ...
3057
3058 def execution_options(self, **opt: Any) -> OptionEngine:
3059 """Return a new :class:`_engine.Engine` that will provide
3060 :class:`_engine.Connection` objects with the given execution options.
3061
3062 The returned :class:`_engine.Engine` remains related to the original
3063 :class:`_engine.Engine` in that it shares the same connection pool and
3064 other state:
3065
3066 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
3067 is the
3068 same instance. The :meth:`_engine.Engine.dispose`
3069 method will replace
3070 the connection pool instance for the parent engine as well
3071 as this one.
3072 * Event listeners are "cascaded" - meaning, the new
3073 :class:`_engine.Engine`
3074 inherits the events of the parent, and new events can be associated
3075 with the new :class:`_engine.Engine` individually.
3076 * The logging configuration and logging_name is copied from the parent
3077 :class:`_engine.Engine`.
3078
3079 The intent of the :meth:`_engine.Engine.execution_options` method is
3080 to implement schemes where multiple :class:`_engine.Engine`
3081 objects refer to the same connection pool, but are differentiated
3082 by options that affect some execution-level behavior for each
3083 engine. One such example is breaking into separate "reader" and
3084 "writer" :class:`_engine.Engine` instances, where one
3085 :class:`_engine.Engine`
3086 has a lower :term:`isolation level` setting configured or is even
3087 transaction-disabled using "autocommit". An example of this
3088 configuration is at :ref:`dbapi_autocommit_multiple`.
3089
3090 Another example is one that
3091 uses a custom option ``shard_id`` which is consumed by an event
3092 to change the current schema on a database connection::
3093
3094 from sqlalchemy import event
3095 from sqlalchemy.engine import Engine
3096
3097 primary_engine = create_engine("mysql+mysqldb://")
3098 shard1 = primary_engine.execution_options(shard_id="shard1")
3099 shard2 = primary_engine.execution_options(shard_id="shard2")
3100
3101 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
3102
3103
3104 @event.listens_for(Engine, "before_cursor_execute")
3105 def _switch_shard(conn, cursor, stmt, params, context, executemany):
3106 shard_id = conn.get_execution_options().get("shard_id", "default")
3107 current_shard = conn.info.get("current_shard", None)
3108
3109 if current_shard != shard_id:
3110 cursor.execute("use %s" % shards[shard_id])
3111 conn.info["current_shard"] = shard_id
3112
3113 The above recipe illustrates two :class:`_engine.Engine` objects that
3114 will each serve as factories for :class:`_engine.Connection` objects
3115 that have pre-established "shard_id" execution options present. A
3116 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
3117 then interprets this execution option to emit a MySQL ``use`` statement
3118 to switch databases before a statement execution, while at the same
3119 time keeping track of which database we've established using the
3120 :attr:`_engine.Connection.info` dictionary.
3121
3122 .. seealso::
3123
3124 :meth:`_engine.Connection.execution_options`
3125 - update execution options
3126 on a :class:`_engine.Connection` object.
3127
3128 :meth:`_engine.Engine.update_execution_options`
3129 - update the execution
3130 options for a given :class:`_engine.Engine` in place.
3131
3132 :meth:`_engine.Engine.get_execution_options`
3133
3134
3135 """ # noqa: E501
3136 return self._option_cls(self, opt)
3137
3138 def get_execution_options(self) -> _ExecuteOptions:
3139 """Get the non-SQL options which will take effect during execution.
3140
3141 .. versionadded: 1.3
3142
3143 .. seealso::
3144
3145 :meth:`_engine.Engine.execution_options`
3146 """
3147 return self._execution_options
3148
3149 @property
3150 def name(self) -> str:
3151 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3152 in use by this :class:`Engine`.
3153
3154 """
3155
3156 return self.dialect.name
3157
3158 @property
3159 def driver(self) -> str:
3160 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3161 in use by this :class:`Engine`.
3162
3163 """
3164
3165 return self.dialect.driver
3166
3167 echo = log.echo_property()
3168
3169 def __repr__(self) -> str:
3170 return "Engine(%r)" % (self.url,)
3171
3172 def dispose(self, close: bool = True) -> None:
3173 """Dispose of the connection pool used by this
3174 :class:`_engine.Engine`.
3175
3176 A new connection pool is created immediately after the old one has been
3177 disposed. The previous connection pool is disposed either actively, by
3178 closing out all currently checked-in connections in that pool, or
3179 passively, by losing references to it but otherwise not closing any
3180 connections. The latter strategy is more appropriate for an initializer
3181 in a forked Python process.
3182
3183 :param close: if left at its default of ``True``, has the
3184 effect of fully closing all **currently checked in**
3185 database connections. Connections that are still checked out
3186 will **not** be closed, however they will no longer be associated
3187 with this :class:`_engine.Engine`,
3188 so when they are closed individually, eventually the
3189 :class:`_pool.Pool` which they are associated with will
3190 be garbage collected and they will be closed out fully, if
3191 not already closed on checkin.
3192
3193 If set to ``False``, the previous connection pool is de-referenced,
3194 and otherwise not touched in any way.
3195
3196 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
3197 parameter to allow the replacement of a connection pool in a child
3198 process without interfering with the connections used by the parent
3199 process.
3200
3201
3202 .. seealso::
3203
3204 :ref:`engine_disposal`
3205
3206 :ref:`pooling_multiprocessing`
3207
3208 """
3209 if close:
3210 self.pool.dispose()
3211 self.pool = self.pool.recreate()
3212 self.dispatch.engine_disposed(self)
3213
3214 @contextlib.contextmanager
3215 def _optional_conn_ctx_manager(
3216 self, connection: Optional[Connection] = None
3217 ) -> Iterator[Connection]:
3218 if connection is None:
3219 with self.connect() as conn:
3220 yield conn
3221 else:
3222 yield connection
3223
3224 @contextlib.contextmanager
3225 def begin(self) -> Iterator[Connection]:
3226 """Return a context manager delivering a :class:`_engine.Connection`
3227 with a :class:`.Transaction` established.
3228
3229 E.g.::
3230
3231 with engine.begin() as conn:
3232 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
3233 conn.execute(text("my_special_procedure(5)"))
3234
3235 Upon successful operation, the :class:`.Transaction`
3236 is committed. If an error is raised, the :class:`.Transaction`
3237 is rolled back.
3238
3239 .. seealso::
3240
3241 :meth:`_engine.Engine.connect` - procure a
3242 :class:`_engine.Connection` from
3243 an :class:`_engine.Engine`.
3244
3245 :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
3246 for a particular :class:`_engine.Connection`.
3247
3248 """ # noqa: E501
3249 with self.connect() as conn:
3250 with conn.begin():
3251 yield conn
3252
3253 def _run_ddl_visitor(
3254 self,
3255 visitorcallable: Type[InvokeDDLBase],
3256 element: SchemaVisitable,
3257 **kwargs: Any,
3258 ) -> None:
3259 with self.begin() as conn:
3260 conn._run_ddl_visitor(visitorcallable, element, **kwargs)
3261
3262 def connect(self) -> Connection:
3263 """Return a new :class:`_engine.Connection` object.
3264
3265 The :class:`_engine.Connection` acts as a Python context manager, so
3266 the typical use of this method looks like::
3267
3268 with engine.connect() as connection:
3269 connection.execute(text("insert into table values ('foo')"))
3270 connection.commit()
3271
3272 Where above, after the block is completed, the connection is "closed"
3273 and its underlying DBAPI resources are returned to the connection pool.
3274 This also has the effect of rolling back any transaction that
3275 was explicitly begun or was begun via autobegin, and will
3276 emit the :meth:`_events.ConnectionEvents.rollback` event if one was
3277 started and is still in progress.
3278
3279 .. seealso::
3280
3281 :meth:`_engine.Engine.begin`
3282
3283 """
3284
3285 return self._connection_cls(self)
3286
3287 def raw_connection(self) -> PoolProxiedConnection:
3288 """Return a "raw" DBAPI connection from the connection pool.
3289
3290 The returned object is a proxied version of the DBAPI
3291 connection object used by the underlying driver in use.
3292 The object will have all the same behavior as the real DBAPI
3293 connection, except that its ``close()`` method will result in the
3294 connection being returned to the pool, rather than being closed
3295 for real.
3296
3297 This method provides direct DBAPI connection access for
3298 special situations when the API provided by
3299 :class:`_engine.Connection`
3300 is not needed. When a :class:`_engine.Connection` object is already
3301 present, the DBAPI connection is available using
3302 the :attr:`_engine.Connection.connection` accessor.
3303
3304 .. seealso::
3305
3306 :ref:`dbapi_connections`
3307
3308 """
3309 return self.pool.connect()
3310
3311
3312class OptionEngineMixin(log.Identified):
3313 _sa_propagate_class_events = False
3314
3315 dispatch: dispatcher[ConnectionEventsTarget]
3316 _compiled_cache: Optional[CompiledCacheType]
3317 dialect: Dialect
3318 pool: Pool
3319 url: URL
3320 hide_parameters: bool
3321 echo: log.echo_property
3322
3323 def __init__(
3324 self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
3325 ):
3326 self._proxied = proxied
3327 self.url = proxied.url
3328 self.dialect = proxied.dialect
3329 self.logging_name = proxied.logging_name
3330 self.echo = proxied.echo
3331 self._compiled_cache = proxied._compiled_cache
3332 self.hide_parameters = proxied.hide_parameters
3333 log.instance_logger(self, echoflag=self.echo)
3334
3335 # note: this will propagate events that are assigned to the parent
3336 # engine after this OptionEngine is created. Since we share
3337 # the events of the parent we also disallow class-level events
3338 # to apply to the OptionEngine class directly.
3339 #
3340 # the other way this can work would be to transfer existing
3341 # events only, using:
3342 # self.dispatch._update(proxied.dispatch)
3343 #
3344 # that might be more appropriate however it would be a behavioral
3345 # change for logic that assigns events to the parent engine and
3346 # would like it to take effect for the already-created sub-engine.
3347 self.dispatch = self.dispatch._join(proxied.dispatch)
3348
3349 self._execution_options = proxied._execution_options
3350 self.update_execution_options(**execution_options)
3351
3352 def update_execution_options(self, **opt: Any) -> None:
3353 raise NotImplementedError()
3354
3355 if not typing.TYPE_CHECKING:
3356 # https://github.com/python/typing/discussions/1095
3357
3358 @property
3359 def pool(self) -> Pool:
3360 return self._proxied.pool
3361
3362 @pool.setter
3363 def pool(self, pool: Pool) -> None:
3364 self._proxied.pool = pool
3365
3366 @property
3367 def _has_events(self) -> bool:
3368 return self._proxied._has_events or self.__dict__.get(
3369 "_has_events", False
3370 )
3371
3372 @_has_events.setter
3373 def _has_events(self, value: bool) -> None:
3374 self.__dict__["_has_events"] = value
3375
3376
3377class OptionEngine(OptionEngineMixin, Engine):
3378 def update_execution_options(self, **opt: Any) -> None:
3379 Engine.update_execution_options(self, **opt)
3380
3381
3382Engine._option_cls = OptionEngine