1# engine/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.
8
9"""
10from __future__ import annotations
11
12import contextlib
13import sys
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Iterable
19from typing import Iterator
20from typing import List
21from typing import Mapping
22from typing import NoReturn
23from typing import Optional
24from typing import overload
25from typing import Tuple
26from typing import Type
27from typing import TypeVar
28from typing import Union
29
30from .interfaces import BindTyping
31from .interfaces import ConnectionEventsTarget
32from .interfaces import DBAPICursor
33from .interfaces import ExceptionContext
34from .interfaces import ExecuteStyle
35from .interfaces import ExecutionContext
36from .interfaces import IsolationLevel
37from .util import _distill_params_20
38from .util import _distill_raw_params
39from .util import TransactionalContext
40from .. import exc
41from .. import inspection
42from .. import log
43from .. import util
44from ..sql import compiler
45from ..sql import util as sql_util
46from ..util.typing import TupleAny
47from ..util.typing import TypeVarTuple
48from ..util.typing import Unpack
49
50if typing.TYPE_CHECKING:
51 from . import CursorResult
52 from . import ScalarResult
53 from .interfaces import _AnyExecuteParams
54 from .interfaces import _AnyMultiExecuteParams
55 from .interfaces import _CoreAnyExecuteParams
56 from .interfaces import _CoreMultiExecuteParams
57 from .interfaces import _CoreSingleExecuteParams
58 from .interfaces import _DBAPIAnyExecuteParams
59 from .interfaces import _DBAPISingleExecuteParams
60 from .interfaces import _ExecuteOptions
61 from .interfaces import CompiledCacheType
62 from .interfaces import CoreExecuteOptionsParameter
63 from .interfaces import Dialect
64 from .interfaces import SchemaTranslateMapType
65 from .reflection import Inspector # noqa
66 from .url import URL
67 from ..event import dispatcher
68 from ..log import _EchoFlagType
69 from ..pool import _ConnectionFairy
70 from ..pool import Pool
71 from ..pool import PoolProxiedConnection
72 from ..sql import Executable
73 from ..sql._typing import _InfoType
74 from ..sql.compiler import Compiled
75 from ..sql.ddl import ExecutableDDLElement
76 from ..sql.ddl import SchemaDropper
77 from ..sql.ddl import SchemaGenerator
78 from ..sql.functions import FunctionElement
79 from ..sql.schema import DefaultGenerator
80 from ..sql.schema import HasSchemaAttr
81 from ..sql.schema import SchemaItem
82 from ..sql.selectable import TypedReturnsRows
83
84
85_T = TypeVar("_T", bound=Any)
86_Ts = TypeVarTuple("_Ts")
87_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
88NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
89
90
91class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
92 """Provides high-level functionality for a wrapped DB-API connection.
93
94 The :class:`_engine.Connection` object is procured by calling the
95 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
96 object, and provides services for execution of SQL statements as well
97 as transaction control.
98
99 The Connection object is **not** thread-safe. While a Connection can be
100 shared among threads using properly synchronized access, it is still
101 possible that the underlying DBAPI connection may not support shared
102 access between threads. Check the DBAPI documentation for details.
103
104 The Connection object represents a single DBAPI connection checked out
105 from the connection pool. In this state, the connection pool has no
106 affect upon the connection, including its expiration or timeout state.
107 For the connection pool to properly manage connections, connections
108 should be returned to the connection pool (i.e. ``connection.close()``)
109 whenever the connection is not in use.
110
111 .. index::
112 single: thread safety; Connection
113
114 """
115
116 dialect: Dialect
117 dispatch: dispatcher[ConnectionEventsTarget]
118
119 _sqla_logger_namespace = "sqlalchemy.engine.Connection"
120
121 # used by sqlalchemy.engine.util.TransactionalContext
122 _trans_context_manager: Optional[TransactionalContext] = None
123
124 # legacy as of 2.0, should be eventually deprecated and
125 # removed. was used in the "pre_ping" recipe that's been in the docs
126 # a long time
127 should_close_with_result = False
128
129 _dbapi_connection: Optional[PoolProxiedConnection]
130
131 _execution_options: _ExecuteOptions
132
133 _transaction: Optional[RootTransaction]
134 _nested_transaction: Optional[NestedTransaction]
135
136 def __init__(
137 self,
138 engine: Engine,
139 connection: Optional[PoolProxiedConnection] = None,
140 _has_events: Optional[bool] = None,
141 _allow_revalidate: bool = True,
142 _allow_autobegin: bool = True,
143 ):
144 """Construct a new Connection."""
145 self.engine = engine
146 self.dialect = dialect = engine.dialect
147
148 if connection is None:
149 try:
150 self._dbapi_connection = engine.raw_connection()
151 except dialect.loaded_dbapi.Error as err:
152 Connection._handle_dbapi_exception_noconnection(
153 err, dialect, engine
154 )
155 raise
156 else:
157 self._dbapi_connection = connection
158
159 self._transaction = self._nested_transaction = None
160 self.__savepoint_seq = 0
161 self.__in_begin = False
162
163 self.__can_reconnect = _allow_revalidate
164 self._allow_autobegin = _allow_autobegin
165 self._echo = self.engine._should_log_info()
166
167 if _has_events is None:
168 # if _has_events is sent explicitly as False,
169 # then don't join the dispatch of the engine; we don't
170 # want to handle any of the engine's events in that case.
171 self.dispatch = self.dispatch._join(engine.dispatch)
172 self._has_events = _has_events or (
173 _has_events is None and engine._has_events
174 )
175
176 self._execution_options = engine._execution_options
177
178 if self._has_events or self.engine._has_events:
179 self.dispatch.engine_connect(self)
180
181 # this can be assigned differently via
182 # characteristics.LoggingTokenCharacteristic
183 _message_formatter: Any = None
184
185 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
186 fmt = self._message_formatter
187
188 if fmt:
189 message = fmt(message)
190
191 if log.STACKLEVEL:
192 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
193
194 self.engine.logger.info(message, *arg, **kw)
195
196 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
197 fmt = self._message_formatter
198
199 if fmt:
200 message = fmt(message)
201
202 if log.STACKLEVEL:
203 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
204
205 self.engine.logger.debug(message, *arg, **kw)
206
207 @property
208 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
209 schema_translate_map: Optional[SchemaTranslateMapType] = (
210 self._execution_options.get("schema_translate_map", None)
211 )
212
213 return schema_translate_map
214
215 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
216 """Return the schema name for the given schema item taking into
217 account current schema translate map.
218
219 """
220
221 name = obj.schema
222 schema_translate_map: Optional[SchemaTranslateMapType] = (
223 self._execution_options.get("schema_translate_map", None)
224 )
225
226 if (
227 schema_translate_map
228 and name in schema_translate_map
229 and obj._use_schema_map
230 ):
231 return schema_translate_map[name]
232 else:
233 return name
234
235 def __enter__(self) -> Connection:
236 return self
237
238 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
239 self.close()
240
241 @overload
242 def execution_options(
243 self,
244 *,
245 compiled_cache: Optional[CompiledCacheType] = ...,
246 logging_token: str = ...,
247 isolation_level: IsolationLevel = ...,
248 no_parameters: bool = False,
249 stream_results: bool = False,
250 max_row_buffer: int = ...,
251 yield_per: int = ...,
252 insertmanyvalues_page_size: int = ...,
253 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
254 preserve_rowcount: bool = False,
255 driver_column_names: bool = False,
256 **opt: Any,
257 ) -> Connection: ...
258
259 @overload
260 def execution_options(self, **opt: Any) -> Connection: ...
261
262 def execution_options(self, **opt: Any) -> Connection:
263 r"""Set non-SQL options for the connection which take effect
264 during execution.
265
266 This method modifies this :class:`_engine.Connection` **in-place**;
267 the return value is the same :class:`_engine.Connection` object
268 upon which the method is called. Note that this is in contrast
269 to the behavior of the ``execution_options`` methods on other
270 objects such as :meth:`_engine.Engine.execution_options` and
271 :meth:`_sql.Executable.execution_options`. The rationale is that many
272 such execution options necessarily modify the state of the base
273 DBAPI connection in any case so there is no feasible means of
274 keeping the effect of such an option localized to a "sub" connection.
275
276 .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options`
277 method, in contrast to other objects with this method, modifies
278 the connection in-place without creating copy of it.
279
280 As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
281 method accepts any arbitrary parameters including user defined names.
282 All parameters given are consumable in a number of ways including
283 by using the :meth:`_engine.Connection.get_execution_options` method.
284 See the examples at :meth:`_sql.Executable.execution_options`
285 and :meth:`_engine.Engine.execution_options`.
286
287 The keywords that are currently recognized by SQLAlchemy itself
288 include all those listed under :meth:`.Executable.execution_options`,
289 as well as others that are specific to :class:`_engine.Connection`.
290
291 :param compiled_cache: Available on: :class:`_engine.Connection`,
292 :class:`_engine.Engine`.
293
294 A dictionary where :class:`.Compiled` objects
295 will be cached when the :class:`_engine.Connection`
296 compiles a clause
297 expression into a :class:`.Compiled` object. This dictionary will
298 supersede the statement cache that may be configured on the
299 :class:`_engine.Engine` itself. If set to None, caching
300 is disabled, even if the engine has a configured cache size.
301
302 Note that the ORM makes use of its own "compiled" caches for
303 some operations, including flush operations. The caching
304 used by the ORM internally supersedes a cache dictionary
305 specified here.
306
307 :param logging_token: Available on: :class:`_engine.Connection`,
308 :class:`_engine.Engine`, :class:`_sql.Executable`.
309
310 Adds the specified string token surrounded by brackets in log
311 messages logged by the connection, i.e. the logging that's enabled
312 either via the :paramref:`_sa.create_engine.echo` flag or via the
313 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
314 per-connection or per-sub-engine token to be available which is
315 useful for debugging concurrent connection scenarios.
316
317 .. versionadded:: 1.4.0b2
318
319 .. seealso::
320
321 :ref:`dbengine_logging_tokens` - usage example
322
323 :paramref:`_sa.create_engine.logging_name` - adds a name to the
324 name used by the Python logger object itself.
325
326 :param isolation_level: Available on: :class:`_engine.Connection`,
327 :class:`_engine.Engine`.
328
329 Set the transaction isolation level for the lifespan of this
330 :class:`_engine.Connection` object.
331 Valid values include those string
332 values accepted by the :paramref:`_sa.create_engine.isolation_level`
333 parameter passed to :func:`_sa.create_engine`. These levels are
334 semi-database specific; see individual dialect documentation for
335 valid levels.
336
337 The isolation level option applies the isolation level by emitting
338 statements on the DBAPI connection, and **necessarily affects the
339 original Connection object overall**. The isolation level will remain
340 at the given setting until explicitly changed, or when the DBAPI
341 connection itself is :term:`released` to the connection pool, i.e. the
342 :meth:`_engine.Connection.close` method is called, at which time an
343 event handler will emit additional statements on the DBAPI connection
344 in order to revert the isolation level change.
345
346 .. note:: The ``isolation_level`` execution option may only be
347 established before the :meth:`_engine.Connection.begin` method is
348 called, as well as before any SQL statements are emitted which
349 would otherwise trigger "autobegin", or directly after a call to
350 :meth:`_engine.Connection.commit` or
351 :meth:`_engine.Connection.rollback`. A database cannot change the
352 isolation level on a transaction in progress.
353
354 .. note:: The ``isolation_level`` execution option is implicitly
355 reset if the :class:`_engine.Connection` is invalidated, e.g. via
356 the :meth:`_engine.Connection.invalidate` method, or if a
357 disconnection error occurs. The new connection produced after the
358 invalidation will **not** have the selected isolation level
359 re-applied to it automatically.
360
361 .. seealso::
362
363 :ref:`dbapi_autocommit`
364
365 :meth:`_engine.Connection.get_isolation_level`
366 - view current actual level
367
368 :param no_parameters: Available on: :class:`_engine.Connection`,
369 :class:`_sql.Executable`.
370
371 When ``True``, if the final parameter
372 list or dictionary is totally empty, will invoke the
373 statement on the cursor as ``cursor.execute(statement)``,
374 not passing the parameter collection at all.
375 Some DBAPIs such as psycopg2 and mysql-python consider
376 percent signs as significant only when parameters are
377 present; this option allows code to generate SQL
378 containing percent signs (and possibly other characters)
379 that is neutral regarding whether it's executed by the DBAPI
380 or piped into a script that's later invoked by
381 command line tools.
382
383 :param stream_results: Available on: :class:`_engine.Connection`,
384 :class:`_sql.Executable`.
385
386 Indicate to the dialect that results should be
387 "streamed" and not pre-buffered, if possible. For backends
388 such as PostgreSQL, MySQL and MariaDB, this indicates the use of
389 a "server side cursor" as opposed to a client side cursor.
390 Other backends such as that of Oracle may already use server
391 side cursors by default.
392
393 The usage of
394 :paramref:`_engine.Connection.execution_options.stream_results` is
395 usually combined with setting a fixed number of rows to to be fetched
396 in batches, to allow for efficient iteration of database rows while
397 at the same time not loading all result rows into memory at once;
398 this can be configured on a :class:`_engine.Result` object using the
399 :meth:`_engine.Result.yield_per` method, after execution has
400 returned a new :class:`_engine.Result`. If
401 :meth:`_engine.Result.yield_per` is not used,
402 the :paramref:`_engine.Connection.execution_options.stream_results`
403 mode of operation will instead use a dynamically sized buffer
404 which buffers sets of rows at a time, growing on each batch
405 based on a fixed growth size up until a limit which may
406 be configured using the
407 :paramref:`_engine.Connection.execution_options.max_row_buffer`
408 parameter.
409
410 When using the ORM to fetch ORM mapped objects from a result,
411 :meth:`_engine.Result.yield_per` should always be used with
412 :paramref:`_engine.Connection.execution_options.stream_results`,
413 so that the ORM does not fetch all rows into new ORM objects at once.
414
415 For typical use, the
416 :paramref:`_engine.Connection.execution_options.yield_per` execution
417 option should be preferred, which sets up both
418 :paramref:`_engine.Connection.execution_options.stream_results` and
419 :meth:`_engine.Result.yield_per` at once. This option is supported
420 both at a core level by :class:`_engine.Connection` as well as by the
421 ORM :class:`_engine.Session`; the latter is described at
422 :ref:`orm_queryguide_yield_per`.
423
424 .. seealso::
425
426 :ref:`engine_stream_results` - background on
427 :paramref:`_engine.Connection.execution_options.stream_results`
428
429 :paramref:`_engine.Connection.execution_options.max_row_buffer`
430
431 :paramref:`_engine.Connection.execution_options.yield_per`
432
433 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
434 describing the ORM version of ``yield_per``
435
436 :param max_row_buffer: Available on: :class:`_engine.Connection`,
437 :class:`_sql.Executable`. Sets a maximum
438 buffer size to use when the
439 :paramref:`_engine.Connection.execution_options.stream_results`
440 execution option is used on a backend that supports server side
441 cursors. The default value if not specified is 1000.
442
443 .. seealso::
444
445 :paramref:`_engine.Connection.execution_options.stream_results`
446
447 :ref:`engine_stream_results`
448
449
450 :param yield_per: Available on: :class:`_engine.Connection`,
451 :class:`_sql.Executable`. Integer value applied which will
452 set the :paramref:`_engine.Connection.execution_options.stream_results`
453 execution option and invoke :meth:`_engine.Result.yield_per`
454 automatically at once. Allows equivalent functionality as
455 is present when using this parameter with the ORM.
456
457 .. versionadded:: 1.4.40
458
459 .. seealso::
460
461 :ref:`engine_stream_results` - background and examples
462 on using server side cursors with Core.
463
464 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
465 describing the ORM version of ``yield_per``
466
467 :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
468 :class:`_engine.Engine`. Number of rows to format into an
469 INSERT statement when the statement uses "insertmanyvalues" mode,
470 which is a paged form of bulk insert that is used for many backends
471 when using :term:`executemany` execution typically in conjunction
472 with RETURNING. Defaults to 1000. May also be modified on a
473 per-engine basis using the
474 :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.
475
476 .. versionadded:: 2.0
477
478 .. seealso::
479
480 :ref:`engine_insertmanyvalues`
481
482 :param schema_translate_map: Available on: :class:`_engine.Connection`,
483 :class:`_engine.Engine`, :class:`_sql.Executable`.
484
485 A dictionary mapping schema names to schema names, that will be
486 applied to the :paramref:`_schema.Table.schema` element of each
487 :class:`_schema.Table`
488 encountered when SQL or DDL expression elements
489 are compiled into strings; the resulting schema name will be
490 converted based on presence in the map of the original name.
491
492 .. seealso::
493
494 :ref:`schema_translating`
495
496 :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
497 attribute will be unconditionally memoized within the result and
498 made available via the :attr:`.CursorResult.rowcount` attribute.
499 Normally, this attribute is only preserved for UPDATE and DELETE
500 statements. Using this option, the DBAPIs rowcount value can
501 be accessed for other kinds of statements such as INSERT and SELECT,
502 to the degree that the DBAPI supports these statements. See
503 :attr:`.CursorResult.rowcount` for notes regarding the behavior
504 of this attribute.
505
506 .. versionadded:: 2.0.28
507
508 .. seealso::
509
510 :meth:`_engine.Engine.execution_options`
511
512 :meth:`.Executable.execution_options`
513
514 :meth:`_engine.Connection.get_execution_options`
515
516 :ref:`orm_queryguide_execution_options` - documentation on all
517 ORM-specific execution options
518
519 :param driver_column_names: When True, the returned
520 :class:`_engine.CursorResult` will use the column names as written in
521 ``cursor.description`` to set up the keys for the result set,
522 including the names of columns for the :class:`_engine.Row` object as
523 well as the dictionary keys when using :attr:`_engine.Row._mapping`.
524 On backends that use "name normalization" such as Oracle to correct
525 for lower case names being converted to all uppercase, this behavior
526 is turned off and the raw UPPERCASE names in cursor.description will
527 be present.
528
529 .. versionadded:: 2.1
530
531 """ # noqa
532 if self._has_events or self.engine._has_events:
533 self.dispatch.set_connection_execution_options(self, opt)
534 self._execution_options = self._execution_options.union(opt)
535 self.dialect.set_connection_execution_options(self, opt)
536 return self
537
538 def get_execution_options(self) -> _ExecuteOptions:
539 """Get the non-SQL options which will take effect during execution.
540
541 .. versionadded:: 1.3
542
543 .. seealso::
544
545 :meth:`_engine.Connection.execution_options`
546 """
547 return self._execution_options
548
549 @property
550 def _still_open_and_dbapi_connection_is_valid(self) -> bool:
551 pool_proxied_connection = self._dbapi_connection
552 return (
553 pool_proxied_connection is not None
554 and pool_proxied_connection.is_valid
555 )
556
557 @property
558 def closed(self) -> bool:
559 """Return True if this connection is closed."""
560
561 return self._dbapi_connection is None and not self.__can_reconnect
562
563 @property
564 def invalidated(self) -> bool:
565 """Return True if this connection was invalidated.
566
567 This does not indicate whether or not the connection was
568 invalidated at the pool level, however
569
570 """
571
572 # prior to 1.4, "invalid" was stored as a state independent of
573 # "closed", meaning an invalidated connection could be "closed",
574 # the _dbapi_connection would be None and closed=True, yet the
575 # "invalid" flag would stay True. This meant that there were
576 # three separate states (open/valid, closed/valid, closed/invalid)
577 # when there is really no reason for that; a connection that's
578 # "closed" does not need to be "invalid". So the state is now
579 # represented by the two facts alone.
580
581 pool_proxied_connection = self._dbapi_connection
582 return pool_proxied_connection is None and self.__can_reconnect
583
584 @property
585 def connection(self) -> PoolProxiedConnection:
586 """The underlying DB-API connection managed by this Connection.
587
588 This is a SQLAlchemy connection-pool proxied connection
589 which then has the attribute
590 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
591 actual driver connection.
592
593 .. seealso::
594
595
596 :ref:`dbapi_connections`
597
598 """
599
600 if self._dbapi_connection is None:
601 try:
602 return self._revalidate_connection()
603 except (exc.PendingRollbackError, exc.ResourceClosedError):
604 raise
605 except BaseException as e:
606 self._handle_dbapi_exception(e, None, None, None, None)
607 else:
608 return self._dbapi_connection
609
610 def get_isolation_level(self) -> IsolationLevel:
611 """Return the current **actual** isolation level that's present on
612 the database within the scope of this connection.
613
614 This attribute will perform a live SQL operation against the database
615 in order to procure the current isolation level, so the value returned
616 is the actual level on the underlying DBAPI connection regardless of
617 how this state was set. This will be one of the four actual isolation
618 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
619 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
620 level setting. Third party dialects may also feature additional
621 isolation level settings.
622
623 .. note:: This method **will not report** on the ``AUTOCOMMIT``
624 isolation level, which is a separate :term:`dbapi` setting that's
625 independent of **actual** isolation level. When ``AUTOCOMMIT`` is
626 in use, the database connection still has a "traditional" isolation
627 mode in effect, that is typically one of the four values
628 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
629 ``SERIALIZABLE``.
630
631 Compare to the :attr:`_engine.Connection.default_isolation_level`
632 accessor which returns the isolation level that is present on the
633 database at initial connection time.
634
635 .. seealso::
636
637 :attr:`_engine.Connection.default_isolation_level`
638 - view default level
639
640 :paramref:`_sa.create_engine.isolation_level`
641 - set per :class:`_engine.Engine` isolation level
642
643 :paramref:`.Connection.execution_options.isolation_level`
644 - set per :class:`_engine.Connection` isolation level
645
646 """
647 dbapi_connection = self.connection.dbapi_connection
648 assert dbapi_connection is not None
649 try:
650 return self.dialect.get_isolation_level(dbapi_connection)
651 except BaseException as e:
652 self._handle_dbapi_exception(e, None, None, None, None)
653
654 @property
655 def default_isolation_level(self) -> Optional[IsolationLevel]:
656 """The initial-connection time isolation level associated with the
657 :class:`_engine.Dialect` in use.
658
659 This value is independent of the
660 :paramref:`.Connection.execution_options.isolation_level` and
661 :paramref:`.Engine.execution_options.isolation_level` execution
662 options, and is determined by the :class:`_engine.Dialect` when the
663 first connection is created, by performing a SQL query against the
664 database for the current isolation level before any additional commands
665 have been emitted.
666
667 Calling this accessor does not invoke any new SQL queries.
668
669 .. seealso::
670
671 :meth:`_engine.Connection.get_isolation_level`
672 - view current actual isolation level
673
674 :paramref:`_sa.create_engine.isolation_level`
675 - set per :class:`_engine.Engine` isolation level
676
677 :paramref:`.Connection.execution_options.isolation_level`
678 - set per :class:`_engine.Connection` isolation level
679
680 """
681 return self.dialect.default_isolation_level
682
683 def _invalid_transaction(self) -> NoReturn:
684 raise exc.PendingRollbackError(
685 "Can't reconnect until invalid %stransaction is rolled "
686 "back. Please rollback() fully before proceeding"
687 % ("savepoint " if self._nested_transaction is not None else ""),
688 code="8s2b",
689 )
690
691 def _revalidate_connection(self) -> PoolProxiedConnection:
692 if self.__can_reconnect and self.invalidated:
693 if self._transaction is not None:
694 self._invalid_transaction()
695 self._dbapi_connection = self.engine.raw_connection()
696 return self._dbapi_connection
697 raise exc.ResourceClosedError("This Connection is closed")
698
699 @property
700 def info(self) -> _InfoType:
701 """Info dictionary associated with the underlying DBAPI connection
702 referred to by this :class:`_engine.Connection`, allowing user-defined
703 data to be associated with the connection.
704
705 The data here will follow along with the DBAPI connection including
706 after it is returned to the connection pool and used again
707 in subsequent instances of :class:`_engine.Connection`.
708
709 """
710
711 return self.connection.info
712
713 def invalidate(self, exception: Optional[BaseException] = None) -> None:
714 """Invalidate the underlying DBAPI connection associated with
715 this :class:`_engine.Connection`.
716
717 An attempt will be made to close the underlying DBAPI connection
718 immediately; however if this operation fails, the error is logged
719 but not raised. The connection is then discarded whether or not
720 close() succeeded.
721
722 Upon the next use (where "use" typically means using the
723 :meth:`_engine.Connection.execute` method or similar),
724 this :class:`_engine.Connection` will attempt to
725 procure a new DBAPI connection using the services of the
726 :class:`_pool.Pool` as a source of connectivity (e.g.
727 a "reconnection").
728
729 If a transaction was in progress (e.g. the
730 :meth:`_engine.Connection.begin` method has been called) when
731 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
732 level all state associated with this transaction is lost, as
733 the DBAPI connection is closed. The :class:`_engine.Connection`
734 will not allow a reconnection to proceed until the
735 :class:`.Transaction` object is ended, by calling the
736 :meth:`.Transaction.rollback` method; until that point, any attempt at
737 continuing to use the :class:`_engine.Connection` will raise an
738 :class:`~sqlalchemy.exc.InvalidRequestError`.
739 This is to prevent applications from accidentally
740 continuing an ongoing transactional operations despite the
741 fact that the transaction has been lost due to an
742 invalidation.
743
744 The :meth:`_engine.Connection.invalidate` method,
745 just like auto-invalidation,
746 will at the connection pool level invoke the
747 :meth:`_events.PoolEvents.invalidate` event.
748
749 :param exception: an optional ``Exception`` instance that's the
750 reason for the invalidation. is passed along to event handlers
751 and logging functions.
752
753 .. seealso::
754
755 :ref:`pool_connection_invalidation`
756
757 """
758
759 if self.invalidated:
760 return
761
762 if self.closed:
763 raise exc.ResourceClosedError("This Connection is closed")
764
765 if self._still_open_and_dbapi_connection_is_valid:
766 pool_proxied_connection = self._dbapi_connection
767 assert pool_proxied_connection is not None
768 pool_proxied_connection.invalidate(exception)
769
770 self._dbapi_connection = None
771
772 def detach(self) -> None:
773 """Detach the underlying DB-API connection from its connection pool.
774
775 E.g.::
776
777 with engine.connect() as conn:
778 conn.detach()
779 conn.execute(text("SET search_path TO schema1, schema2"))
780
781 # work with connection
782
783 # connection is fully closed (since we used "with:", can
784 # also call .close())
785
786 This :class:`_engine.Connection` instance will remain usable.
787 When closed
788 (or exited from a context manager context as above),
789 the DB-API connection will be literally closed and not
790 returned to its originating pool.
791
792 This method can be used to insulate the rest of an application
793 from a modified state on a connection (such as a transaction
794 isolation level or similar).
795
796 """
797
798 if self.closed:
799 raise exc.ResourceClosedError("This Connection is closed")
800
801 pool_proxied_connection = self._dbapi_connection
802 if pool_proxied_connection is None:
803 raise exc.InvalidRequestError(
804 "Can't detach an invalidated Connection"
805 )
806 pool_proxied_connection.detach()
807
808 def _autobegin(self) -> None:
809 if self._allow_autobegin and not self.__in_begin:
810 self.begin()
811
812 def begin(self) -> RootTransaction:
813 """Begin a transaction prior to autobegin occurring.
814
815 E.g.::
816
817 with engine.connect() as conn:
818 with conn.begin() as trans:
819 conn.execute(table.insert(), {"username": "sandy"})
820
821
822 The returned object is an instance of :class:`_engine.RootTransaction`.
823 This object represents the "scope" of the transaction,
824 which completes when either the :meth:`_engine.Transaction.rollback`
825 or :meth:`_engine.Transaction.commit` method is called; the object
826 also works as a context manager as illustrated above.
827
828 The :meth:`_engine.Connection.begin` method begins a
829 transaction that normally will be begun in any case when the connection
830 is first used to execute a statement. The reason this method might be
831 used would be to invoke the :meth:`_events.ConnectionEvents.begin`
832 event at a specific time, or to organize code within the scope of a
833 connection checkout in terms of context managed blocks, such as::
834
835 with engine.connect() as conn:
836 with conn.begin():
837 conn.execute(...)
838 conn.execute(...)
839
840 with conn.begin():
841 conn.execute(...)
842 conn.execute(...)
843
844 The above code is not fundamentally any different in its behavior than
845 the following code which does not use
846 :meth:`_engine.Connection.begin`; the below style is known
847 as "commit as you go" style::
848
849 with engine.connect() as conn:
850 conn.execute(...)
851 conn.execute(...)
852 conn.commit()
853
854 conn.execute(...)
855 conn.execute(...)
856 conn.commit()
857
858 From a database point of view, the :meth:`_engine.Connection.begin`
859 method does not emit any SQL or change the state of the underlying
860 DBAPI connection in any way; the Python DBAPI does not have any
861 concept of explicit transaction begin.
862
863 .. seealso::
864
865 :ref:`tutorial_working_with_transactions` - in the
866 :ref:`unified_tutorial`
867
868 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT
869
870 :meth:`_engine.Connection.begin_twophase` -
871 use a two phase /XID transaction
872
873 :meth:`_engine.Engine.begin` - context manager available from
874 :class:`_engine.Engine`
875
876 """
877 if self._transaction is None:
878 self._transaction = RootTransaction(self)
879 return self._transaction
880 else:
881 raise exc.InvalidRequestError(
882 "This connection has already initialized a SQLAlchemy "
883 "Transaction() object via begin() or autobegin; can't "
884 "call begin() here unless rollback() or commit() "
885 "is called first."
886 )
887
888 def begin_nested(self) -> NestedTransaction:
889 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
890 handle that controls the scope of the SAVEPOINT.
891
892 E.g.::
893
894 with engine.begin() as connection:
895 with connection.begin_nested():
896 connection.execute(table.insert(), {"username": "sandy"})
897
898 The returned object is an instance of
899 :class:`_engine.NestedTransaction`, which includes transactional
900 methods :meth:`_engine.NestedTransaction.commit` and
901 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
902 these methods correspond to the operations "RELEASE SAVEPOINT <name>"
903 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
904 to the :class:`_engine.NestedTransaction` object and is generated
905 automatically. Like any other :class:`_engine.Transaction`, the
906 :class:`_engine.NestedTransaction` may be used as a context manager as
907 illustrated above which will "release" or "rollback" corresponding to
908 if the operation within the block were successful or raised an
909 exception.
910
911 Nested transactions require SAVEPOINT support in the underlying
912 database, else the behavior is undefined. SAVEPOINT is commonly used to
913 run operations within a transaction that may fail, while continuing the
914 outer transaction. E.g.::
915
916 from sqlalchemy import exc
917
918 with engine.begin() as connection:
919 trans = connection.begin_nested()
920 try:
921 connection.execute(table.insert(), {"username": "sandy"})
922 trans.commit()
923 except exc.IntegrityError: # catch for duplicate username
924 trans.rollback() # rollback to savepoint
925
926 # outer transaction continues
927 connection.execute( ... )
928
929 If :meth:`_engine.Connection.begin_nested` is called without first
930 calling :meth:`_engine.Connection.begin` or
931 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
932 will "autobegin" the outer transaction first. This outer transaction
933 may be committed using "commit-as-you-go" style, e.g.::
934
935 with engine.connect() as connection: # begin() wasn't called
936
937 with connection.begin_nested(): will auto-"begin()" first
938 connection.execute( ... )
939 # savepoint is released
940
941 connection.execute( ... )
942
943 # explicitly commit outer transaction
944 connection.commit()
945
946 # can continue working with connection here
947
948 .. versionchanged:: 2.0
949
950 :meth:`_engine.Connection.begin_nested` will now participate
951 in the connection "autobegin" behavior that is new as of
952 2.0 / "future" style connections in 1.4.
953
954 .. seealso::
955
956 :meth:`_engine.Connection.begin`
957
958 :ref:`session_begin_nested` - ORM support for SAVEPOINT
959
960 """
961 if self._transaction is None:
962 self._autobegin()
963
964 return NestedTransaction(self)
965
966 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
967 """Begin a two-phase or XA transaction and return a transaction
968 handle.
969
970 The returned object is an instance of :class:`.TwoPhaseTransaction`,
971 which in addition to the methods provided by
972 :class:`.Transaction`, also provides a
973 :meth:`~.TwoPhaseTransaction.prepare` method.
974
975 :param xid: the two phase transaction id. If not supplied, a
976 random id will be generated.
977
978 .. seealso::
979
980 :meth:`_engine.Connection.begin`
981
982 :meth:`_engine.Connection.begin_twophase`
983
984 """
985
986 if self._transaction is not None:
987 raise exc.InvalidRequestError(
988 "Cannot start a two phase transaction when a transaction "
989 "is already in progress."
990 )
991 if xid is None:
992 xid = self.engine.dialect.create_xid()
993 return TwoPhaseTransaction(self, xid)
994
995 def commit(self) -> None:
996 """Commit the transaction that is currently in progress.
997
998 This method commits the current transaction if one has been started.
999 If no transaction was started, the method has no effect, assuming
1000 the connection is in a non-invalidated state.
1001
1002 A transaction is begun on a :class:`_engine.Connection` automatically
1003 whenever a statement is first executed, or when the
1004 :meth:`_engine.Connection.begin` method is called.
1005
1006 .. note:: The :meth:`_engine.Connection.commit` method only acts upon
1007 the primary database transaction that is linked to the
1008 :class:`_engine.Connection` object. It does not operate upon a
1009 SAVEPOINT that would have been invoked from the
1010 :meth:`_engine.Connection.begin_nested` method; for control of a
1011 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
1012 :class:`_engine.NestedTransaction` that is returned by the
1013 :meth:`_engine.Connection.begin_nested` method itself.
1014
1015
1016 """
1017 if self._transaction:
1018 self._transaction.commit()
1019
1020 def rollback(self) -> None:
1021 """Roll back the transaction that is currently in progress.
1022
1023 This method rolls back the current transaction if one has been started.
1024 If no transaction was started, the method has no effect. If a
1025 transaction was started and the connection is in an invalidated state,
1026 the transaction is cleared using this method.
1027
1028 A transaction is begun on a :class:`_engine.Connection` automatically
1029 whenever a statement is first executed, or when the
1030 :meth:`_engine.Connection.begin` method is called.
1031
1032 .. note:: The :meth:`_engine.Connection.rollback` method only acts
1033 upon the primary database transaction that is linked to the
1034 :class:`_engine.Connection` object. It does not operate upon a
1035 SAVEPOINT that would have been invoked from the
1036 :meth:`_engine.Connection.begin_nested` method; for control of a
1037 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
1038 :class:`_engine.NestedTransaction` that is returned by the
1039 :meth:`_engine.Connection.begin_nested` method itself.
1040
1041
1042 """
1043 if self._transaction:
1044 self._transaction.rollback()
1045
1046 def recover_twophase(self) -> List[Any]:
1047 return self.engine.dialect.do_recover_twophase(self)
1048
1049 def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
1050 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
1051
1052 def commit_prepared(self, xid: Any, recover: bool = False) -> None:
1053 self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
1054
1055 def in_transaction(self) -> bool:
1056 """Return True if a transaction is in progress."""
1057 return self._transaction is not None and self._transaction.is_active
1058
1059 def in_nested_transaction(self) -> bool:
1060 """Return True if a transaction is in progress."""
1061 return (
1062 self._nested_transaction is not None
1063 and self._nested_transaction.is_active
1064 )
1065
1066 def _is_autocommit_isolation(self) -> bool:
1067 opt_iso = self._execution_options.get("isolation_level", None)
1068 return bool(
1069 opt_iso == "AUTOCOMMIT"
1070 or (
1071 opt_iso is None
1072 and self.engine.dialect._on_connect_isolation_level
1073 == "AUTOCOMMIT"
1074 )
1075 )
1076
1077 def _get_required_transaction(self) -> RootTransaction:
1078 trans = self._transaction
1079 if trans is None:
1080 raise exc.InvalidRequestError("connection is not in a transaction")
1081 return trans
1082
1083 def _get_required_nested_transaction(self) -> NestedTransaction:
1084 trans = self._nested_transaction
1085 if trans is None:
1086 raise exc.InvalidRequestError(
1087 "connection is not in a nested transaction"
1088 )
1089 return trans
1090
1091 def get_transaction(self) -> Optional[RootTransaction]:
1092 """Return the current root transaction in progress, if any.
1093
1094 .. versionadded:: 1.4
1095
1096 """
1097
1098 return self._transaction
1099
1100 def get_nested_transaction(self) -> Optional[NestedTransaction]:
1101 """Return the current nested transaction in progress, if any.
1102
1103 .. versionadded:: 1.4
1104
1105 """
1106 return self._nested_transaction
1107
1108 def _begin_impl(self, transaction: RootTransaction) -> None:
1109 if self._echo:
1110 if self._is_autocommit_isolation():
1111 self._log_info(
1112 "BEGIN (implicit; DBAPI should not BEGIN due to "
1113 "autocommit mode)"
1114 )
1115 else:
1116 self._log_info("BEGIN (implicit)")
1117
1118 self.__in_begin = True
1119
1120 if self._has_events or self.engine._has_events:
1121 self.dispatch.begin(self)
1122
1123 try:
1124 self.engine.dialect.do_begin(self.connection)
1125 except BaseException as e:
1126 self._handle_dbapi_exception(e, None, None, None, None)
1127 finally:
1128 self.__in_begin = False
1129
1130 def _rollback_impl(self) -> None:
1131 if self._has_events or self.engine._has_events:
1132 self.dispatch.rollback(self)
1133
1134 if self._still_open_and_dbapi_connection_is_valid:
1135 if self._echo:
1136 if self._is_autocommit_isolation():
1137 self._log_info(
1138 "ROLLBACK using DBAPI connection.rollback(), "
1139 "DBAPI should ignore due to autocommit mode"
1140 )
1141 else:
1142 self._log_info("ROLLBACK")
1143 try:
1144 self.engine.dialect.do_rollback(self.connection)
1145 except BaseException as e:
1146 self._handle_dbapi_exception(e, None, None, None, None)
1147
1148 def _commit_impl(self) -> None:
1149 if self._has_events or self.engine._has_events:
1150 self.dispatch.commit(self)
1151
1152 if self._echo:
1153 if self._is_autocommit_isolation():
1154 self._log_info(
1155 "COMMIT using DBAPI connection.commit(), "
1156 "DBAPI should ignore due to autocommit mode"
1157 )
1158 else:
1159 self._log_info("COMMIT")
1160 try:
1161 self.engine.dialect.do_commit(self.connection)
1162 except BaseException as e:
1163 self._handle_dbapi_exception(e, None, None, None, None)
1164
1165 def _savepoint_impl(self, name: Optional[str] = None) -> str:
1166 if self._has_events or self.engine._has_events:
1167 self.dispatch.savepoint(self, name)
1168
1169 if name is None:
1170 self.__savepoint_seq += 1
1171 name = "sa_savepoint_%s" % self.__savepoint_seq
1172 self.engine.dialect.do_savepoint(self, name)
1173 return name
1174
1175 def _rollback_to_savepoint_impl(self, name: str) -> None:
1176 if self._has_events or self.engine._has_events:
1177 self.dispatch.rollback_savepoint(self, name, None)
1178
1179 if self._still_open_and_dbapi_connection_is_valid:
1180 self.engine.dialect.do_rollback_to_savepoint(self, name)
1181
1182 def _release_savepoint_impl(self, name: str) -> None:
1183 if self._has_events or self.engine._has_events:
1184 self.dispatch.release_savepoint(self, name, None)
1185
1186 self.engine.dialect.do_release_savepoint(self, name)
1187
1188 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
1189 if self._echo:
1190 self._log_info("BEGIN TWOPHASE (implicit)")
1191 if self._has_events or self.engine._has_events:
1192 self.dispatch.begin_twophase(self, transaction.xid)
1193
1194 self.__in_begin = True
1195 try:
1196 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1197 except BaseException as e:
1198 self._handle_dbapi_exception(e, None, None, None, None)
1199 finally:
1200 self.__in_begin = False
1201
1202 def _prepare_twophase_impl(self, xid: Any) -> None:
1203 if self._has_events or self.engine._has_events:
1204 self.dispatch.prepare_twophase(self, xid)
1205
1206 assert isinstance(self._transaction, TwoPhaseTransaction)
1207 try:
1208 self.engine.dialect.do_prepare_twophase(self, xid)
1209 except BaseException as e:
1210 self._handle_dbapi_exception(e, None, None, None, None)
1211
1212 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1213 if self._has_events or self.engine._has_events:
1214 self.dispatch.rollback_twophase(self, xid, is_prepared)
1215
1216 if self._still_open_and_dbapi_connection_is_valid:
1217 assert isinstance(self._transaction, TwoPhaseTransaction)
1218 try:
1219 self.engine.dialect.do_rollback_twophase(
1220 self, xid, is_prepared
1221 )
1222 except BaseException as e:
1223 self._handle_dbapi_exception(e, None, None, None, None)
1224
1225 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1226 if self._has_events or self.engine._has_events:
1227 self.dispatch.commit_twophase(self, xid, is_prepared)
1228
1229 assert isinstance(self._transaction, TwoPhaseTransaction)
1230 try:
1231 self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
1232 except BaseException as e:
1233 self._handle_dbapi_exception(e, None, None, None, None)
1234
1235 def close(self) -> None:
1236 """Close this :class:`_engine.Connection`.
1237
1238 This results in a release of the underlying database
1239 resources, that is, the DBAPI connection referenced
1240 internally. The DBAPI connection is typically restored
1241 back to the connection-holding :class:`_pool.Pool` referenced
1242 by the :class:`_engine.Engine` that produced this
1243 :class:`_engine.Connection`. Any transactional state present on
1244 the DBAPI connection is also unconditionally released via
1245 the DBAPI connection's ``rollback()`` method, regardless
1246 of any :class:`.Transaction` object that may be
1247 outstanding with regards to this :class:`_engine.Connection`.
1248
1249 This has the effect of also calling :meth:`_engine.Connection.rollback`
1250 if any transaction is in place.
1251
1252 After :meth:`_engine.Connection.close` is called, the
1253 :class:`_engine.Connection` is permanently in a closed state,
1254 and will allow no further operations.
1255
1256 """
1257
1258 if self._transaction:
1259 self._transaction.close()
1260 skip_reset = True
1261 else:
1262 skip_reset = False
1263
1264 if self._dbapi_connection is not None:
1265 conn = self._dbapi_connection
1266
1267 # as we just closed the transaction, close the connection
1268 # pool connection without doing an additional reset
1269 if skip_reset:
1270 cast("_ConnectionFairy", conn)._close_special(
1271 transaction_reset=True
1272 )
1273 else:
1274 conn.close()
1275
1276 # There is a slight chance that conn.close() may have
1277 # triggered an invalidation here in which case
1278 # _dbapi_connection would already be None, however usually
1279 # it will be non-None here and in a "closed" state.
1280 self._dbapi_connection = None
1281 self.__can_reconnect = False
1282
1283 @overload
1284 def scalar(
1285 self,
1286 statement: TypedReturnsRows[_T],
1287 parameters: Optional[_CoreSingleExecuteParams] = None,
1288 *,
1289 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1290 ) -> Optional[_T]: ...
1291
1292 @overload
1293 def scalar(
1294 self,
1295 statement: Executable,
1296 parameters: Optional[_CoreSingleExecuteParams] = None,
1297 *,
1298 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1299 ) -> Any: ...
1300
1301 def scalar(
1302 self,
1303 statement: Executable,
1304 parameters: Optional[_CoreSingleExecuteParams] = None,
1305 *,
1306 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1307 ) -> Any:
1308 r"""Executes a SQL statement construct and returns a scalar object.
1309
1310 This method is shorthand for invoking the
1311 :meth:`_engine.Result.scalar` method after invoking the
1312 :meth:`_engine.Connection.execute` method. Parameters are equivalent.
1313
1314 :return: a scalar Python value representing the first column of the
1315 first row returned.
1316
1317 """
1318 distilled_parameters = _distill_params_20(parameters)
1319 try:
1320 meth = statement._execute_on_scalar
1321 except AttributeError as err:
1322 raise exc.ObjectNotExecutableError(statement) from err
1323 else:
1324 return meth(
1325 self,
1326 distilled_parameters,
1327 execution_options or NO_OPTIONS,
1328 )
1329
1330 @overload
1331 def scalars(
1332 self,
1333 statement: TypedReturnsRows[_T],
1334 parameters: Optional[_CoreAnyExecuteParams] = None,
1335 *,
1336 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1337 ) -> ScalarResult[_T]: ...
1338
1339 @overload
1340 def scalars(
1341 self,
1342 statement: Executable,
1343 parameters: Optional[_CoreAnyExecuteParams] = None,
1344 *,
1345 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1346 ) -> ScalarResult[Any]: ...
1347
1348 def scalars(
1349 self,
1350 statement: Executable,
1351 parameters: Optional[_CoreAnyExecuteParams] = None,
1352 *,
1353 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1354 ) -> ScalarResult[Any]:
1355 """Executes and returns a scalar result set, which yields scalar values
1356 from the first column of each row.
1357
1358 This method is equivalent to calling :meth:`_engine.Connection.execute`
1359 to receive a :class:`_result.Result` object, then invoking the
1360 :meth:`_result.Result.scalars` method to produce a
1361 :class:`_result.ScalarResult` instance.
1362
1363 :return: a :class:`_result.ScalarResult`
1364
1365 .. versionadded:: 1.4.24
1366
1367 """
1368
1369 return self.execute(
1370 statement, parameters, execution_options=execution_options
1371 ).scalars()
1372
1373 @overload
1374 def execute(
1375 self,
1376 statement: TypedReturnsRows[Unpack[_Ts]],
1377 parameters: Optional[_CoreAnyExecuteParams] = None,
1378 *,
1379 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1380 ) -> CursorResult[Unpack[_Ts]]: ...
1381
1382 @overload
1383 def execute(
1384 self,
1385 statement: Executable,
1386 parameters: Optional[_CoreAnyExecuteParams] = None,
1387 *,
1388 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1389 ) -> CursorResult[Unpack[TupleAny]]: ...
1390
1391 def execute(
1392 self,
1393 statement: Executable,
1394 parameters: Optional[_CoreAnyExecuteParams] = None,
1395 *,
1396 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1397 ) -> CursorResult[Unpack[TupleAny]]:
1398 r"""Executes a SQL statement construct and returns a
1399 :class:`_engine.CursorResult`.
1400
1401 :param statement: The statement to be executed. This is always
1402 an object that is in both the :class:`_expression.ClauseElement` and
1403 :class:`_expression.Executable` hierarchies, including:
1404
1405 * :class:`_expression.Select`
1406 * :class:`_expression.Insert`, :class:`_expression.Update`,
1407 :class:`_expression.Delete`
1408 * :class:`_expression.TextClause` and
1409 :class:`_expression.TextualSelect`
1410 * :class:`_schema.DDL` and objects which inherit from
1411 :class:`_schema.ExecutableDDLElement`
1412
1413 :param parameters: parameters which will be bound into the statement.
1414 This may be either a dictionary of parameter names to values,
1415 or a mutable sequence (e.g. a list) of dictionaries. When a
1416 list of dictionaries is passed, the underlying statement execution
1417 will make use of the DBAPI ``cursor.executemany()`` method.
1418 When a single dictionary is passed, the DBAPI ``cursor.execute()``
1419 method will be used.
1420
1421 :param execution_options: optional dictionary of execution options,
1422 which will be associated with the statement execution. This
1423 dictionary can provide a subset of the options that are accepted
1424 by :meth:`_engine.Connection.execution_options`.
1425
1426 :return: a :class:`_engine.Result` object.
1427
1428 """
1429 distilled_parameters = _distill_params_20(parameters)
1430 try:
1431 meth = statement._execute_on_connection
1432 except AttributeError as err:
1433 raise exc.ObjectNotExecutableError(statement) from err
1434 else:
1435 return meth(
1436 self,
1437 distilled_parameters,
1438 execution_options or NO_OPTIONS,
1439 )
1440
1441 def _execute_function(
1442 self,
1443 func: FunctionElement[Any],
1444 distilled_parameters: _CoreMultiExecuteParams,
1445 execution_options: CoreExecuteOptionsParameter,
1446 ) -> CursorResult[Unpack[TupleAny]]:
1447 """Execute a sql.FunctionElement object."""
1448
1449 return self._execute_clauseelement(
1450 func.select(), distilled_parameters, execution_options
1451 )
1452
1453 def _execute_default(
1454 self,
1455 default: DefaultGenerator,
1456 distilled_parameters: _CoreMultiExecuteParams,
1457 execution_options: CoreExecuteOptionsParameter,
1458 ) -> Any:
1459 """Execute a schema.ColumnDefault object."""
1460
1461 exec_opts = self._execution_options.merge_with(execution_options)
1462
1463 event_multiparams: Optional[_CoreMultiExecuteParams]
1464 event_params: Optional[_CoreAnyExecuteParams]
1465
1466 # note for event handlers, the "distilled parameters" which is always
1467 # a list of dicts is broken out into separate "multiparams" and
1468 # "params" collections, which allows the handler to distinguish
1469 # between an executemany and execute style set of parameters.
1470 if self._has_events or self.engine._has_events:
1471 (
1472 default,
1473 distilled_parameters,
1474 event_multiparams,
1475 event_params,
1476 ) = self._invoke_before_exec_event(
1477 default, distilled_parameters, exec_opts
1478 )
1479 else:
1480 event_multiparams = event_params = None
1481
1482 try:
1483 conn = self._dbapi_connection
1484 if conn is None:
1485 conn = self._revalidate_connection()
1486
1487 dialect = self.dialect
1488 ctx = dialect.execution_ctx_cls._init_default(
1489 dialect, self, conn, exec_opts
1490 )
1491 except (exc.PendingRollbackError, exc.ResourceClosedError):
1492 raise
1493 except BaseException as e:
1494 self._handle_dbapi_exception(e, None, None, None, None)
1495
1496 ret = ctx._exec_default(None, default, None)
1497
1498 if self._has_events or self.engine._has_events:
1499 self.dispatch.after_execute(
1500 self,
1501 default,
1502 event_multiparams,
1503 event_params,
1504 exec_opts,
1505 ret,
1506 )
1507
1508 return ret
1509
1510 def _execute_ddl(
1511 self,
1512 ddl: ExecutableDDLElement,
1513 distilled_parameters: _CoreMultiExecuteParams,
1514 execution_options: CoreExecuteOptionsParameter,
1515 ) -> CursorResult[Unpack[TupleAny]]:
1516 """Execute a schema.DDL object."""
1517
1518 exec_opts = ddl._execution_options.merge_with(
1519 self._execution_options, execution_options
1520 )
1521
1522 event_multiparams: Optional[_CoreMultiExecuteParams]
1523 event_params: Optional[_CoreSingleExecuteParams]
1524
1525 if self._has_events or self.engine._has_events:
1526 (
1527 ddl,
1528 distilled_parameters,
1529 event_multiparams,
1530 event_params,
1531 ) = self._invoke_before_exec_event(
1532 ddl, distilled_parameters, exec_opts
1533 )
1534 else:
1535 event_multiparams = event_params = None
1536
1537 schema_translate_map = exec_opts.get("schema_translate_map", None)
1538
1539 dialect = self.dialect
1540
1541 compiled = ddl.compile(
1542 dialect=dialect, schema_translate_map=schema_translate_map
1543 )
1544 ret = self._execute_context(
1545 dialect,
1546 dialect.execution_ctx_cls._init_ddl,
1547 compiled,
1548 None,
1549 exec_opts,
1550 compiled,
1551 )
1552 if self._has_events or self.engine._has_events:
1553 self.dispatch.after_execute(
1554 self,
1555 ddl,
1556 event_multiparams,
1557 event_params,
1558 exec_opts,
1559 ret,
1560 )
1561 return ret
1562
1563 def _invoke_before_exec_event(
1564 self,
1565 elem: Any,
1566 distilled_params: _CoreMultiExecuteParams,
1567 execution_options: _ExecuteOptions,
1568 ) -> Tuple[
1569 Any,
1570 _CoreMultiExecuteParams,
1571 _CoreMultiExecuteParams,
1572 _CoreSingleExecuteParams,
1573 ]:
1574 event_multiparams: _CoreMultiExecuteParams
1575 event_params: _CoreSingleExecuteParams
1576
1577 if len(distilled_params) == 1:
1578 event_multiparams, event_params = [], distilled_params[0]
1579 else:
1580 event_multiparams, event_params = distilled_params, {}
1581
1582 for fn in self.dispatch.before_execute:
1583 elem, event_multiparams, event_params = fn(
1584 self,
1585 elem,
1586 event_multiparams,
1587 event_params,
1588 execution_options,
1589 )
1590
1591 if event_multiparams:
1592 distilled_params = list(event_multiparams)
1593 if event_params:
1594 raise exc.InvalidRequestError(
1595 "Event handler can't return non-empty multiparams "
1596 "and params at the same time"
1597 )
1598 elif event_params:
1599 distilled_params = [event_params]
1600 else:
1601 distilled_params = []
1602
1603 return elem, distilled_params, event_multiparams, event_params
1604
1605 def _execute_clauseelement(
1606 self,
1607 elem: Executable,
1608 distilled_parameters: _CoreMultiExecuteParams,
1609 execution_options: CoreExecuteOptionsParameter,
1610 ) -> CursorResult[Unpack[TupleAny]]:
1611 """Execute a sql.ClauseElement object."""
1612
1613 exec_opts = elem._execution_options.merge_with(
1614 self._execution_options, execution_options
1615 )
1616
1617 has_events = self._has_events or self.engine._has_events
1618 if has_events:
1619 (
1620 elem,
1621 distilled_parameters,
1622 event_multiparams,
1623 event_params,
1624 ) = self._invoke_before_exec_event(
1625 elem, distilled_parameters, exec_opts
1626 )
1627
1628 if distilled_parameters:
1629 # ensure we don't retain a link to the view object for keys()
1630 # which links to the values, which we don't want to cache
1631 keys = sorted(distilled_parameters[0])
1632 for_executemany = len(distilled_parameters) > 1
1633 else:
1634 keys = []
1635 for_executemany = False
1636
1637 dialect = self.dialect
1638
1639 schema_translate_map = exec_opts.get("schema_translate_map", None)
1640
1641 compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
1642 "compiled_cache", self.engine._compiled_cache
1643 )
1644
1645 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
1646 dialect=dialect,
1647 compiled_cache=compiled_cache,
1648 column_keys=keys,
1649 for_executemany=for_executemany,
1650 schema_translate_map=schema_translate_map,
1651 linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
1652 )
1653 ret = self._execute_context(
1654 dialect,
1655 dialect.execution_ctx_cls._init_compiled,
1656 compiled_sql,
1657 distilled_parameters,
1658 exec_opts,
1659 compiled_sql,
1660 distilled_parameters,
1661 elem,
1662 extracted_params,
1663 cache_hit=cache_hit,
1664 )
1665 if has_events:
1666 self.dispatch.after_execute(
1667 self,
1668 elem,
1669 event_multiparams,
1670 event_params,
1671 exec_opts,
1672 ret,
1673 )
1674 return ret
1675
1676 def _execute_compiled(
1677 self,
1678 compiled: Compiled,
1679 distilled_parameters: _CoreMultiExecuteParams,
1680 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
1681 ) -> CursorResult[Unpack[TupleAny]]:
1682 """Execute a sql.Compiled object.
1683
1684 TODO: why do we have this? likely deprecate or remove
1685
1686 """
1687
1688 exec_opts = compiled.execution_options.merge_with(
1689 self._execution_options, execution_options
1690 )
1691
1692 if self._has_events or self.engine._has_events:
1693 (
1694 compiled,
1695 distilled_parameters,
1696 event_multiparams,
1697 event_params,
1698 ) = self._invoke_before_exec_event(
1699 compiled, distilled_parameters, exec_opts
1700 )
1701
1702 dialect = self.dialect
1703
1704 ret = self._execute_context(
1705 dialect,
1706 dialect.execution_ctx_cls._init_compiled,
1707 compiled,
1708 distilled_parameters,
1709 exec_opts,
1710 compiled,
1711 distilled_parameters,
1712 None,
1713 None,
1714 )
1715 if self._has_events or self.engine._has_events:
1716 self.dispatch.after_execute(
1717 self,
1718 compiled,
1719 event_multiparams,
1720 event_params,
1721 exec_opts,
1722 ret,
1723 )
1724 return ret
1725
1726 def exec_driver_sql(
1727 self,
1728 statement: str,
1729 parameters: Optional[_DBAPIAnyExecuteParams] = None,
1730 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1731 ) -> CursorResult[Unpack[TupleAny]]:
1732 r"""Executes a string SQL statement on the DBAPI cursor directly,
1733 without any SQL compilation steps.
1734
1735 This can be used to pass any string directly to the
1736 ``cursor.execute()`` method of the DBAPI in use.
1737
1738 :param statement: The statement str to be executed. Bound parameters
1739 must use the underlying DBAPI's paramstyle, such as "qmark",
1740 "pyformat", "format", etc.
1741
1742 :param parameters: represent bound parameter values to be used in the
1743 execution. The format is one of: a dictionary of named parameters,
1744 a tuple of positional parameters, or a list containing either
1745 dictionaries or tuples for multiple-execute support.
1746
1747 :return: a :class:`_engine.CursorResult`.
1748
1749 E.g. multiple dictionaries::
1750
1751
1752 conn.exec_driver_sql(
1753 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1754 [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}]
1755 )
1756
1757 Single dictionary::
1758
1759 conn.exec_driver_sql(
1760 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1761 dict(id=1, value="v1")
1762 )
1763
1764 Single tuple::
1765
1766 conn.exec_driver_sql(
1767 "INSERT INTO table (id, value) VALUES (?, ?)",
1768 (1, 'v1')
1769 )
1770
1771 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
1772 not participate in the
1773 :meth:`_events.ConnectionEvents.before_execute` and
1774 :meth:`_events.ConnectionEvents.after_execute` events. To
1775 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
1776 :meth:`_events.ConnectionEvents.before_cursor_execute` and
1777 :meth:`_events.ConnectionEvents.after_cursor_execute`.
1778
1779 .. seealso::
1780
1781 :pep:`249`
1782
1783 """
1784
1785 distilled_parameters = _distill_raw_params(parameters)
1786
1787 exec_opts = self._execution_options.merge_with(execution_options)
1788
1789 dialect = self.dialect
1790 ret = self._execute_context(
1791 dialect,
1792 dialect.execution_ctx_cls._init_statement,
1793 statement,
1794 None,
1795 exec_opts,
1796 statement,
1797 distilled_parameters,
1798 )
1799
1800 return ret
1801
1802 def _execute_context(
1803 self,
1804 dialect: Dialect,
1805 constructor: Callable[..., ExecutionContext],
1806 statement: Union[str, Compiled],
1807 parameters: Optional[_AnyMultiExecuteParams],
1808 execution_options: _ExecuteOptions,
1809 *args: Any,
1810 **kw: Any,
1811 ) -> CursorResult[Unpack[TupleAny]]:
1812 """Create an :class:`.ExecutionContext` and execute, returning
1813 a :class:`_engine.CursorResult`."""
1814
1815 if execution_options:
1816 yp = execution_options.get("yield_per", None)
1817 if yp:
1818 execution_options = execution_options.union(
1819 {"stream_results": True, "max_row_buffer": yp}
1820 )
1821 try:
1822 conn = self._dbapi_connection
1823 if conn is None:
1824 conn = self._revalidate_connection()
1825
1826 context = constructor(
1827 dialect, self, conn, execution_options, *args, **kw
1828 )
1829 except (exc.PendingRollbackError, exc.ResourceClosedError):
1830 raise
1831 except BaseException as e:
1832 self._handle_dbapi_exception(
1833 e, str(statement), parameters, None, None
1834 )
1835
1836 if (
1837 self._transaction
1838 and not self._transaction.is_active
1839 or (
1840 self._nested_transaction
1841 and not self._nested_transaction.is_active
1842 )
1843 ):
1844 self._invalid_transaction()
1845
1846 elif self._trans_context_manager:
1847 TransactionalContext._trans_ctx_check(self)
1848
1849 if self._transaction is None:
1850 self._autobegin()
1851
1852 context.pre_exec()
1853
1854 if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
1855 return self._exec_insertmany_context(dialect, context)
1856 else:
1857 return self._exec_single_context(
1858 dialect, context, statement, parameters
1859 )
1860
1861 def _exec_single_context(
1862 self,
1863 dialect: Dialect,
1864 context: ExecutionContext,
1865 statement: Union[str, Compiled],
1866 parameters: Optional[_AnyMultiExecuteParams],
1867 ) -> CursorResult[Unpack[TupleAny]]:
1868 """continue the _execute_context() method for a single DBAPI
1869 cursor.execute() or cursor.executemany() call.
1870
1871 """
1872 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
1873 generic_setinputsizes = context._prepare_set_input_sizes()
1874
1875 if generic_setinputsizes:
1876 try:
1877 dialect.do_set_input_sizes(
1878 context.cursor, generic_setinputsizes, context
1879 )
1880 except BaseException as e:
1881 self._handle_dbapi_exception(
1882 e, str(statement), parameters, None, context
1883 )
1884
1885 cursor, str_statement, parameters = (
1886 context.cursor,
1887 context.statement,
1888 context.parameters,
1889 )
1890
1891 effective_parameters: Optional[_AnyExecuteParams]
1892
1893 if not context.executemany:
1894 effective_parameters = parameters[0]
1895 else:
1896 effective_parameters = parameters
1897
1898 if self._has_events or self.engine._has_events:
1899 for fn in self.dispatch.before_cursor_execute:
1900 str_statement, effective_parameters = fn(
1901 self,
1902 cursor,
1903 str_statement,
1904 effective_parameters,
1905 context,
1906 context.executemany,
1907 )
1908
1909 if self._echo:
1910 self._log_info(str_statement)
1911
1912 stats = context._get_cache_stats()
1913
1914 if not self.engine.hide_parameters:
1915 self._log_info(
1916 "[%s] %r",
1917 stats,
1918 sql_util._repr_params(
1919 effective_parameters,
1920 batches=10,
1921 ismulti=context.executemany,
1922 ),
1923 )
1924 else:
1925 self._log_info(
1926 "[%s] [SQL parameters hidden due to hide_parameters=True]",
1927 stats,
1928 )
1929
1930 evt_handled: bool = False
1931 try:
1932 if context.execute_style is ExecuteStyle.EXECUTEMANY:
1933 effective_parameters = cast(
1934 "_CoreMultiExecuteParams", effective_parameters
1935 )
1936 if self.dialect._has_events:
1937 for fn in self.dialect.dispatch.do_executemany:
1938 if fn(
1939 cursor,
1940 str_statement,
1941 effective_parameters,
1942 context,
1943 ):
1944 evt_handled = True
1945 break
1946 if not evt_handled:
1947 self.dialect.do_executemany(
1948 cursor,
1949 str_statement,
1950 effective_parameters,
1951 context,
1952 )
1953 elif not effective_parameters and context.no_parameters:
1954 if self.dialect._has_events:
1955 for fn in self.dialect.dispatch.do_execute_no_params:
1956 if fn(cursor, str_statement, context):
1957 evt_handled = True
1958 break
1959 if not evt_handled:
1960 self.dialect.do_execute_no_params(
1961 cursor, str_statement, context
1962 )
1963 else:
1964 effective_parameters = cast(
1965 "_CoreSingleExecuteParams", effective_parameters
1966 )
1967 if self.dialect._has_events:
1968 for fn in self.dialect.dispatch.do_execute:
1969 if fn(
1970 cursor,
1971 str_statement,
1972 effective_parameters,
1973 context,
1974 ):
1975 evt_handled = True
1976 break
1977 if not evt_handled:
1978 self.dialect.do_execute(
1979 cursor, str_statement, effective_parameters, context
1980 )
1981
1982 if self._has_events or self.engine._has_events:
1983 self.dispatch.after_cursor_execute(
1984 self,
1985 cursor,
1986 str_statement,
1987 effective_parameters,
1988 context,
1989 context.executemany,
1990 )
1991
1992 context.post_exec()
1993
1994 result = context._setup_result_proxy()
1995
1996 except BaseException as e:
1997 self._handle_dbapi_exception(
1998 e, str_statement, effective_parameters, cursor, context
1999 )
2000
2001 return result
2002
2003 def _exec_insertmany_context(
2004 self,
2005 dialect: Dialect,
2006 context: ExecutionContext,
2007 ) -> CursorResult[Unpack[TupleAny]]:
2008 """continue the _execute_context() method for an "insertmanyvalues"
2009 operation, which will invoke DBAPI
2010 cursor.execute() one or more times with individual log and
2011 event hook calls.
2012
2013 """
2014
2015 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
2016 generic_setinputsizes = context._prepare_set_input_sizes()
2017 else:
2018 generic_setinputsizes = None
2019
2020 cursor, str_statement, parameters = (
2021 context.cursor,
2022 context.statement,
2023 context.parameters,
2024 )
2025
2026 effective_parameters = parameters
2027
2028 engine_events = self._has_events or self.engine._has_events
2029 if self.dialect._has_events:
2030 do_execute_dispatch: Iterable[Any] = (
2031 self.dialect.dispatch.do_execute
2032 )
2033 else:
2034 do_execute_dispatch = ()
2035
2036 if self._echo:
2037 stats = context._get_cache_stats() + " (insertmanyvalues)"
2038
2039 preserve_rowcount = context.execution_options.get(
2040 "preserve_rowcount", False
2041 )
2042 rowcount = 0
2043
2044 for imv_batch in dialect._deliver_insertmanyvalues_batches(
2045 self,
2046 cursor,
2047 str_statement,
2048 effective_parameters,
2049 generic_setinputsizes,
2050 context,
2051 ):
2052 if imv_batch.processed_setinputsizes:
2053 try:
2054 dialect.do_set_input_sizes(
2055 context.cursor,
2056 imv_batch.processed_setinputsizes,
2057 context,
2058 )
2059 except BaseException as e:
2060 self._handle_dbapi_exception(
2061 e,
2062 sql_util._long_statement(imv_batch.replaced_statement),
2063 imv_batch.replaced_parameters,
2064 None,
2065 context,
2066 is_sub_exec=True,
2067 )
2068
2069 sub_stmt = imv_batch.replaced_statement
2070 sub_params = imv_batch.replaced_parameters
2071
2072 if engine_events:
2073 for fn in self.dispatch.before_cursor_execute:
2074 sub_stmt, sub_params = fn(
2075 self,
2076 cursor,
2077 sub_stmt,
2078 sub_params,
2079 context,
2080 True,
2081 )
2082
2083 if self._echo:
2084 self._log_info(sql_util._long_statement(sub_stmt))
2085
2086 imv_stats = f""" {imv_batch.batchnum}/{
2087 imv_batch.total_batches
2088 } ({
2089 'ordered'
2090 if imv_batch.rows_sorted else 'unordered'
2091 }{
2092 '; batch not supported'
2093 if imv_batch.is_downgraded
2094 else ''
2095 })"""
2096
2097 if imv_batch.batchnum == 1:
2098 stats += imv_stats
2099 else:
2100 stats = f"insertmanyvalues{imv_stats}"
2101
2102 if not self.engine.hide_parameters:
2103 self._log_info(
2104 "[%s] %r",
2105 stats,
2106 sql_util._repr_params(
2107 sub_params,
2108 batches=10,
2109 ismulti=False,
2110 ),
2111 )
2112 else:
2113 self._log_info(
2114 "[%s] [SQL parameters hidden due to "
2115 "hide_parameters=True]",
2116 stats,
2117 )
2118
2119 try:
2120 for fn in do_execute_dispatch:
2121 if fn(
2122 cursor,
2123 sub_stmt,
2124 sub_params,
2125 context,
2126 ):
2127 break
2128 else:
2129 dialect.do_execute(
2130 cursor,
2131 sub_stmt,
2132 sub_params,
2133 context,
2134 )
2135
2136 except BaseException as e:
2137 self._handle_dbapi_exception(
2138 e,
2139 sql_util._long_statement(sub_stmt),
2140 sub_params,
2141 cursor,
2142 context,
2143 is_sub_exec=True,
2144 )
2145
2146 if engine_events:
2147 self.dispatch.after_cursor_execute(
2148 self,
2149 cursor,
2150 str_statement,
2151 effective_parameters,
2152 context,
2153 context.executemany,
2154 )
2155
2156 if preserve_rowcount:
2157 rowcount += imv_batch.current_batch_size
2158
2159 try:
2160 context.post_exec()
2161
2162 if preserve_rowcount:
2163 context._rowcount = rowcount # type: ignore[attr-defined]
2164
2165 result = context._setup_result_proxy()
2166
2167 except BaseException as e:
2168 self._handle_dbapi_exception(
2169 e, str_statement, effective_parameters, cursor, context
2170 )
2171
2172 return result
2173
2174 def _cursor_execute(
2175 self,
2176 cursor: DBAPICursor,
2177 statement: str,
2178 parameters: _DBAPISingleExecuteParams,
2179 context: Optional[ExecutionContext] = None,
2180 ) -> None:
2181 """Execute a statement + params on the given cursor.
2182
2183 Adds appropriate logging and exception handling.
2184
2185 This method is used by DefaultDialect for special-case
2186 executions, such as for sequences and column defaults.
2187 The path of statement execution in the majority of cases
2188 terminates at _execute_context().
2189
2190 """
2191 if self._has_events or self.engine._has_events:
2192 for fn in self.dispatch.before_cursor_execute:
2193 statement, parameters = fn(
2194 self, cursor, statement, parameters, context, False
2195 )
2196
2197 if self._echo:
2198 self._log_info(statement)
2199 self._log_info("[raw sql] %r", parameters)
2200 try:
2201 for fn in (
2202 ()
2203 if not self.dialect._has_events
2204 else self.dialect.dispatch.do_execute
2205 ):
2206 if fn(cursor, statement, parameters, context):
2207 break
2208 else:
2209 self.dialect.do_execute(cursor, statement, parameters, context)
2210 except BaseException as e:
2211 self._handle_dbapi_exception(
2212 e, statement, parameters, cursor, context
2213 )
2214
2215 if self._has_events or self.engine._has_events:
2216 self.dispatch.after_cursor_execute(
2217 self, cursor, statement, parameters, context, False
2218 )
2219
2220 def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
2221 """Close the given cursor, catching exceptions
2222 and turning into log warnings.
2223
2224 """
2225 try:
2226 cursor.close()
2227 except Exception:
2228 # log the error through the connection pool's logger.
2229 self.engine.pool.logger.error(
2230 "Error closing cursor", exc_info=True
2231 )
2232
2233 _reentrant_error = False
2234 _is_disconnect = False
2235
2236 def _handle_dbapi_exception(
2237 self,
2238 e: BaseException,
2239 statement: Optional[str],
2240 parameters: Optional[_AnyExecuteParams],
2241 cursor: Optional[DBAPICursor],
2242 context: Optional[ExecutionContext],
2243 is_sub_exec: bool = False,
2244 ) -> NoReturn:
2245 exc_info = sys.exc_info()
2246
2247 is_exit_exception = util.is_exit_exception(e)
2248
2249 if not self._is_disconnect:
2250 self._is_disconnect = (
2251 isinstance(e, self.dialect.loaded_dbapi.Error)
2252 and not self.closed
2253 and self.dialect.is_disconnect(
2254 e,
2255 self._dbapi_connection if not self.invalidated else None,
2256 cursor,
2257 )
2258 ) or (is_exit_exception and not self.closed)
2259
2260 invalidate_pool_on_disconnect = not is_exit_exception
2261
2262 ismulti: bool = (
2263 not is_sub_exec and context.executemany
2264 if context is not None
2265 else False
2266 )
2267 if self._reentrant_error:
2268 raise exc.DBAPIError.instance(
2269 statement,
2270 parameters,
2271 e,
2272 self.dialect.loaded_dbapi.Error,
2273 hide_parameters=self.engine.hide_parameters,
2274 dialect=self.dialect,
2275 ismulti=ismulti,
2276 ).with_traceback(exc_info[2]) from e
2277 self._reentrant_error = True
2278 try:
2279 # non-DBAPI error - if we already got a context,
2280 # or there's no string statement, don't wrap it
2281 should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
2282 statement is not None
2283 and context is None
2284 and not is_exit_exception
2285 )
2286
2287 if should_wrap:
2288 sqlalchemy_exception = exc.DBAPIError.instance(
2289 statement,
2290 parameters,
2291 cast(Exception, e),
2292 self.dialect.loaded_dbapi.Error,
2293 hide_parameters=self.engine.hide_parameters,
2294 connection_invalidated=self._is_disconnect,
2295 dialect=self.dialect,
2296 ismulti=ismulti,
2297 )
2298 else:
2299 sqlalchemy_exception = None
2300
2301 newraise = None
2302
2303 if (self.dialect._has_events) and not self._execution_options.get(
2304 "skip_user_error_events", False
2305 ):
2306 ctx = ExceptionContextImpl(
2307 e,
2308 sqlalchemy_exception,
2309 self.engine,
2310 self.dialect,
2311 self,
2312 cursor,
2313 statement,
2314 parameters,
2315 context,
2316 self._is_disconnect,
2317 invalidate_pool_on_disconnect,
2318 False,
2319 )
2320
2321 for fn in self.dialect.dispatch.handle_error:
2322 try:
2323 # handler returns an exception;
2324 # call next handler in a chain
2325 per_fn = fn(ctx)
2326 if per_fn is not None:
2327 ctx.chained_exception = newraise = per_fn
2328 except Exception as _raised:
2329 # handler raises an exception - stop processing
2330 newraise = _raised
2331 break
2332
2333 if self._is_disconnect != ctx.is_disconnect:
2334 self._is_disconnect = ctx.is_disconnect
2335 if sqlalchemy_exception:
2336 sqlalchemy_exception.connection_invalidated = (
2337 ctx.is_disconnect
2338 )
2339
2340 # set up potentially user-defined value for
2341 # invalidate pool.
2342 invalidate_pool_on_disconnect = (
2343 ctx.invalidate_pool_on_disconnect
2344 )
2345
2346 if should_wrap and context:
2347 context.handle_dbapi_exception(e)
2348
2349 if not self._is_disconnect:
2350 if cursor:
2351 self._safe_close_cursor(cursor)
2352 # "autorollback" was mostly relevant in 1.x series.
2353 # It's very unlikely to reach here, as the connection
2354 # does autobegin so when we are here, we are usually
2355 # in an explicit / semi-explicit transaction.
2356 # however we have a test which manufactures this
2357 # scenario in any case using an event handler.
2358 # test/engine/test_execute.py-> test_actual_autorollback
2359 if not self.in_transaction():
2360 self._rollback_impl()
2361
2362 if newraise:
2363 raise newraise.with_traceback(exc_info[2]) from e
2364 elif should_wrap:
2365 assert sqlalchemy_exception is not None
2366 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2367 else:
2368 assert exc_info[1] is not None
2369 raise exc_info[1].with_traceback(exc_info[2])
2370 finally:
2371 del self._reentrant_error
2372 if self._is_disconnect:
2373 del self._is_disconnect
2374 if not self.invalidated:
2375 dbapi_conn_wrapper = self._dbapi_connection
2376 assert dbapi_conn_wrapper is not None
2377 if invalidate_pool_on_disconnect:
2378 self.engine.pool._invalidate(dbapi_conn_wrapper, e)
2379 self.invalidate(e)
2380
2381 @classmethod
2382 def _handle_dbapi_exception_noconnection(
2383 cls,
2384 e: BaseException,
2385 dialect: Dialect,
2386 engine: Optional[Engine] = None,
2387 is_disconnect: Optional[bool] = None,
2388 invalidate_pool_on_disconnect: bool = True,
2389 is_pre_ping: bool = False,
2390 ) -> NoReturn:
2391 exc_info = sys.exc_info()
2392
2393 if is_disconnect is None:
2394 is_disconnect = isinstance(
2395 e, dialect.loaded_dbapi.Error
2396 ) and dialect.is_disconnect(e, None, None)
2397
2398 should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
2399
2400 if should_wrap:
2401 sqlalchemy_exception = exc.DBAPIError.instance(
2402 None,
2403 None,
2404 cast(Exception, e),
2405 dialect.loaded_dbapi.Error,
2406 hide_parameters=(
2407 engine.hide_parameters if engine is not None else False
2408 ),
2409 connection_invalidated=is_disconnect,
2410 dialect=dialect,
2411 )
2412 else:
2413 sqlalchemy_exception = None
2414
2415 newraise = None
2416
2417 if dialect._has_events:
2418 ctx = ExceptionContextImpl(
2419 e,
2420 sqlalchemy_exception,
2421 engine,
2422 dialect,
2423 None,
2424 None,
2425 None,
2426 None,
2427 None,
2428 is_disconnect,
2429 invalidate_pool_on_disconnect,
2430 is_pre_ping,
2431 )
2432 for fn in dialect.dispatch.handle_error:
2433 try:
2434 # handler returns an exception;
2435 # call next handler in a chain
2436 per_fn = fn(ctx)
2437 if per_fn is not None:
2438 ctx.chained_exception = newraise = per_fn
2439 except Exception as _raised:
2440 # handler raises an exception - stop processing
2441 newraise = _raised
2442 break
2443
2444 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
2445 sqlalchemy_exception.connection_invalidated = is_disconnect = (
2446 ctx.is_disconnect
2447 )
2448
2449 if newraise:
2450 raise newraise.with_traceback(exc_info[2]) from e
2451 elif should_wrap:
2452 assert sqlalchemy_exception is not None
2453 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2454 else:
2455 assert exc_info[1] is not None
2456 raise exc_info[1].with_traceback(exc_info[2])
2457
2458 def _run_ddl_visitor(
2459 self,
2460 visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]],
2461 element: SchemaItem,
2462 **kwargs: Any,
2463 ) -> None:
2464 """run a DDL visitor.
2465
2466 This method is only here so that the MockConnection can change the
2467 options given to the visitor so that "checkfirst" is skipped.
2468
2469 """
2470 visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
2471
2472
2473class ExceptionContextImpl(ExceptionContext):
2474 """Implement the :class:`.ExceptionContext` interface."""
2475
2476 __slots__ = (
2477 "connection",
2478 "engine",
2479 "dialect",
2480 "cursor",
2481 "statement",
2482 "parameters",
2483 "original_exception",
2484 "sqlalchemy_exception",
2485 "chained_exception",
2486 "execution_context",
2487 "is_disconnect",
2488 "invalidate_pool_on_disconnect",
2489 "is_pre_ping",
2490 )
2491
2492 def __init__(
2493 self,
2494 exception: BaseException,
2495 sqlalchemy_exception: Optional[exc.StatementError],
2496 engine: Optional[Engine],
2497 dialect: Dialect,
2498 connection: Optional[Connection],
2499 cursor: Optional[DBAPICursor],
2500 statement: Optional[str],
2501 parameters: Optional[_DBAPIAnyExecuteParams],
2502 context: Optional[ExecutionContext],
2503 is_disconnect: bool,
2504 invalidate_pool_on_disconnect: bool,
2505 is_pre_ping: bool,
2506 ):
2507 self.engine = engine
2508 self.dialect = dialect
2509 self.connection = connection
2510 self.sqlalchemy_exception = sqlalchemy_exception
2511 self.original_exception = exception
2512 self.execution_context = context
2513 self.statement = statement
2514 self.parameters = parameters
2515 self.is_disconnect = is_disconnect
2516 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
2517 self.is_pre_ping = is_pre_ping
2518
2519
2520class Transaction(TransactionalContext):
2521 """Represent a database transaction in progress.
2522
2523 The :class:`.Transaction` object is procured by
2524 calling the :meth:`_engine.Connection.begin` method of
2525 :class:`_engine.Connection`::
2526
2527 from sqlalchemy import create_engine
2528 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
2529 connection = engine.connect()
2530 trans = connection.begin()
2531 connection.execute(text("insert into x (a, b) values (1, 2)"))
2532 trans.commit()
2533
2534 The object provides :meth:`.rollback` and :meth:`.commit`
2535 methods in order to control transaction boundaries. It
2536 also implements a context manager interface so that
2537 the Python ``with`` statement can be used with the
2538 :meth:`_engine.Connection.begin` method::
2539
2540 with connection.begin():
2541 connection.execute(text("insert into x (a, b) values (1, 2)"))
2542
2543 The Transaction object is **not** threadsafe.
2544
2545 .. seealso::
2546
2547 :meth:`_engine.Connection.begin`
2548
2549 :meth:`_engine.Connection.begin_twophase`
2550
2551 :meth:`_engine.Connection.begin_nested`
2552
2553 .. index::
2554 single: thread safety; Transaction
2555 """ # noqa
2556
2557 __slots__ = ()
2558
2559 _is_root: bool = False
2560 is_active: bool
2561 connection: Connection
2562
2563 def __init__(self, connection: Connection):
2564 raise NotImplementedError()
2565
2566 @property
2567 def _deactivated_from_connection(self) -> bool:
2568 """True if this transaction is totally deactivated from the connection
2569 and therefore can no longer affect its state.
2570
2571 """
2572 raise NotImplementedError()
2573
2574 def _do_close(self) -> None:
2575 raise NotImplementedError()
2576
2577 def _do_rollback(self) -> None:
2578 raise NotImplementedError()
2579
2580 def _do_commit(self) -> None:
2581 raise NotImplementedError()
2582
2583 @property
2584 def is_valid(self) -> bool:
2585 return self.is_active and not self.connection.invalidated
2586
2587 def close(self) -> None:
2588 """Close this :class:`.Transaction`.
2589
2590 If this transaction is the base transaction in a begin/commit
2591 nesting, the transaction will rollback(). Otherwise, the
2592 method returns.
2593
2594 This is used to cancel a Transaction without affecting the scope of
2595 an enclosing transaction.
2596
2597 """
2598 try:
2599 self._do_close()
2600 finally:
2601 assert not self.is_active
2602
2603 def rollback(self) -> None:
2604 """Roll back this :class:`.Transaction`.
2605
2606 The implementation of this may vary based on the type of transaction in
2607 use:
2608
2609 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2610 it corresponds to a ROLLBACK.
2611
2612 * For a :class:`.NestedTransaction`, it corresponds to a
2613 "ROLLBACK TO SAVEPOINT" operation.
2614
2615 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2616 phase transactions may be used.
2617
2618
2619 """
2620 try:
2621 self._do_rollback()
2622 finally:
2623 assert not self.is_active
2624
2625 def commit(self) -> None:
2626 """Commit this :class:`.Transaction`.
2627
2628 The implementation of this may vary based on the type of transaction in
2629 use:
2630
2631 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2632 it corresponds to a COMMIT.
2633
2634 * For a :class:`.NestedTransaction`, it corresponds to a
2635 "RELEASE SAVEPOINT" operation.
2636
2637 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2638 phase transactions may be used.
2639
2640 """
2641 try:
2642 self._do_commit()
2643 finally:
2644 assert not self.is_active
2645
2646 def _get_subject(self) -> Connection:
2647 return self.connection
2648
2649 def _transaction_is_active(self) -> bool:
2650 return self.is_active
2651
2652 def _transaction_is_closed(self) -> bool:
2653 return not self._deactivated_from_connection
2654
2655 def _rollback_can_be_called(self) -> bool:
2656 # for RootTransaction / NestedTransaction, it's safe to call
2657 # rollback() even if the transaction is deactive and no warnings
2658 # will be emitted. tested in
2659 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
2660 return True
2661
2662
2663class RootTransaction(Transaction):
2664 """Represent the "root" transaction on a :class:`_engine.Connection`.
2665
2666 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
2667 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
2668 is created by calling upon the :meth:`_engine.Connection.begin` method, and
2669 remains associated with the :class:`_engine.Connection` throughout its
2670 active span. The current :class:`_engine.RootTransaction` in use is
2671 accessible via the :attr:`_engine.Connection.get_transaction` method of
2672 :class:`_engine.Connection`.
2673
2674 In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
2675 "autobegin" behavior that will create a new
2676 :class:`_engine.RootTransaction` whenever a connection in a
2677 non-transactional state is used to emit commands on the DBAPI connection.
2678 The scope of the :class:`_engine.RootTransaction` in 2.0 style
2679 use can be controlled using the :meth:`_engine.Connection.commit` and
2680 :meth:`_engine.Connection.rollback` methods.
2681
2682
2683 """
2684
2685 _is_root = True
2686
2687 __slots__ = ("connection", "is_active")
2688
2689 def __init__(self, connection: Connection):
2690 assert connection._transaction is None
2691 if connection._trans_context_manager:
2692 TransactionalContext._trans_ctx_check(connection)
2693 self.connection = connection
2694 self._connection_begin_impl()
2695 connection._transaction = self
2696
2697 self.is_active = True
2698
2699 def _deactivate_from_connection(self) -> None:
2700 if self.is_active:
2701 assert self.connection._transaction is self
2702 self.is_active = False
2703
2704 elif self.connection._transaction is not self:
2705 util.warn("transaction already deassociated from connection")
2706
2707 @property
2708 def _deactivated_from_connection(self) -> bool:
2709 return self.connection._transaction is not self
2710
2711 def _connection_begin_impl(self) -> None:
2712 self.connection._begin_impl(self)
2713
2714 def _connection_rollback_impl(self) -> None:
2715 self.connection._rollback_impl()
2716
2717 def _connection_commit_impl(self) -> None:
2718 self.connection._commit_impl()
2719
2720 def _close_impl(self, try_deactivate: bool = False) -> None:
2721 try:
2722 if self.is_active:
2723 self._connection_rollback_impl()
2724
2725 if self.connection._nested_transaction:
2726 self.connection._nested_transaction._cancel()
2727 finally:
2728 if self.is_active or try_deactivate:
2729 self._deactivate_from_connection()
2730 if self.connection._transaction is self:
2731 self.connection._transaction = None
2732
2733 assert not self.is_active
2734 assert self.connection._transaction is not self
2735
2736 def _do_close(self) -> None:
2737 self._close_impl()
2738
2739 def _do_rollback(self) -> None:
2740 self._close_impl(try_deactivate=True)
2741
2742 def _do_commit(self) -> None:
2743 if self.is_active:
2744 assert self.connection._transaction is self
2745
2746 try:
2747 self._connection_commit_impl()
2748 finally:
2749 # whether or not commit succeeds, cancel any
2750 # nested transactions, make this transaction "inactive"
2751 # and remove it as a reset agent
2752 if self.connection._nested_transaction:
2753 self.connection._nested_transaction._cancel()
2754
2755 self._deactivate_from_connection()
2756
2757 # ...however only remove as the connection's current transaction
2758 # if commit succeeded. otherwise it stays on so that a rollback
2759 # needs to occur.
2760 self.connection._transaction = None
2761 else:
2762 if self.connection._transaction is self:
2763 self.connection._invalid_transaction()
2764 else:
2765 raise exc.InvalidRequestError("This transaction is inactive")
2766
2767 assert not self.is_active
2768 assert self.connection._transaction is not self
2769
2770
2771class NestedTransaction(Transaction):
2772 """Represent a 'nested', or SAVEPOINT transaction.
2773
2774 The :class:`.NestedTransaction` object is created by calling the
2775 :meth:`_engine.Connection.begin_nested` method of
2776 :class:`_engine.Connection`.
2777
2778 When using :class:`.NestedTransaction`, the semantics of "begin" /
2779 "commit" / "rollback" are as follows:
2780
2781 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
2782 the savepoint is given an explicit name that is part of the state
2783 of this object.
2784
2785 * The :meth:`.NestedTransaction.commit` method corresponds to a
2786 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
2787 with this :class:`.NestedTransaction`.
2788
2789 * The :meth:`.NestedTransaction.rollback` method corresponds to a
2790 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
2791 associated with this :class:`.NestedTransaction`.
2792
2793 The rationale for mimicking the semantics of an outer transaction in
2794 terms of savepoints so that code may deal with a "savepoint" transaction
2795 and an "outer" transaction in an agnostic way.
2796
2797 .. seealso::
2798
2799 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.
2800
2801 """
2802
2803 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
2804
2805 _savepoint: str
2806
2807 def __init__(self, connection: Connection):
2808 assert connection._transaction is not None
2809 if connection._trans_context_manager:
2810 TransactionalContext._trans_ctx_check(connection)
2811 self.connection = connection
2812 self._savepoint = self.connection._savepoint_impl()
2813 self.is_active = True
2814 self._previous_nested = connection._nested_transaction
2815 connection._nested_transaction = self
2816
2817 def _deactivate_from_connection(self, warn: bool = True) -> None:
2818 if self.connection._nested_transaction is self:
2819 self.connection._nested_transaction = self._previous_nested
2820 elif warn:
2821 util.warn(
2822 "nested transaction already deassociated from connection"
2823 )
2824
2825 @property
2826 def _deactivated_from_connection(self) -> bool:
2827 return self.connection._nested_transaction is not self
2828
2829 def _cancel(self) -> None:
2830 # called by RootTransaction when the outer transaction is
2831 # committed, rolled back, or closed to cancel all savepoints
2832 # without any action being taken
2833 self.is_active = False
2834 self._deactivate_from_connection()
2835 if self._previous_nested:
2836 self._previous_nested._cancel()
2837
2838 def _close_impl(
2839 self, deactivate_from_connection: bool, warn_already_deactive: bool
2840 ) -> None:
2841 try:
2842 if (
2843 self.is_active
2844 and self.connection._transaction
2845 and self.connection._transaction.is_active
2846 ):
2847 self.connection._rollback_to_savepoint_impl(self._savepoint)
2848 finally:
2849 self.is_active = False
2850
2851 if deactivate_from_connection:
2852 self._deactivate_from_connection(warn=warn_already_deactive)
2853
2854 assert not self.is_active
2855 if deactivate_from_connection:
2856 assert self.connection._nested_transaction is not self
2857
2858 def _do_close(self) -> None:
2859 self._close_impl(True, False)
2860
2861 def _do_rollback(self) -> None:
2862 self._close_impl(True, True)
2863
2864 def _do_commit(self) -> None:
2865 if self.is_active:
2866 try:
2867 self.connection._release_savepoint_impl(self._savepoint)
2868 finally:
2869 # nested trans becomes inactive on failed release
2870 # unconditionally. this prevents it from trying to
2871 # emit SQL when it rolls back.
2872 self.is_active = False
2873
2874 # but only de-associate from connection if it succeeded
2875 self._deactivate_from_connection()
2876 else:
2877 if self.connection._nested_transaction is self:
2878 self.connection._invalid_transaction()
2879 else:
2880 raise exc.InvalidRequestError(
2881 "This nested transaction is inactive"
2882 )
2883
2884
2885class TwoPhaseTransaction(RootTransaction):
2886 """Represent a two-phase transaction.
2887
2888 A new :class:`.TwoPhaseTransaction` object may be procured
2889 using the :meth:`_engine.Connection.begin_twophase` method.
2890
2891 The interface is the same as that of :class:`.Transaction`
2892 with the addition of the :meth:`prepare` method.
2893
2894 """
2895
2896 __slots__ = ("xid", "_is_prepared")
2897
2898 xid: Any
2899
2900 def __init__(self, connection: Connection, xid: Any):
2901 self._is_prepared = False
2902 self.xid = xid
2903 super().__init__(connection)
2904
2905 def prepare(self) -> None:
2906 """Prepare this :class:`.TwoPhaseTransaction`.
2907
2908 After a PREPARE, the transaction can be committed.
2909
2910 """
2911 if not self.is_active:
2912 raise exc.InvalidRequestError("This transaction is inactive")
2913 self.connection._prepare_twophase_impl(self.xid)
2914 self._is_prepared = True
2915
2916 def _connection_begin_impl(self) -> None:
2917 self.connection._begin_twophase_impl(self)
2918
2919 def _connection_rollback_impl(self) -> None:
2920 self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
2921
2922 def _connection_commit_impl(self) -> None:
2923 self.connection._commit_twophase_impl(self.xid, self._is_prepared)
2924
2925
2926class Engine(
2927 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
2928):
2929 """
2930 Connects a :class:`~sqlalchemy.pool.Pool` and
2931 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
2932 source of database connectivity and behavior.
2933
2934 An :class:`_engine.Engine` object is instantiated publicly using the
2935 :func:`~sqlalchemy.create_engine` function.
2936
2937 .. seealso::
2938
2939 :doc:`/core/engines`
2940
2941 :ref:`connections_toplevel`
2942
2943 """
2944
2945 dispatch: dispatcher[ConnectionEventsTarget]
2946
2947 _compiled_cache: Optional[CompiledCacheType]
2948
2949 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
2950 _has_events: bool = False
2951 _connection_cls: Type[Connection] = Connection
2952 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
2953 _is_future: bool = False
2954
2955 _schema_translate_map: Optional[SchemaTranslateMapType] = None
2956 _option_cls: Type[OptionEngine]
2957
2958 dialect: Dialect
2959 pool: Pool
2960 url: URL
2961 hide_parameters: bool
2962
2963 def __init__(
2964 self,
2965 pool: Pool,
2966 dialect: Dialect,
2967 url: URL,
2968 logging_name: Optional[str] = None,
2969 echo: Optional[_EchoFlagType] = None,
2970 query_cache_size: int = 500,
2971 execution_options: Optional[Mapping[str, Any]] = None,
2972 hide_parameters: bool = False,
2973 ):
2974 self.pool = pool
2975 self.url = url
2976 self.dialect = dialect
2977 if logging_name:
2978 self.logging_name = logging_name
2979 self.echo = echo
2980 self.hide_parameters = hide_parameters
2981 if query_cache_size != 0:
2982 self._compiled_cache = util.LRUCache(
2983 query_cache_size, size_alert=self._lru_size_alert
2984 )
2985 else:
2986 self._compiled_cache = None
2987 log.instance_logger(self, echoflag=echo)
2988 if execution_options:
2989 self.update_execution_options(**execution_options)
2990
2991 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
2992 if self._should_log_info():
2993 self.logger.info(
2994 "Compiled cache size pruning from %d items to %d. "
2995 "Increase cache size to reduce the frequency of pruning.",
2996 len(cache),
2997 cache.capacity,
2998 )
2999
3000 @property
3001 def engine(self) -> Engine:
3002 """Returns this :class:`.Engine`.
3003
3004 Used for legacy schemes that accept :class:`.Connection` /
3005 :class:`.Engine` objects within the same variable.
3006
3007 """
3008 return self
3009
3010 def clear_compiled_cache(self) -> None:
3011 """Clear the compiled cache associated with the dialect.
3012
3013 This applies **only** to the built-in cache that is established
3014 via the :paramref:`_engine.create_engine.query_cache_size` parameter.
3015 It will not impact any dictionary caches that were passed via the
3016 :paramref:`.Connection.execution_options.compiled_cache` parameter.
3017
3018 .. versionadded:: 1.4
3019
3020 """
3021 if self._compiled_cache:
3022 self._compiled_cache.clear()
3023
3024 def update_execution_options(self, **opt: Any) -> None:
3025 r"""Update the default execution_options dictionary
3026 of this :class:`_engine.Engine`.
3027
3028 The given keys/values in \**opt are added to the
3029 default execution options that will be used for
3030 all connections. The initial contents of this dictionary
3031 can be sent via the ``execution_options`` parameter
3032 to :func:`_sa.create_engine`.
3033
3034 .. seealso::
3035
3036 :meth:`_engine.Connection.execution_options`
3037
3038 :meth:`_engine.Engine.execution_options`
3039
3040 """
3041 self.dispatch.set_engine_execution_options(self, opt)
3042 self._execution_options = self._execution_options.union(opt)
3043 self.dialect.set_engine_execution_options(self, opt)
3044
3045 @overload
3046 def execution_options(
3047 self,
3048 *,
3049 compiled_cache: Optional[CompiledCacheType] = ...,
3050 logging_token: str = ...,
3051 isolation_level: IsolationLevel = ...,
3052 insertmanyvalues_page_size: int = ...,
3053 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
3054 **opt: Any,
3055 ) -> OptionEngine: ...
3056
3057 @overload
3058 def execution_options(self, **opt: Any) -> OptionEngine: ...
3059
3060 def execution_options(self, **opt: Any) -> OptionEngine:
3061 """Return a new :class:`_engine.Engine` that will provide
3062 :class:`_engine.Connection` objects with the given execution options.
3063
3064 The returned :class:`_engine.Engine` remains related to the original
3065 :class:`_engine.Engine` in that it shares the same connection pool and
3066 other state:
3067
3068 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
3069 is the
3070 same instance. The :meth:`_engine.Engine.dispose`
3071 method will replace
3072 the connection pool instance for the parent engine as well
3073 as this one.
3074 * Event listeners are "cascaded" - meaning, the new
3075 :class:`_engine.Engine`
3076 inherits the events of the parent, and new events can be associated
3077 with the new :class:`_engine.Engine` individually.
3078 * The logging configuration and logging_name is copied from the parent
3079 :class:`_engine.Engine`.
3080
3081 The intent of the :meth:`_engine.Engine.execution_options` method is
3082 to implement schemes where multiple :class:`_engine.Engine`
3083 objects refer to the same connection pool, but are differentiated
3084 by options that affect some execution-level behavior for each
3085 engine. One such example is breaking into separate "reader" and
3086 "writer" :class:`_engine.Engine` instances, where one
3087 :class:`_engine.Engine`
3088 has a lower :term:`isolation level` setting configured or is even
3089 transaction-disabled using "autocommit". An example of this
3090 configuration is at :ref:`dbapi_autocommit_multiple`.
3091
3092 Another example is one that
3093 uses a custom option ``shard_id`` which is consumed by an event
3094 to change the current schema on a database connection::
3095
3096 from sqlalchemy import event
3097 from sqlalchemy.engine import Engine
3098
3099 primary_engine = create_engine("mysql+mysqldb://")
3100 shard1 = primary_engine.execution_options(shard_id="shard1")
3101 shard2 = primary_engine.execution_options(shard_id="shard2")
3102
3103 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
3104
3105 @event.listens_for(Engine, "before_cursor_execute")
3106 def _switch_shard(conn, cursor, stmt,
3107 params, context, executemany):
3108 shard_id = conn.get_execution_options().get('shard_id', "default")
3109 current_shard = conn.info.get("current_shard", None)
3110
3111 if current_shard != shard_id:
3112 cursor.execute("use %s" % shards[shard_id])
3113 conn.info["current_shard"] = shard_id
3114
3115 The above recipe illustrates two :class:`_engine.Engine` objects that
3116 will each serve as factories for :class:`_engine.Connection` objects
3117 that have pre-established "shard_id" execution options present. A
3118 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
3119 then interprets this execution option to emit a MySQL ``use`` statement
3120 to switch databases before a statement execution, while at the same
3121 time keeping track of which database we've established using the
3122 :attr:`_engine.Connection.info` dictionary.
3123
3124 .. seealso::
3125
3126 :meth:`_engine.Connection.execution_options`
3127 - update execution options
3128 on a :class:`_engine.Connection` object.
3129
3130 :meth:`_engine.Engine.update_execution_options`
3131 - update the execution
3132 options for a given :class:`_engine.Engine` in place.
3133
3134 :meth:`_engine.Engine.get_execution_options`
3135
3136
3137 """ # noqa: E501
3138 return self._option_cls(self, opt)
3139
3140 def get_execution_options(self) -> _ExecuteOptions:
3141 """Get the non-SQL options which will take effect during execution.
3142
3143 .. versionadded: 1.3
3144
3145 .. seealso::
3146
3147 :meth:`_engine.Engine.execution_options`
3148 """
3149 return self._execution_options
3150
3151 @property
3152 def name(self) -> str:
3153 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3154 in use by this :class:`Engine`.
3155
3156 """
3157
3158 return self.dialect.name
3159
3160 @property
3161 def driver(self) -> str:
3162 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3163 in use by this :class:`Engine`.
3164
3165 """
3166
3167 return self.dialect.driver
3168
3169 echo = log.echo_property()
3170
3171 def __repr__(self) -> str:
3172 return "Engine(%r)" % (self.url,)
3173
3174 def dispose(self, close: bool = True) -> None:
3175 """Dispose of the connection pool used by this
3176 :class:`_engine.Engine`.
3177
3178 A new connection pool is created immediately after the old one has been
3179 disposed. The previous connection pool is disposed either actively, by
3180 closing out all currently checked-in connections in that pool, or
3181 passively, by losing references to it but otherwise not closing any
3182 connections. The latter strategy is more appropriate for an initializer
3183 in a forked Python process.
3184
3185 :param close: if left at its default of ``True``, has the
3186 effect of fully closing all **currently checked in**
3187 database connections. Connections that are still checked out
3188 will **not** be closed, however they will no longer be associated
3189 with this :class:`_engine.Engine`,
3190 so when they are closed individually, eventually the
3191 :class:`_pool.Pool` which they are associated with will
3192 be garbage collected and they will be closed out fully, if
3193 not already closed on checkin.
3194
3195 If set to ``False``, the previous connection pool is de-referenced,
3196 and otherwise not touched in any way.
3197
3198 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
3199 parameter to allow the replacement of a connection pool in a child
3200 process without interfering with the connections used by the parent
3201 process.
3202
3203
3204 .. seealso::
3205
3206 :ref:`engine_disposal`
3207
3208 :ref:`pooling_multiprocessing`
3209
3210 """
3211 if close:
3212 self.pool.dispose()
3213 self.pool = self.pool.recreate()
3214 self.dispatch.engine_disposed(self)
3215
3216 @contextlib.contextmanager
3217 def _optional_conn_ctx_manager(
3218 self, connection: Optional[Connection] = None
3219 ) -> Iterator[Connection]:
3220 if connection is None:
3221 with self.connect() as conn:
3222 yield conn
3223 else:
3224 yield connection
3225
3226 @contextlib.contextmanager
3227 def begin(self) -> Iterator[Connection]:
3228 """Return a context manager delivering a :class:`_engine.Connection`
3229 with a :class:`.Transaction` established.
3230
3231 E.g.::
3232
3233 with engine.begin() as conn:
3234 conn.execute(
3235 text("insert into table (x, y, z) values (1, 2, 3)")
3236 )
3237 conn.execute(text("my_special_procedure(5)"))
3238
3239 Upon successful operation, the :class:`.Transaction`
3240 is committed. If an error is raised, the :class:`.Transaction`
3241 is rolled back.
3242
3243 .. seealso::
3244
3245 :meth:`_engine.Engine.connect` - procure a
3246 :class:`_engine.Connection` from
3247 an :class:`_engine.Engine`.
3248
3249 :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
3250 for a particular :class:`_engine.Connection`.
3251
3252 """
3253 with self.connect() as conn:
3254 with conn.begin():
3255 yield conn
3256
3257 def _run_ddl_visitor(
3258 self,
3259 visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]],
3260 element: SchemaItem,
3261 **kwargs: Any,
3262 ) -> None:
3263 with self.begin() as conn:
3264 conn._run_ddl_visitor(visitorcallable, element, **kwargs)
3265
3266 def connect(self) -> Connection:
3267 """Return a new :class:`_engine.Connection` object.
3268
3269 The :class:`_engine.Connection` acts as a Python context manager, so
3270 the typical use of this method looks like::
3271
3272 with engine.connect() as connection:
3273 connection.execute(text("insert into table values ('foo')"))
3274 connection.commit()
3275
3276 Where above, after the block is completed, the connection is "closed"
3277 and its underlying DBAPI resources are returned to the connection pool.
3278 This also has the effect of rolling back any transaction that
3279 was explicitly begun or was begun via autobegin, and will
3280 emit the :meth:`_events.ConnectionEvents.rollback` event if one was
3281 started and is still in progress.
3282
3283 .. seealso::
3284
3285 :meth:`_engine.Engine.begin`
3286
3287 """
3288
3289 return self._connection_cls(self)
3290
3291 def raw_connection(self) -> PoolProxiedConnection:
3292 """Return a "raw" DBAPI connection from the connection pool.
3293
3294 The returned object is a proxied version of the DBAPI
3295 connection object used by the underlying driver in use.
3296 The object will have all the same behavior as the real DBAPI
3297 connection, except that its ``close()`` method will result in the
3298 connection being returned to the pool, rather than being closed
3299 for real.
3300
3301 This method provides direct DBAPI connection access for
3302 special situations when the API provided by
3303 :class:`_engine.Connection`
3304 is not needed. When a :class:`_engine.Connection` object is already
3305 present, the DBAPI connection is available using
3306 the :attr:`_engine.Connection.connection` accessor.
3307
3308 .. seealso::
3309
3310 :ref:`dbapi_connections`
3311
3312 """
3313 return self.pool.connect()
3314
3315
3316class OptionEngineMixin(log.Identified):
3317 _sa_propagate_class_events = False
3318
3319 dispatch: dispatcher[ConnectionEventsTarget]
3320 _compiled_cache: Optional[CompiledCacheType]
3321 dialect: Dialect
3322 pool: Pool
3323 url: URL
3324 hide_parameters: bool
3325 echo: log.echo_property
3326
3327 def __init__(
3328 self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
3329 ):
3330 self._proxied = proxied
3331 self.url = proxied.url
3332 self.dialect = proxied.dialect
3333 self.logging_name = proxied.logging_name
3334 self.echo = proxied.echo
3335 self._compiled_cache = proxied._compiled_cache
3336 self.hide_parameters = proxied.hide_parameters
3337 log.instance_logger(self, echoflag=self.echo)
3338
3339 # note: this will propagate events that are assigned to the parent
3340 # engine after this OptionEngine is created. Since we share
3341 # the events of the parent we also disallow class-level events
3342 # to apply to the OptionEngine class directly.
3343 #
3344 # the other way this can work would be to transfer existing
3345 # events only, using:
3346 # self.dispatch._update(proxied.dispatch)
3347 #
3348 # that might be more appropriate however it would be a behavioral
3349 # change for logic that assigns events to the parent engine and
3350 # would like it to take effect for the already-created sub-engine.
3351 self.dispatch = self.dispatch._join(proxied.dispatch)
3352
3353 self._execution_options = proxied._execution_options
3354 self.update_execution_options(**execution_options)
3355
3356 def update_execution_options(self, **opt: Any) -> None:
3357 raise NotImplementedError()
3358
3359 if not typing.TYPE_CHECKING:
3360 # https://github.com/python/typing/discussions/1095
3361
3362 @property
3363 def pool(self) -> Pool:
3364 return self._proxied.pool
3365
3366 @pool.setter
3367 def pool(self, pool: Pool) -> None:
3368 self._proxied.pool = pool
3369
3370 @property
3371 def _has_events(self) -> bool:
3372 return self._proxied._has_events or self.__dict__.get(
3373 "_has_events", False
3374 )
3375
3376 @_has_events.setter
3377 def _has_events(self, value: bool) -> None:
3378 self.__dict__["_has_events"] = value
3379
3380
3381class OptionEngine(OptionEngineMixin, Engine):
3382 def update_execution_options(self, **opt: Any) -> None:
3383 Engine.update_execution_options(self, **opt)
3384
3385
3386Engine._option_cls = OptionEngine