1# engine/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
8
9from __future__ import annotations
10
11import contextlib
12import sys
13import typing
14from typing import Any
15from typing import Callable
16from typing import cast
17from typing import Iterable
18from typing import Iterator
19from typing import List
20from typing import Mapping
21from typing import NoReturn
22from typing import Optional
23from typing import overload
24from typing import Tuple
25from typing import Type
26from typing import TypeVar
27from typing import Union
28
29from .interfaces import BindTyping
30from .interfaces import ConnectionEventsTarget
31from .interfaces import DBAPICursor
32from .interfaces import ExceptionContext
33from .interfaces import ExecuteStyle
34from .interfaces import ExecutionContext
35from .interfaces import IsolationLevel
36from .util import _distill_params_20
37from .util import _distill_raw_params
38from .util import TransactionalContext
39from .. import exc
40from .. import inspection
41from .. import log
42from .. import util
43from ..sql import compiler
44from ..sql import util as sql_util
45
46if typing.TYPE_CHECKING:
47 from . import CursorResult
48 from . import ScalarResult
49 from .interfaces import _AnyExecuteParams
50 from .interfaces import _AnyMultiExecuteParams
51 from .interfaces import _CoreAnyExecuteParams
52 from .interfaces import _CoreMultiExecuteParams
53 from .interfaces import _CoreSingleExecuteParams
54 from .interfaces import _DBAPIAnyExecuteParams
55 from .interfaces import _DBAPISingleExecuteParams
56 from .interfaces import _ExecuteOptions
57 from .interfaces import CompiledCacheType
58 from .interfaces import CoreExecuteOptionsParameter
59 from .interfaces import Dialect
60 from .interfaces import SchemaTranslateMapType
61 from .reflection import Inspector # noqa
62 from .url import URL
63 from ..event import dispatcher
64 from ..log import _EchoFlagType
65 from ..pool import _ConnectionFairy
66 from ..pool import Pool
67 from ..pool import PoolProxiedConnection
68 from ..sql import Executable
69 from ..sql._typing import _InfoType
70 from ..sql.compiler import Compiled
71 from ..sql.ddl import ExecutableDDLElement
72 from ..sql.ddl import InvokeDDLBase
73 from ..sql.functions import FunctionElement
74 from ..sql.schema import DefaultGenerator
75 from ..sql.schema import HasSchemaAttr
76 from ..sql.schema import SchemaVisitable
77 from ..sql.selectable import TypedReturnsRows
78
79
# Generic type variable, bound to ``Any``.
_T = TypeVar("_T", bound=Any)
# ``util.EMPTY_DICT`` annotated as an execution-options mapping.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
# ``util.EMPTY_DICT`` annotated as a generic string-keyed mapping.
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
83
84
85class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
86 """Provides high-level functionality for a wrapped DB-API connection.
87
88 The :class:`_engine.Connection` object is procured by calling the
89 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
90 object, and provides services for execution of SQL statements as well
91 as transaction control.
92
93 The Connection object is **not** thread-safe. While a Connection can be
94 shared among threads using properly synchronized access, it is still
95 possible that the underlying DBAPI connection may not support shared
96 access between threads. Check the DBAPI documentation for details.
97
98 The Connection object represents a single DBAPI connection checked out
99 from the connection pool. In this state, the connection pool has no
    effect upon the connection, including its expiration or timeout state.
101 For the connection pool to properly manage connections, connections
102 should be returned to the connection pool (i.e. ``connection.close()``)
103 whenever the connection is not in use.
104
105 .. index::
106 single: thread safety; Connection
107
108 """
109
110 dialect: Dialect
111 dispatch: dispatcher[ConnectionEventsTarget]
112
113 _sqla_logger_namespace = "sqlalchemy.engine.Connection"
114
115 # used by sqlalchemy.engine.util.TransactionalContext
116 _trans_context_manager: Optional[TransactionalContext] = None
117
118 # legacy as of 2.0, should be eventually deprecated and
119 # removed. was used in the "pre_ping" recipe that's been in the docs
120 # a long time
121 should_close_with_result = False
122
123 _dbapi_connection: Optional[PoolProxiedConnection]
124
125 _execution_options: _ExecuteOptions
126
127 _transaction: Optional[RootTransaction]
128 _nested_transaction: Optional[NestedTransaction]
129
130 def __init__(
131 self,
132 engine: Engine,
133 connection: Optional[PoolProxiedConnection] = None,
134 _has_events: Optional[bool] = None,
135 _allow_revalidate: bool = True,
136 _allow_autobegin: bool = True,
137 ):
138 """Construct a new Connection."""
139 self.engine = engine
140 self.dialect = dialect = engine.dialect
141
142 if connection is None:
143 try:
144 self._dbapi_connection = engine.raw_connection()
145 except dialect.loaded_dbapi.Error as err:
146 Connection._handle_dbapi_exception_noconnection(
147 err, dialect, engine
148 )
149 raise
150 else:
151 self._dbapi_connection = connection
152
153 self._transaction = self._nested_transaction = None
154 self.__savepoint_seq = 0
155 self.__in_begin = False
156
157 self.__can_reconnect = _allow_revalidate
158 self._allow_autobegin = _allow_autobegin
159 self._echo = self.engine._should_log_info()
160
161 if _has_events is None:
162 # if _has_events is sent explicitly as False,
163 # then don't join the dispatch of the engine; we don't
164 # want to handle any of the engine's events in that case.
165 self.dispatch = self.dispatch._join(engine.dispatch)
166 self._has_events = _has_events or (
167 _has_events is None and engine._has_events
168 )
169
170 self._execution_options = engine._execution_options
171
172 if self._has_events or self.engine._has_events:
173 self.dispatch.engine_connect(self)
174
175 # this can be assigned differently via
176 # characteristics.LoggingTokenCharacteristic
177 _message_formatter: Any = None
178
def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at INFO level on the engine's logger.

    If a ``_message_formatter`` is set, the message is passed through it
    first.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel`` keyword
    derived from ``log.STACKLEVEL_OFFSET`` is added to the logging call.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.info(text, *arg, **kw)
189
def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at DEBUG level on the engine's logger.

    If a ``_message_formatter`` is set, the message is passed through it
    first.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel`` keyword
    derived from ``log.STACKLEVEL_OFFSET`` is added to the logging call.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.debug(text, *arg, **kw)
200
201 @property
202 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
203 schema_translate_map: Optional[SchemaTranslateMapType] = (
204 self._execution_options.get("schema_translate_map", None)
205 )
206
207 return schema_translate_map
208
def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the schema name for the given schema item taking into
    account current schema translate map.

    The object's own ``schema`` is returned unless a
    ``schema_translate_map`` execution option is present, contains that
    schema name as a key, and the object's ``_use_schema_map`` flag is
    set; in that case the mapped name is returned instead.

    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    # no map configured, or this schema isn't a key in it
    if not translate_map or schema_name not in translate_map:
        return schema_name

    # the object opted out of translation
    if not obj._use_schema_map:
        return schema_name

    return translate_map[schema_name]
228
229 def __enter__(self) -> Connection:
230 return self
231
232 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
233 self.close()
234
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    **opt: Any,
) -> Connection: ...

@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called.   Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor.  Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_orm.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`. Number of rows to format into an
        INSERT statement when the statement uses "insertmanyvalues" mode,
        which is a paged form of bulk insert that is used for many backends
        when using :term:`executemany` execution typically in conjunction
        with RETURNING. Defaults to 1000. May also be modified on a
        per-engine basis using the
        :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

        .. versionadded:: 2.0

        .. seealso::

            :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPIs rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements.  See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    """  # noqa
    # event listeners see the incoming options before they are merged
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.set_connection_execution_options(self, opt)

    merged_options = self._execution_options.union(opt)
    self._execution_options = merged_options

    # the dialect applies only the newly given options
    self.dialect.set_connection_execution_options(self, opt)
    return self
517
def get_execution_options(self) -> _ExecuteOptions:
    """Get the non-SQL options which will take effect during execution.

    The collection currently stored on this connection is returned as-is.

    .. versionadded:: 1.3

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options
528
529 @property
530 def _still_open_and_dbapi_connection_is_valid(self) -> bool:
531 pool_proxied_connection = self._dbapi_connection
532 return (
533 pool_proxied_connection is not None
534 and pool_proxied_connection.is_valid
535 )
536
@property
def closed(self) -> bool:
    """Return True if this connection is closed.

    A connection counts as closed when it holds no pool-proxied
    connection and is not permitted to reconnect.
    """
    if self._dbapi_connection is not None:
        return False
    return not self.__can_reconnect
542
@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    This does not indicate whether or not the connection was
    invalidated at the pool level, however.

    """

    # prior to 1.4, "invalid" was stored as a state independent of
    # "closed", meaning an invalidated connection could be "closed",
    # the _dbapi_connection would be None and closed=True, yet the
    # "invalid" flag would stay True.  This meant that there were
    # three separate states (open/valid, closed/valid, closed/invalid)
    # when there is really no reason for that; a connection that's
    # "closed" does not need to be "invalid".  So the state is now
    # represented by the two facts alone.

    if self._dbapi_connection is not None:
        return False
    # no connection held: "invalidated" if reconnecting is still allowed
    return self.__can_reconnect
563
@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    This is a SQLAlchemy connection-pool proxied connection
    which then has the attribute
    :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
    actual driver connection.

    If no pool-proxied connection is currently held, revalidation is
    attempted first.

    .. seealso::


        :ref:`dbapi_connections`

    """

    fairy = self._dbapi_connection
    if fairy is not None:
        return fairy

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate untouched
        raise
    except BaseException as err:
        # everything else goes through the common DBAPI error handler
        self._handle_dbapi_exception(err, None, None, None, None)
589
def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    This attribute will perform a live SQL operation against the database
    in order to procure the current isolation level, so the value returned
    is the actual level on the underlying DBAPI connection regardless of
    how this state was set. This will be one of the four actual isolation
    modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
    ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
    level setting. Third party dialects may also feature additional
    isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None
    try:
        level = self.dialect.get_isolation_level(driver_connection)
    except BaseException as err:
        # hand any failure to the common DBAPI error handler
        self._handle_dbapi_exception(err, None, None, None, None)
    else:
        return level
633
@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    This value is independent of the
    :paramref:`.Connection.execution_options.isolation_level` and
    :paramref:`.Engine.execution_options.isolation_level` execution
    options, and is determined by the :class:`_engine.Dialect` when the
    first connection is created, by performing a SQL query against the
    database for the current isolation level before any additional commands
    have been emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level
662
def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`.PendingRollbackError` for a connection whose
    transaction must be rolled back before it can reconnect.

    The message mentions a savepoint when a nested transaction is
    present on this connection.
    """
    qualifier = "savepoint " if self._nested_transaction is not None else ""
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (qualifier,),
        code="8s2b",
    )
670
671 def _revalidate_connection(self) -> PoolProxiedConnection:
672 if self.__can_reconnect and self.invalidated:
673 if self._transaction is not None:
674 self._invalid_transaction()
675 self._dbapi_connection = self.engine.raw_connection()
676 return self._dbapi_connection
677 raise exc.ResourceClosedError("This Connection is closed")
678
@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data here will follow along with the DBAPI connection including
    after it is returned to the connection pool and used again
    in subsequent instances of :class:`_engine.Connection`.

    """

    pool_connection = self.connection
    return pool_connection.info
692
def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt will be made to close the underlying DBAPI connection
    immediately; however if this operation fails, the error is logged
    but not raised.  The connection is then discarded whether or not
    close() succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar),
    this :class:`_engine.Connection` will attempt to
    procure a new DBAPI connection using the services of the
    :class:`_pool.Pool` as a source of connectivity (e.g.
    a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
    level all state associated with this transaction is lost, as
    the DBAPI connection is closed.  The :class:`_engine.Connection`
    will not allow a reconnection to proceed until the
    :class:`.Transaction` object is ended, by calling the
    :meth:`.Transaction.rollback` method; until that point, any attempt at
    continuing to use the :class:`_engine.Connection` will raise an
    :class:`~sqlalchemy.exc.InvalidRequestError`.
    This is to prevent applications from accidentally
    continuing ongoing transactional operations despite the
    fact that the transaction has been lost due to an
    invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """

    # already invalidated: nothing more to do
    if self.invalidated:
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        fairy = self._dbapi_connection
        assert fairy is not None
        fairy.invalidate(exception)

    # drop the reference whether or not the pool connection was valid
    self._dbapi_connection = None
751
def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance will remain usable.
    When closed
    (or exited from a context manager context as above),
    the DB-API connection will be literally closed and not
    returned to its originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    :raises ResourceClosedError: if this connection is closed.
    :raises InvalidRequestError: if this connection holds no pool
     connection, i.e. it has been invalidated.

    """

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    fairy = self._dbapi_connection
    if fairy is not None:
        fairy.detach()
        return

    raise exc.InvalidRequestError("Can't detach an invalidated Connection")
787
788 def _autobegin(self) -> None:
789 if self._allow_autobegin and not self.__in_begin:
790 self.begin()
791
def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of :class:`_engine.RootTransaction`.
    This object represents the "scope" of the transaction,
    which completes when either the :meth:`_engine.Transaction.rollback`
    or :meth:`_engine.Transaction.commit` method is called; the object
    also works as a context manager as illustrated above.

    The :meth:`_engine.Connection.begin` method begins a
    transaction that normally will be begun in any case when the connection
    is first used to execute a statement.  The reason this method might be
    used would be to invoke the :meth:`_events.ConnectionEvents.begin`
    event at a specific time, or to organize code within the scope of a
    connection checkout in terms of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    The above code is not  fundamentally any different in its behavior than
    the following code  which does not use
    :meth:`_engine.Connection.begin`; the below style is known
    as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, the :meth:`_engine.Connection.begin`
    method does not emit any SQL or change the state of the underlying
    DBAPI connection in any way; the Python DBAPI does not have any
    concept of explicit transaction begin.

    :raises InvalidRequestError: if a transaction object is already
     present on this connection.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction
866
def begin_nested(self) -> NestedTransaction:
    """Start a nested transaction (i.e. SAVEPOINT) and return a
    transaction handle which controls the scope of that SAVEPOINT.

    E.g.::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    The object returned is a :class:`_engine.NestedTransaction`, whose
    transactional methods :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback` correspond, for a nested
    transaction, to the operations "RELEASE SAVEPOINT <name>" and
    "ROLLBACK TO SAVEPOINT <name>".  The savepoint name is local to the
    :class:`_engine.NestedTransaction` object and is generated
    automatically.  As with any other :class:`_engine.Transaction`, the
    :class:`_engine.NestedTransaction` can be used as a context manager
    as shown above, which will "release" or "rollback" depending on
    whether the operation inside the block succeeded or raised an
    exception.

    Nested transactions need SAVEPOINT support in the underlying
    database; without it the behavior is undefined.  A common use of
    SAVEPOINT is to run operations inside a transaction that might fail,
    while still continuing the outer transaction.  E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute(...)

    When :meth:`_engine.Connection.begin_nested` is called without a
    prior call to :meth:`_engine.Connection.begin` or
    :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
    will "autobegin" the outer transaction first.  That outer transaction
    may then be committed using "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute(...)
            # savepoint is released

            connection.execute(...)

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    # a SAVEPOINT needs an enclosing transaction; autobegin one if absent
    outer_missing = self._transaction is None
    if outer_missing:
        self._autobegin()

    savepoint_transaction = NestedTransaction(self)
    return savepoint_transaction
944
def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Start a two-phase or XA transaction and return a transaction
    handle.

    The object returned is a :class:`.TwoPhaseTransaction`, which offers
    a :meth:`~.TwoPhaseTransaction.prepare` method in addition to the
    methods provided by :class:`.Transaction`.

    :param xid: the two phase transaction id.  When omitted, an id is
      obtained from the dialect's ``create_xid()`` method.  The accepted
      type and value depends on the driver in use.

    :raises sqlalchemy.exc.InvalidRequestError: if a transaction is
      already present on this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """

    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    # only ask the dialect for an id when the caller gave none
    transaction_id = (
        self.engine.dialect.create_xid() if xid is None else xid
    )
    return TwoPhaseTransaction(self, transaction_id)
974
def commit(self) -> None:
    """Commit the transaction currently in progress.

    If a transaction has been started, this method commits it.  If no
    transaction was started, the method does nothing, assuming the
    connection is in a non-invalidated state.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.commit` acts only upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from
      :meth:`_engine.Connection.begin_nested`; to control a SAVEPOINT,
      call :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.


    """
    current = self._transaction
    if current:
        current.commit()
999
def rollback(self) -> None:
    """Roll back the transaction currently in progress.

    If a transaction has been started, this method rolls it back.  If no
    transaction was started, the method does nothing.  If a transaction
    was started and the connection is in an invalidated state, the
    transaction is cleared using this method.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.rollback` acts only upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from
      :meth:`_engine.Connection.begin_nested`; to control a SAVEPOINT,
      call :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.


    """
    current = self._transaction
    if current:
        current.rollback()
1025
def recover_twophase(self) -> List[Any]:
    """Return whatever the dialect's ``do_recover_twophase()`` hook
    reports for this connection."""
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)
1028
def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_rollback_twophase()`` hook,
    forwarding ``recover`` as a keyword argument."""
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)
1031
def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_commit_twophase()`` hook,
    forwarding ``recover`` as a keyword argument."""
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)
1034
def in_transaction(self) -> bool:
    """Return True if a root transaction is present and active."""
    root = self._transaction
    if root is None:
        return False
    return root.is_active
1038
def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction is present and active."""
    nested = self._nested_transaction
    if nested is None:
        return False
    return nested.is_active
1045
1046 def _is_autocommit_isolation(self) -> bool:
1047 opt_iso = self._execution_options.get("isolation_level", None)
1048 return bool(
1049 opt_iso == "AUTOCOMMIT"
1050 or (
1051 opt_iso is None
1052 and self.engine.dialect._on_connect_isolation_level
1053 == "AUTOCOMMIT"
1054 )
1055 )
1056
1057 def _get_required_transaction(self) -> RootTransaction:
1058 trans = self._transaction
1059 if trans is None:
1060 raise exc.InvalidRequestError("connection is not in a transaction")
1061 return trans
1062
1063 def _get_required_nested_transaction(self) -> NestedTransaction:
1064 trans = self._nested_transaction
1065 if trans is None:
1066 raise exc.InvalidRequestError(
1067 "connection is not in a nested transaction"
1068 )
1069 return trans
1070
def get_transaction(self) -> Optional[RootTransaction]:
    """Return the root transaction currently in progress, or ``None``
    if there is none.

    .. versionadded:: 1.4

    """
    root_transaction = self._transaction
    return root_transaction
1079
def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the nested transaction currently in progress, or ``None``
    if there is none.

    .. versionadded:: 1.4

    """
    nested_transaction = self._nested_transaction
    return nested_transaction
1087
1088 def _begin_impl(self, transaction: RootTransaction) -> None:
1089 if self._echo:
1090 if self._is_autocommit_isolation():
1091 self._log_info(
1092 "BEGIN (implicit; DBAPI should not BEGIN due to "
1093 "autocommit mode)"
1094 )
1095 else:
1096 self._log_info("BEGIN (implicit)")
1097
1098 self.__in_begin = True
1099
1100 if self._has_events or self.engine._has_events:
1101 self.dispatch.begin(self)
1102
1103 try:
1104 self.engine.dialect.do_begin(self.connection)
1105 except BaseException as e:
1106 self._handle_dbapi_exception(e, None, None, None, None)
1107 finally:
1108 self.__in_begin = False
1109
1110 def _rollback_impl(self) -> None:
1111 if self._has_events or self.engine._has_events:
1112 self.dispatch.rollback(self)
1113
1114 if self._still_open_and_dbapi_connection_is_valid:
1115 if self._echo:
1116 if self._is_autocommit_isolation():
1117 if self.dialect.skip_autocommit_rollback:
1118 self._log_info(
1119 "ROLLBACK will be skipped by "
1120 "skip_autocommit_rollback"
1121 )
1122 else:
1123 self._log_info(
1124 "ROLLBACK using DBAPI connection.rollback(); "
1125 "set skip_autocommit_rollback to prevent fully"
1126 )
1127 else:
1128 self._log_info("ROLLBACK")
1129 try:
1130 self.engine.dialect.do_rollback(self.connection)
1131 except BaseException as e:
1132 self._handle_dbapi_exception(e, None, None, None, None)
1133
1134 def _commit_impl(self) -> None:
1135 if self._has_events or self.engine._has_events:
1136 self.dispatch.commit(self)
1137
1138 if self._echo:
1139 if self._is_autocommit_isolation():
1140 self._log_info(
1141 "COMMIT using DBAPI connection.commit(), "
1142 "has no effect due to autocommit mode"
1143 )
1144 else:
1145 self._log_info("COMMIT")
1146 try:
1147 self.engine.dialect.do_commit(self.connection)
1148 except BaseException as e:
1149 self._handle_dbapi_exception(e, None, None, None, None)
1150
1151 def _savepoint_impl(self, name: Optional[str] = None) -> str:
1152 if self._has_events or self.engine._has_events:
1153 self.dispatch.savepoint(self, name)
1154
1155 if name is None:
1156 self.__savepoint_seq += 1
1157 name = "sa_savepoint_%s" % self.__savepoint_seq
1158 self.engine.dialect.do_savepoint(self, name)
1159 return name
1160
1161 def _rollback_to_savepoint_impl(self, name: str) -> None:
1162 if self._has_events or self.engine._has_events:
1163 self.dispatch.rollback_savepoint(self, name, None)
1164
1165 if self._still_open_and_dbapi_connection_is_valid:
1166 self.engine.dialect.do_rollback_to_savepoint(self, name)
1167
1168 def _release_savepoint_impl(self, name: str) -> None:
1169 if self._has_events or self.engine._has_events:
1170 self.dispatch.release_savepoint(self, name, None)
1171
1172 self.engine.dialect.do_release_savepoint(self, name)
1173
1174 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
1175 if self._echo:
1176 self._log_info("BEGIN TWOPHASE (implicit)")
1177 if self._has_events or self.engine._has_events:
1178 self.dispatch.begin_twophase(self, transaction.xid)
1179
1180 self.__in_begin = True
1181 try:
1182 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1183 except BaseException as e:
1184 self._handle_dbapi_exception(e, None, None, None, None)
1185 finally:
1186 self.__in_begin = False
1187
def _prepare_twophase_impl(self, xid: Any) -> None:
    """Notify ``prepare_twophase`` listeners, then run the dialect's
    ``do_prepare_twophase()`` hook for ``xid``.

    The current transaction is asserted to be a
    :class:`.TwoPhaseTransaction`; hook errors go through
    ``_handle_dbapi_exception()``.
    """
    notify_listeners = self._has_events or self.engine._has_events
    if notify_listeners:
        self.dispatch.prepare_twophase(self, xid)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1197
1198 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1199 if self._has_events or self.engine._has_events:
1200 self.dispatch.rollback_twophase(self, xid, is_prepared)
1201
1202 if self._still_open_and_dbapi_connection_is_valid:
1203 assert isinstance(self._transaction, TwoPhaseTransaction)
1204 try:
1205 self.engine.dialect.do_rollback_twophase(
1206 self, xid, is_prepared
1207 )
1208 except BaseException as e:
1209 self._handle_dbapi_exception(e, None, None, None, None)
1210
def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Notify ``commit_twophase`` listeners, then commit the two-phase
    transaction through the dialect's ``do_commit_twophase()`` hook.

    The current transaction is asserted to be a
    :class:`.TwoPhaseTransaction`; hook errors go through
    ``_handle_dbapi_exception()``.
    """
    notify_listeners = self._has_events or self.engine._has_events
    if notify_listeners:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1220
def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    Closing releases the underlying database resources, that is, the
    DBAPI connection referenced internally.  The DBAPI connection is
    typically restored back to the connection-holding
    :class:`_pool.Pool` referenced by the :class:`_engine.Engine` that
    produced this :class:`_engine.Connection`.  Any transactional state
    present on the DBAPI connection is also unconditionally released via
    the DBAPI connection's ``rollback()`` method, regardless of any
    :class:`.Transaction` object that may be outstanding with regards to
    this :class:`_engine.Connection`.

    As a consequence, :meth:`_engine.Connection.rollback` is effectively
    also called if any transaction is in place.

    Once :meth:`_engine.Connection.close` has been called, the
    :class:`_engine.Connection` is permanently in a closed state and
    allows no further operations.

    """

    outstanding = self._transaction
    transaction_was_closed = False
    if outstanding:
        outstanding.close()
        transaction_was_closed = True

    pooled = self._dbapi_connection
    if pooled is not None:
        if transaction_was_closed:
            # the transaction was just closed above, so hand the
            # connection back to the pool without an additional reset
            cast("_ConnectionFairy", pooled)._close_special(
                transaction_reset=True
            )
        else:
            pooled.close()

        # There is a slight chance that the close call above triggered
        # an invalidation, in which case _dbapi_connection would already
        # be None; usually it is still non-None here and in a "closed"
        # state.
        self._dbapi_connection = None

    self.__can_reconnect = False
1268
@overload
def scalar(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Execute a SQL statement construct and return a scalar object.

    This is a shortcut for calling :meth:`_engine.Connection.execute`
    and then calling :meth:`_engine.Result.scalar` on the result.
    Parameters are equivalent.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if ``statement``
     has no ``_execute_on_scalar`` attribute.

    """
    params = _distill_params_20(parameters)

    # the statement decides how it is executed; anything lacking the
    # hook is not an executable object
    try:
        execute_on_scalar = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    return execute_on_scalar(
        self,
        params,
        execution_options or NO_OPTIONS,
    )
1315
@overload
def scalars(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Execute and return a scalar result set, which yields scalar values
    from the first column of each row.

    Calling this method is the same as calling
    :meth:`_engine.Connection.execute` to obtain a
    :class:`_result.Result` object and then calling
    :meth:`_result.Result.scalars` on it to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """

    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()
1358
@overload
def execute(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[_T]: ...

@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]: ...

def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]:
    r"""Execute a SQL statement construct and return a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     This may be either a dictionary of parameter names to values,
     or a mutable sequence (e.g. a list) of dictionaries.  When a
     list of dictionaries is passed, the underlying statement execution
     will make use of the DBAPI ``cursor.executemany()`` method.
     When a single dictionary is passed, the DBAPI ``cursor.execute()``
     method will be used.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.Result` object.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if ``statement``
     has no ``_execute_on_connection`` attribute.

    """
    params = _distill_params_20(parameters)

    # the statement decides how it is executed; anything lacking the
    # hook is not an executable object
    try:
        execute_on_connection = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    return execute_on_connection(
        self,
        params,
        execution_options or NO_OPTIONS,
    )
1426
1427 def _execute_function(
1428 self,
1429 func: FunctionElement[Any],
1430 distilled_parameters: _CoreMultiExecuteParams,
1431 execution_options: CoreExecuteOptionsParameter,
1432 ) -> CursorResult[Any]:
1433 """Execute a sql.FunctionElement object."""
1434
1435 return self._execute_clauseelement(
1436 func.select(), distilled_parameters, execution_options
1437 )
1438
def _execute_default(
    self,
    default: DefaultGenerator,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> Any:
    """Execute a schema.ColumnDefault object.

    The given execution options are merged onto the connection-level
    options, ``before_execute`` / ``after_execute`` listeners are invoked
    around the operation when any are registered, and the value is
    produced by the execution context's ``_exec_default()`` method.

    :return: whatever ``_exec_default()`` returns for the default.
    """

    execution_options = self._execution_options.merge_with(
        execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreAnyExecuteParams]

    # note for event handlers, the "distilled parameters" which is always
    # a list of dicts is broken out into separate "multiparams" and
    # "params" collections, which allows the handler to distinguish
    # between an executemany and execute style set of parameters.
    if self._has_events or self.engine._has_events:
        (
            default,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, execution_options
        )
    else:
        # keep both names bound for the after_execute call further down
        event_multiparams = event_params = None

    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection is currently attached; obtain one
            conn = self._revalidate_connection()

        dialect = self.dialect
        ctx = dialect.execution_ctx_cls._init_default(
            dialect, self, conn, execution_options
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two are re-raised as-is rather than being passed to the
        # generic exception handler
        raise
    except BaseException as e:
        # NOTE(review): the code below uses ``ctx`` unconditionally, so
        # this handler is presumably expected never to return normally
        self._handle_dbapi_exception(e, None, None, None, None)

    ret = ctx._exec_default(None, default, None)

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )

    return ret
1497
1498 def _execute_ddl(
1499 self,
1500 ddl: ExecutableDDLElement,
1501 distilled_parameters: _CoreMultiExecuteParams,
1502 execution_options: CoreExecuteOptionsParameter,
1503 ) -> CursorResult[Any]:
1504 """Execute a schema.DDL object."""
1505
1506 exec_opts = ddl._execution_options.merge_with(
1507 self._execution_options, execution_options
1508 )
1509
1510 event_multiparams: Optional[_CoreMultiExecuteParams]
1511 event_params: Optional[_CoreSingleExecuteParams]
1512
1513 if self._has_events or self.engine._has_events:
1514 (
1515 ddl,
1516 distilled_parameters,
1517 event_multiparams,
1518 event_params,
1519 ) = self._invoke_before_exec_event(
1520 ddl, distilled_parameters, exec_opts
1521 )
1522 else:
1523 event_multiparams = event_params = None
1524
1525 schema_translate_map = exec_opts.get("schema_translate_map", None)
1526
1527 dialect = self.dialect
1528
1529 compiled = ddl.compile(
1530 dialect=dialect, schema_translate_map=schema_translate_map
1531 )
1532 ret = self._execute_context(
1533 dialect,
1534 dialect.execution_ctx_cls._init_ddl,
1535 compiled,
1536 None,
1537 exec_opts,
1538 compiled,
1539 )
1540 if self._has_events or self.engine._has_events:
1541 self.dispatch.after_execute(
1542 self,
1543 ddl,
1544 event_multiparams,
1545 event_params,
1546 exec_opts,
1547 ret,
1548 )
1549 return ret
1550
1551 def _invoke_before_exec_event(
1552 self,
1553 elem: Any,
1554 distilled_params: _CoreMultiExecuteParams,
1555 execution_options: _ExecuteOptions,
1556 ) -> Tuple[
1557 Any,
1558 _CoreMultiExecuteParams,
1559 _CoreMultiExecuteParams,
1560 _CoreSingleExecuteParams,
1561 ]:
1562 event_multiparams: _CoreMultiExecuteParams
1563 event_params: _CoreSingleExecuteParams
1564
1565 if len(distilled_params) == 1:
1566 event_multiparams, event_params = [], distilled_params[0]
1567 else:
1568 event_multiparams, event_params = distilled_params, {}
1569
1570 for fn in self.dispatch.before_execute:
1571 elem, event_multiparams, event_params = fn(
1572 self,
1573 elem,
1574 event_multiparams,
1575 event_params,
1576 execution_options,
1577 )
1578
1579 if event_multiparams:
1580 distilled_params = list(event_multiparams)
1581 if event_params:
1582 raise exc.InvalidRequestError(
1583 "Event handler can't return non-empty multiparams "
1584 "and params at the same time"
1585 )
1586 elif event_params:
1587 distilled_params = [event_params]
1588 else:
1589 distilled_params = []
1590
1591 return elem, distilled_params, event_multiparams, event_params
1592
def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Any]:
    """Execute a sql.ClauseElement object.

    Execution options of the element, the connection and the call are
    merged; the element is compiled (through the compiled cache when one
    is configured) for the connection's dialect and then run through
    ``_execute_context()``.  ``before_execute`` / ``after_execute``
    listeners are invoked around the operation when any are registered.
    """

    execution_options = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # evaluated once, so that the before/after dispatch below always
    # agree and the event_* names are bound whenever they are read
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, execution_options
        )

    if distilled_parameters:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        keys = sorted(distilled_parameters[0])
        # more than one parameter set means executemany-style compilation
        for_executemany = len(distilled_parameters) > 1
    else:
        keys = []
        for_executemany = False

    dialect = self.dialect

    schema_translate_map = execution_options.get(
        "schema_translate_map", None
    )

    # a per-execution "compiled_cache" option takes precedence over the
    # engine-level cache
    compiled_cache: Optional[CompiledCacheType] = execution_options.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
        dialect=dialect,
        compiled_cache=compiled_cache,
        column_keys=keys,
        for_executemany=for_executemany,
        schema_translate_map=schema_translate_map,
        linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
    )
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        execution_options,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret
1665
def _execute_compiled(
    self,
    compiled: Compiled,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
) -> CursorResult[Any]:
    """Execute a sql.Compiled object.

    TODO: why do we have this?  likely deprecate or remove

    Execution options of the compiled object, the connection and the
    call are merged, and the compiled object is run through
    ``_execute_context()``.  ``before_execute`` / ``after_execute``
    listeners are invoked around the operation when any are registered.

    :param compiled: the already-compiled statement to run.
    :param distilled_parameters: parameter sets for the execution.
    :param execution_options: per-call execution options.
    :return: the result of ``_execute_context()``.
    """

    execution_options = compiled.execution_options.merge_with(
        self._execution_options, execution_options
    )

    # Evaluate the "has events" condition once, as
    # _execute_clauseelement() does.  Checking it a second time after
    # execution could yield True when the first check was False, in which
    # case event_multiparams / event_params would be unbound and the
    # after_execute dispatch would fail with UnboundLocalError.
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            compiled,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            compiled, distilled_parameters, execution_options
        )

    dialect = self.dialect

    # no originating element and no extracted parameters are available
    # for a pre-compiled object, hence the two trailing ``None`` values
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled,
        distilled_parameters,
        execution_options,
        compiled,
        distilled_parameters,
        None,
        None,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            compiled,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret
1715
def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]:
    r"""Execute a string SQL statement on the DBAPI cursor directly,
    without any SQL compilation steps.

    Use this to hand any string straight to the ``cursor.execute()``
    method of the DBAPI in use.

    :param statement: The statement str to be executed.   Bound parameters
     must use the underlying DBAPI's paramstyle, such as "qmark",
     "pyformat", "format", etc.

    :param parameters: represent bound parameter values to be used in the
     execution.  The format is one of:   a dictionary of named parameters,
     a tuple of positional parameters, or a list containing either
     dictionaries or tuples for multiple-execute support.

    :param execution_options: optional execution options for this call;
     they are merged onto the connection-level execution options.

    :return: a :class:`_engine.CursorResult`.

     E.g. multiple dictionaries::


         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
         )

     Single dictionary::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             dict(id=1, value="v1"),
         )

     Single tuple::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
         )

    .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
        not participate in the
        :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.   To
        intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
        :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute`.

    .. seealso::

        :pep:`249`

    """

    raw_parameters = _distill_raw_params(parameters)
    merged_options = self._execution_options.merge_with(execution_options)

    dialect = self.dialect
    # the fourth argument (parameters reported on error) is None here;
    # the distilled parameters go to the context constructor instead
    return self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        merged_options,
        statement,
        raw_parameters,
    )
1792
def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Any]:
    """Create an :class:`.ExecutionContext` and execute, returning
    a :class:`_engine.CursorResult`.

    :param dialect: dialect passed to ``constructor`` and to the
     follow-up execution method.
    :param constructor: callable that builds the execution context; it is
     invoked as ``constructor(dialect, self, conn, execution_options,
     *args, **kw)``.
    :param statement: the statement being executed; used here for error
     reporting and passed on to ``_exec_single_context()``.
    :param parameters: the parameters being executed; used here for error
     reporting and passed on to ``_exec_single_context()``.
    :param execution_options: options in effect for this execution.
    """

    if execution_options:
        # "yield_per" implies streaming results with a matching row buffer
        yp = execution_options.get("yield_per", None)
        if yp:
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yp}
            )
    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection is currently attached; obtain one
            conn = self._revalidate_connection()

        context = constructor(
            dialect, self, conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two are re-raised as-is rather than being passed to the
        # generic exception handler
        raise
    except BaseException as e:
        # NOTE(review): the code below uses ``context`` unconditionally,
        # so this handler is presumably expected never to return normally
        self._handle_dbapi_exception(
            e, str(statement), parameters, None, None
        )

    # ``and`` binds tighter than ``or``: the check fires when either the
    # root transaction or the nested transaction exists but is inactive
    if (
        self._transaction
        and not self._transaction.is_active
        or (
            self._nested_transaction
            and not self._nested_transaction.is_active
        )
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    # no transaction object yet: start one implicitly before executing
    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    # "insertmanyvalues" executions take a dedicated path; everything
    # else goes through the single-context path
    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)
    else:
        return self._exec_single_context(
            dialect, context, statement, parameters
        )
1851
def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Any]:
    """Second half of ``_execute_context()`` for the case where exactly
    one DBAPI ``cursor.execute()`` or ``cursor.executemany()`` call is
    made.

    ``statement`` and ``parameters`` as passed in are only used when
    reporting a failure of ``do_set_input_sizes()``; afterwards the
    statement and parameters held by ``context`` are used.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        input_sizes = context._prepare_set_input_sizes()

        if input_sizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, input_sizes, context
                )
            except BaseException as err:
                self._handle_dbapi_exception(
                    err, str(statement), parameters, None, context
                )

    cursor = context.cursor
    str_statement = context.statement
    parameters = context.parameters

    effective_parameters: Optional[_AnyExecuteParams]

    # a non-executemany execution sends only the first parameter set
    if context.executemany:
        effective_parameters = parameters
    else:
        effective_parameters = parameters[0]

    if self._has_events or self.engine._has_events:
        # each listener may replace the statement and the parameters
        for listener in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = listener(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if self.engine.hide_parameters:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )
        else:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )

    # dialect-level event hooks for the branch taken below; the first
    # hook returning a true value suppresses the dialect's default call
    hooks: Iterable[Any]

    try:
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                hooks = self.dialect.dispatch.do_executemany
            else:
                hooks = ()
            for hook in hooks:
                if hook(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                ):
                    break
            else:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            if self.dialect._has_events:
                hooks = self.dialect.dispatch.do_execute_no_params
            else:
                hooks = ()
            for hook in hooks:
                if hook(cursor, str_statement, context):
                    break
            else:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                hooks = self.dialect.dispatch.do_execute
            else:
                hooks = ()
            for hook in hooks:
                if hook(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                ):
                    break
            else:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as err:
        self._handle_dbapi_exception(
            err, str_statement, effective_parameters, cursor, context
        )

    return result
1993
def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Any]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    The statement and parameters held by ``context`` are handed to
    ``dialect._deliver_insertmanyvalues_batches()``; each batch it
    yields is executed on its own, and the result object is built once
    after the last batch.

    """

    # input sizes are only prepared for dialects that bind types via
    # setinputsizes; otherwise None is passed to the batch generator
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # evaluated once up front and reused for every batch
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # opt-in flag looked up on the connection with getattr(); when true,
    # after_cursor_execute receives the per-batch statement / parameters
    # instead of the whole-statement values (see the TODO below)
    if engine_events:
        _WORKAROUND_ISSUE_13018 = getattr(
            self, "_WORKAROUND_ISSUE_13018", False
        )
    else:
        _WORKAROUND_ISSUE_13018 = False

    # "stats" is only assigned when echo is on; every later use of it
    # is under the same ``if self._echo`` condition
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            # listeners may replace the batch statement / parameters;
            # note the last argument is the literal True here rather
            # than context.executemany
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            # renders as " <batchnum>/<total_batches> (ordered|unordered
            # [; batch not supported])"
            imv_stats = f""" {imv_batch.batchnum}/{
                        imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch appends to the cache stats computed
            # above; later batches log a shorter prefix
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # the ``else`` runs only when no do_execute hook returned a
            # true value, i.e. the loop finished without ``break``
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                # TODO: this will be fixed by #13018
                sub_stmt if _WORKAROUND_ISSUE_13018 else str_statement,
                sub_params if _WORKAROUND_ISSUE_13018 else parameters,
                context,
                context.executemany,
            )

        # sum the size of every batch so the total can be stored on
        # the context once all batches have run
        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result
2172
2173 def _cursor_execute(
2174 self,
2175 cursor: DBAPICursor,
2176 statement: str,
2177 parameters: _DBAPISingleExecuteParams,
2178 context: Optional[ExecutionContext] = None,
2179 ) -> None:
2180 """Execute a statement + params on the given cursor.
2181
2182 Adds appropriate logging and exception handling.
2183
2184 This method is used by DefaultDialect for special-case
2185 executions, such as for sequences and column defaults.
2186 The path of statement execution in the majority of cases
2187 terminates at _execute_context().
2188
2189 """
2190 if self._has_events or self.engine._has_events:
2191 for fn in self.dispatch.before_cursor_execute:
2192 statement, parameters = fn(
2193 self, cursor, statement, parameters, context, False
2194 )
2195
2196 if self._echo:
2197 self._log_info(statement)
2198 self._log_info("[raw sql] %r", parameters)
2199 try:
2200 for fn in (
2201 ()
2202 if not self.dialect._has_events
2203 else self.dialect.dispatch.do_execute
2204 ):
2205 if fn(cursor, statement, parameters, context):
2206 break
2207 else:
2208 self.dialect.do_execute(cursor, statement, parameters, context)
2209 except BaseException as e:
2210 self._handle_dbapi_exception(
2211 e, statement, parameters, cursor, context
2212 )
2213
2214 if self._has_events or self.engine._has_events:
2215 self.dispatch.after_cursor_execute(
2216 self, cursor, statement, parameters, context, False
2217 )
2218
2219 def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
2220 """Close the given cursor, catching exceptions
2221 and turning into log warnings.
2222
2223 """
2224 try:
2225 cursor.close()
2226 except Exception:
2227 # log the error through the connection pool's logger.
2228 self.engine.pool.logger.error(
2229 "Error closing cursor", exc_info=True
2230 )
2231
# Class-level defaults for the state used by _handle_dbapi_exception().
# That method sets instance attributes of the same names while it runs
# and removes them again with ``del``, so lookups fall back to these
# False values.
_reentrant_error = False
_is_disconnect = False
2234
def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised during execution and re-raise it.

    This method never returns normally; every path ends in a ``raise``.
    Depending on the exception it raises a wrapped
    :class:`.DBAPIError`, an exception supplied by a ``handle_error``
    event handler, or the original exception.

    :param e: the exception being handled.
    :param statement: string statement for the error report, if any.
    :param parameters: parameters for the error report, if any.
    :param cursor: cursor in use, if any; it is closed here unless the
     error is treated as a disconnect.
    :param context: execution context, if one was already created.
    :param is_sub_exec: when True, ``ismulti`` is reported as False
     regardless of ``context.executemany``.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    # a disconnect is either a DBAPI error the dialect recognizes as
    # such, or an "exit" exception, in both cases only while this
    # connection is not closed
    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    invalidate_pool_on_disconnect = not is_exit_exception

    # parsed as "(not is_sub_exec and context.executemany) if context
    # is not None else False"
    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    # already inside this handler: raise the wrapped error right away
    # without running events or cleanup a second time
    if self._reentrant_error:
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # handlers may have changed ctx.is_disconnect; copy the
            # new value back to this connection and to the wrapped
            # exception
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        # precedence: exception from an event handler, then the
        # wrapped exception, then the original exception unchanged
        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # deleting the instance attributes restores the class-level
        # False defaults
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)
2379
@classmethod
def _handle_dbapi_exception_noconnection(
    cls,
    e: BaseException,
    dialect: Dialect,
    engine: Optional[Engine] = None,
    is_disconnect: Optional[bool] = None,
    invalidate_pool_on_disconnect: bool = True,
    is_pre_ping: bool = False,
) -> NoReturn:
    """Process an exception for which no :class:`_engine.Connection`
    is available, then re-raise it.

    This method never returns normally.  An instance of the dialect's
    DBAPI ``Error`` class is wrapped in a :class:`.DBAPIError`; any
    other exception is re-raised as is, unless a ``handle_error``
    event handler supplies a replacement.

    :param e: the exception being handled.
    :param dialect: dialect supplying the DBAPI error class, the
     disconnect check and the ``handle_error`` event handlers.
    :param engine: optional engine; its ``hide_parameters`` setting is
     used when present, otherwise ``False``.
    :param is_disconnect: disconnect status if already known; when
     ``None`` it is determined using ``dialect.is_disconnect()``.
    :param invalidate_pool_on_disconnect: passed through to the
     exception context given to event handlers.
    :param is_pre_ping: passed through to the exception context given
     to event handlers.

    """
    exc_info = sys.exc_info()
    traceback = exc_info[2]

    # only errors of the DBAPI's own Error class are wrapped
    should_wrap = isinstance(e, dialect.loaded_dbapi.Error)

    if is_disconnect is None:
        is_disconnect = should_wrap and dialect.is_disconnect(
            e, None, None
        )

    sqlalchemy_exception = None
    if should_wrap:
        if engine is not None:
            hide_parameters = engine.hide_parameters
        else:
            hide_parameters = False

        sqlalchemy_exception = exc.DBAPIError.instance(
            None,
            None,
            cast(Exception, e),
            dialect.loaded_dbapi.Error,
            hide_parameters=hide_parameters,
            connection_invalidated=is_disconnect,
            dialect=dialect,
        )

    newraise = None

    if dialect._has_events:
        ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            dialect,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            invalidate_pool_on_disconnect,
            is_pre_ping,
        )
        for handler in dialect.dispatch.handle_error:
            try:
                # a handler that returns an exception hands it to the
                # next handler in the chain
                replacement = handler(ctx)
                if replacement is not None:
                    ctx.chained_exception = newraise = replacement
            except Exception as raised_by_handler:
                # a handler that raises ends the chain
                newraise = raised_by_handler
                break

        # handlers may have changed the disconnect status
        if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
            sqlalchemy_exception.connection_invalidated = ctx.is_disconnect

    if newraise:
        raise newraise.with_traceback(traceback) from e

    if should_wrap:
        assert sqlalchemy_exception is not None
        raise sqlalchemy_exception.with_traceback(traceback) from e

    assert exc_info[1] is not None
    raise exc_info[1].with_traceback(traceback)
2454
2455 def _run_ddl_visitor(
2456 self,
2457 visitorcallable: Type[InvokeDDLBase],
2458 element: SchemaVisitable,
2459 **kwargs: Any,
2460 ) -> None:
2461 """run a DDL visitor.
2462
2463 This method is only here so that the MockConnection can change the
2464 options given to the visitor so that "checkfirst" is skipped.
2465
2466 """
2467 visitorcallable(
2468 dialect=self.dialect, connection=self, **kwargs
2469 ).traverse_single(element)
2470
2471
class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    An instance is created by the connection's exception handling and
    passed to each ``handle_error`` event handler.  Every name listed
    in ``__slots__`` is assigned in the constructor, so that reading
    any of them from a handler never raises ``AttributeError``.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the details of a failed operation.

        :param exception: the exception that was originally raised;
         stored as ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, or None
         if the original exception is not being wrapped.
        :param engine: the engine in use, if any.
        :param dialect: the dialect in use.
        :param connection: the connection in use, if any.
        :param cursor: the DBAPI cursor in use, if any.
        :param statement: the string statement, if any.
        :param parameters: the statement parameters, if any.
        :param context: the execution context, if any; stored as
         ``execution_context``.
        :param is_disconnect: whether the error is currently treated
         as a disconnect.
        :param invalidate_pool_on_disconnect: whether the pool is to
         be invalidated if the error is a disconnect.
        :param is_pre_ping: the pre-ping flag supplied by the caller.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously the cursor argument was accepted but discarded,
        # leaving the "cursor" slot unset
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # starts out empty; the error handling code assigns it when a
        # handler returns a replacement exception
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping
2517
2518
class Transaction(TransactionalContext):
    """Represent a database transaction in progress.

    A :class:`.Transaction` is obtained by calling the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback`
    and :meth:`.commit` methods.  The object also implements a context
    manager interface, so the Python ``with`` statement may be used
    together with the :meth:`_engine.Connection.begin` method::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # abstract; the concrete transaction classes define their own
        # constructor
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True once this transaction has been fully detached from the
        connection, so that it can no longer affect the connection's
        state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        # hook invoked by close()
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        # hook invoked by rollback()
        raise NotImplementedError()

    def _do_commit(self) -> None:
        # hook invoked by commit()
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        """True if this transaction is active and its connection has
        not been invalidated."""
        active = self.is_active
        return active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this transaction is the base transaction in a begin/commit
        nesting, it is rolled back.  Otherwise, the method returns.

        Use this to cancel a Transaction without affecting the scope
        of an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # holds whether or not the hook raised
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the type of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` emits a "ROLLBACK TO SAVEPOINT"
          operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific methods
          for two phase transactions.


        """
        try:
            self._do_rollback()
        finally:
            # holds whether or not the hook raised
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the type of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` emits a "RELEASE SAVEPOINT"
          operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific methods
          for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            # holds whether or not the hook raised
            assert not self.is_active

    def _get_subject(self) -> Connection:
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        # NOTE(review): this is True while the transaction has NOT been
        # deactivated from the connection, which reads inverted relative
        # to the method name - confirm against the caller in
        # TransactionalContext before relying on or changing it.
        still_associated = not self._deactivated_from_connection
        return still_associated

    def _rollback_can_be_called(self) -> bool:
        # rollback() is safe to call on a RootTransaction /
        # NestedTransaction even when the transaction is deactive, and
        # emits no warnings.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True
2661
2662
class RootTransaction(Transaction):
    """Represent the "root" transaction on a :class:`_engine.Connection`.

    This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
    for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
    is created by calling upon the :meth:`_engine.Connection.begin` method, and
    remains associated with the :class:`_engine.Connection` throughout its
    active span. The current :class:`_engine.RootTransaction` in use is
    accessible via the :meth:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
    "autobegin" behavior that will create a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI connection.
    The scope of the :class:`_engine.RootTransaction` in 2.0 style
    use can be controlled using the :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        # a connection carries at most one root transaction at a time
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # the connection is pointed at this transaction only after the
        # begin hook has returned without raising
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        # mark this transaction inactive.  if it is already inactive
        # and the connection no longer refers to it, only warn.
        if self.is_active:
            assert self.connection._transaction is self
            self.is_active = False

        elif self.connection._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._transaction is not self

    # the three hooks below delegate to the connection; they are
    # separate methods so that a subclass can substitute other
    # connection calls (TwoPhaseTransaction does so)
    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        # shared implementation of close() and rollback().
        # try_deactivate=True runs _deactivate_from_connection() even
        # when this transaction is already inactive, which is the path
        # that can emit the "already deassociated" warning.
        try:
            if self.is_active:
                self._connection_rollback_impl()

            if self.connection._nested_transaction:
                self.connection._nested_transaction._cancel()
        finally:
            # runs whether or not the rollback raised: deactivate and
            # remove this transaction from the connection
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        if self.is_active:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # whether or not commit succeeds, cancel any
                # nested transactions, make this transaction "inactive"
                # and remove it as a reset agent
                if self.connection._nested_transaction:
                    self.connection._nested_transaction._cancel()

                self._deactivate_from_connection()

            # ...however only remove as the connection's current transaction
            # if commit succeeded.  otherwise it stays on so that a rollback
            # needs to occur.
            self.connection._transaction = None
        else:
            # inactive but still the connection's transaction: let the
            # connection report the invalid transaction state
            if self.connection._transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")

        assert not self.is_active
        assert self.connection._transaction is not self
2769
2770
class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    The :class:`.NestedTransaction` object is created by calling the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    When using :class:`.NestedTransaction`, the semantics of "begin" /
    "commit" / "rollback" are as follows:

    * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
      the savepoint is given an explicit name that is part of the state
      of this object.

    * The :meth:`.NestedTransaction.commit` method corresponds to a
      "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
      with this :class:`.NestedTransaction`.

    * The :meth:`.NestedTransaction.rollback` method corresponds to a
      "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    The rationale for mimicking the semantics of an outer transaction in
    terms of savepoints is so that code may deal with a "savepoint"
    transaction and an "outer" transaction in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    # savepoint identifier returned by the connection when the savepoint
    # was created
    _savepoint: str

    def __init__(self, connection: Connection):
        # a savepoint requires an enclosing transaction on the connection
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = self.connection._savepoint_impl()
        self.is_active = True
        # remember the nested transaction that was current before this
        # one, so it can be restored when this one is deactivated
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        # restore the previously current nested transaction.  if this
        # object is not the connection's current nested transaction,
        # emit a warning unless warn is False.
        if self.connection._nested_transaction is self:
            self.connection._nested_transaction = self._previous_nested
        elif warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._nested_transaction is not self

    def _cancel(self) -> None:
        # called by RootTransaction when the outer transaction is
        # committed, rolled back, or closed to cancel all savepoints
        # without any action being taken
        self.is_active = False
        self._deactivate_from_connection()
        # continue down the chain of earlier savepoints
        if self._previous_nested:
            self._previous_nested._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        # shared implementation of close() and rollback(); the rollback
        # to the savepoint is only emitted while both this savepoint
        # and the connection's root transaction are active
        try:
            if (
                self.is_active
                and self.connection._transaction
                and self.connection._transaction.is_active
            ):
                self.connection._rollback_to_savepoint_impl(self._savepoint)
        finally:
            # runs whether or not the rollback raised
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        # close() does not warn if already deassociated
        self._close_impl(True, False)

    def _do_rollback(self) -> None:
        # rollback() warns if already deassociated
        self._close_impl(True, True)

    def _do_commit(self) -> None:
        if self.is_active:
            try:
                self.connection._release_savepoint_impl(self._savepoint)
            finally:
                # nested trans becomes inactive on failed release
                # unconditionally.  this prevents it from trying to
                # emit SQL when it rolls back.
                self.is_active = False

            # but only de-associate from connection if it succeeded
            self._deactivate_from_connection()
        else:
            # inactive but still the connection's current nested
            # transaction: let the connection report the invalid state
            if self.connection._nested_transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
2883
2884
class TwoPhaseTransaction(RootTransaction):
    """Represent a two-phase transaction.

    A :class:`.TwoPhaseTransaction` is obtained from the
    :meth:`_engine.Connection.begin_twophase` method.

    It offers the same interface as :class:`.Transaction`, plus the
    :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both attributes are set before the parent constructor runs,
        # since that constructor invokes _connection_begin_impl()
        self.xid = xid
        self._is_prepared = False
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        Once a PREPARE has taken place, the transaction can be
        committed.

        :raises: :class:`.InvalidRequestError` if this transaction is
         not active.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

    # the two-phase variants of the connection hooks receive the xid,
    # and for rollback / commit also whether prepare() has completed

    def _connection_begin_impl(self) -> None:
        self.connection._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_twophase_impl(self.xid, self._is_prepared)

    def _connection_commit_impl(self) -> None:
        self.connection._commit_twophase_impl(self.xid, self._is_prepared)
2924
2925
2926class Engine(
2927 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
2928):
2929 """
2930 Connects a :class:`~sqlalchemy.pool.Pool` and
2931 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
2932 source of database connectivity and behavior.
2933
2934 An :class:`_engine.Engine` object is instantiated publicly using the
2935 :func:`~sqlalchemy.create_engine` function.
2936
2937 .. seealso::
2938
2939 :doc:`/core/engines`
2940
2941 :ref:`connections_toplevel`
2942
2943 """
2944
2945 dispatch: dispatcher[ConnectionEventsTarget]
2946
2947 _compiled_cache: Optional[CompiledCacheType]
2948
2949 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
2950 _has_events: bool = False
2951 _connection_cls: Type[Connection] = Connection
2952 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
2953 _is_future: bool = False
2954
2955 _schema_translate_map: Optional[SchemaTranslateMapType] = None
2956 _option_cls: Type[OptionEngine]
2957
2958 dialect: Dialect
2959 pool: Pool
2960 url: URL
2961 hide_parameters: bool
2962
def __init__(
    self,
    pool: Pool,
    dialect: Dialect,
    url: URL,
    logging_name: Optional[str] = None,
    echo: Optional[_EchoFlagType] = None,
    query_cache_size: int = 500,
    execution_options: Optional[Mapping[str, Any]] = None,
    hide_parameters: bool = False,
):
    """Set up the engine from a pool, a dialect and a URL.

    :param pool: stored as ``pool``.
    :param dialect: stored as ``dialect``.
    :param url: stored as ``url``.
    :param logging_name: if given and non-empty, stored as
     ``logging_name``.
    :param echo: stored as ``echo`` and passed to the logger setup.
    :param query_cache_size: size of the built-in compiled cache; a
     value of 0 means no built-in cache is created.
    :param execution_options: if given and non-empty, applied through
     ``update_execution_options()``.
    :param hide_parameters: stored as ``hide_parameters``.

    """
    self.pool = pool
    self.url = url
    self.dialect = dialect
    if logging_name:
        self.logging_name = logging_name
    self.echo = echo
    self.hide_parameters = hide_parameters

    if query_cache_size == 0:
        self._compiled_cache = None
    else:
        self._compiled_cache = util.LRUCache(
            query_cache_size, size_alert=self._lru_size_alert
        )

    log.instance_logger(self, echoflag=echo)

    if execution_options:
        self.update_execution_options(**execution_options)
2990
2991 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
2992 if self._should_log_info():
2993 self.logger.info(
2994 "Compiled cache size pruning from %d items to %d. "
2995 "Increase cache size to reduce the frequency of pruning.",
2996 len(cache),
2997 cache.capacity,
2998 )
2999
3000 @property
3001 def engine(self) -> Engine:
3002 """Returns this :class:`.Engine`.
3003
3004 Used for legacy schemes that accept :class:`.Connection` /
3005 :class:`.Engine` objects within the same variable.
3006
3007 """
3008 return self
3009
3010 def clear_compiled_cache(self) -> None:
3011 """Clear the compiled cache associated with the dialect.
3012
3013 This applies **only** to the built-in cache that is established
3014 via the :paramref:`_engine.create_engine.query_cache_size` parameter.
3015 It will not impact any dictionary caches that were passed via the
3016 :paramref:`.Connection.execution_options.compiled_cache` parameter.
3017
3018 .. versionadded:: 1.4
3019
3020 """
3021 if self._compiled_cache:
3022 self._compiled_cache.clear()
3023
3024 def update_execution_options(self, **opt: Any) -> None:
3025 r"""Update the default execution_options dictionary
3026 of this :class:`_engine.Engine`.
3027
3028 The given keys/values in \**opt are added to the
3029 default execution options that will be used for
3030 all connections. The initial contents of this dictionary
3031 can be sent via the ``execution_options`` parameter
3032 to :func:`_sa.create_engine`.
3033
3034 .. seealso::
3035
3036 :meth:`_engine.Connection.execution_options`
3037
3038 :meth:`_engine.Engine.execution_options`
3039
3040 """
3041 self.dispatch.set_engine_execution_options(self, opt)
3042 self._execution_options = self._execution_options.union(opt)
3043 self.dialect.set_engine_execution_options(self, opt)
3044
3045 @overload
3046 def execution_options(
3047 self,
3048 *,
3049 compiled_cache: Optional[CompiledCacheType] = ...,
3050 logging_token: str = ...,
3051 isolation_level: IsolationLevel = ...,
3052 insertmanyvalues_page_size: int = ...,
3053 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
3054 **opt: Any,
3055 ) -> OptionEngine: ...
3056
3057 @overload
3058 def execution_options(self, **opt: Any) -> OptionEngine: ...
3059
3060 def execution_options(self, **opt: Any) -> OptionEngine:
3061 """Return a new :class:`_engine.Engine` that will provide
3062 :class:`_engine.Connection` objects with the given execution options.
3063
3064 The returned :class:`_engine.Engine` remains related to the original
3065 :class:`_engine.Engine` in that it shares the same connection pool and
3066 other state:
3067
3068 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
3069 is the
3070 same instance. The :meth:`_engine.Engine.dispose`
3071 method will replace
3072 the connection pool instance for the parent engine as well
3073 as this one.
3074 * Event listeners are "cascaded" - meaning, the new
3075 :class:`_engine.Engine`
3076 inherits the events of the parent, and new events can be associated
3077 with the new :class:`_engine.Engine` individually.
3078 * The logging configuration and logging_name is copied from the parent
3079 :class:`_engine.Engine`.
3080
3081 The intent of the :meth:`_engine.Engine.execution_options` method is
3082 to implement schemes where multiple :class:`_engine.Engine`
3083 objects refer to the same connection pool, but are differentiated
3084 by options that affect some execution-level behavior for each
3085 engine. One such example is breaking into separate "reader" and
3086 "writer" :class:`_engine.Engine` instances, where one
3087 :class:`_engine.Engine`
3088 has a lower :term:`isolation level` setting configured or is even
3089 transaction-disabled using "autocommit". An example of this
3090 configuration is at :ref:`dbapi_autocommit_multiple`.
3091
3092 Another example is one that
3093 uses a custom option ``shard_id`` which is consumed by an event
3094 to change the current schema on a database connection::
3095
3096 from sqlalchemy import event
3097 from sqlalchemy.engine import Engine
3098
3099 primary_engine = create_engine("mysql+mysqldb://")
3100 shard1 = primary_engine.execution_options(shard_id="shard1")
3101 shard2 = primary_engine.execution_options(shard_id="shard2")
3102
3103 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
3104
3105
3106 @event.listens_for(Engine, "before_cursor_execute")
3107 def _switch_shard(conn, cursor, stmt, params, context, executemany):
3108 shard_id = conn.get_execution_options().get("shard_id", "default")
3109 current_shard = conn.info.get("current_shard", None)
3110
3111 if current_shard != shard_id:
3112 cursor.execute("use %s" % shards[shard_id])
3113 conn.info["current_shard"] = shard_id
3114
3115 The above recipe illustrates two :class:`_engine.Engine` objects that
3116 will each serve as factories for :class:`_engine.Connection` objects
3117 that have pre-established "shard_id" execution options present. A
3118 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
3119 then interprets this execution option to emit a MySQL ``use`` statement
3120 to switch databases before a statement execution, while at the same
3121 time keeping track of which database we've established using the
3122 :attr:`_engine.Connection.info` dictionary.
3123
3124 .. seealso::
3125
3126 :meth:`_engine.Connection.execution_options`
3127 - update execution options
3128 on a :class:`_engine.Connection` object.
3129
3130 :meth:`_engine.Engine.update_execution_options`
3131 - update the execution
3132 options for a given :class:`_engine.Engine` in place.
3133
3134 :meth:`_engine.Engine.get_execution_options`
3135
3136
3137 """ # noqa: E501
3138 return self._option_cls(self, opt)
3139
3140 def get_execution_options(self) -> _ExecuteOptions:
3141 """Get the non-SQL options which will take effect during execution.
3142
3143 .. versionadded: 1.3
3144
3145 .. seealso::
3146
3147 :meth:`_engine.Engine.execution_options`
3148 """
3149 return self._execution_options
3150
3151 @property
3152 def name(self) -> str:
3153 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3154 in use by this :class:`Engine`.
3155
3156 """
3157
3158 return self.dialect.name
3159
3160 @property
3161 def driver(self) -> str:
3162 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3163 in use by this :class:`Engine`.
3164
3165 """
3166
3167 return self.dialect.driver
3168
3169 echo = log.echo_property()
3170
3171 def __repr__(self) -> str:
3172 return "Engine(%r)" % (self.url,)
3173
3174 def dispose(self, close: bool = True) -> None:
3175 """Dispose of the connection pool used by this
3176 :class:`_engine.Engine`.
3177
3178 A new connection pool is created immediately after the old one has been
3179 disposed. The previous connection pool is disposed either actively, by
3180 closing out all currently checked-in connections in that pool, or
3181 passively, by losing references to it but otherwise not closing any
3182 connections. The latter strategy is more appropriate for an initializer
3183 in a forked Python process.
3184
3185 Event listeners associated with the old pool via :class:`.PoolEvents`
3186 are **transferred to the new pool**; this is to support the pattern
3187 by which :class:`.PoolEvents` are set up in terms of the owning
3188 :class:`.Engine` without the need to refer to the :class:`.Pool`
3189 directly.
3190
3191 :param close: if left at its default of ``True``, has the
3192 effect of fully closing all **currently checked in**
3193 database connections. Connections that are still checked out
3194 will **not** be closed, however they will no longer be associated
3195 with this :class:`_engine.Engine`,
3196 so when they are closed individually, eventually the
3197 :class:`_pool.Pool` which they are associated with will
3198 be garbage collected and they will be closed out fully, if
3199 not already closed on checkin.
3200
3201 If set to ``False``, the previous connection pool is de-referenced,
3202 and otherwise not touched in any way.
3203
3204 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
3205 parameter to allow the replacement of a connection pool in a child
3206 process without interfering with the connections used by the parent
3207 process.
3208
3209
3210 .. seealso::
3211
3212 :ref:`engine_disposal`
3213
3214 :ref:`pooling_multiprocessing`
3215
3216 :meth:`.ConnectionEvents.engine_disposed`
3217
3218 """
3219 if close:
3220 self.pool.dispose()
3221 self.pool = self.pool.recreate()
3222 self.dispatch.engine_disposed(self)
3223
3224 @contextlib.contextmanager
3225 def _optional_conn_ctx_manager(
3226 self, connection: Optional[Connection] = None
3227 ) -> Iterator[Connection]:
3228 if connection is None:
3229 with self.connect() as conn:
3230 yield conn
3231 else:
3232 yield connection
3233
3234 @contextlib.contextmanager
3235 def begin(self) -> Iterator[Connection]:
3236 """Return a context manager delivering a :class:`_engine.Connection`
3237 with a :class:`.Transaction` established.
3238
3239 E.g.::
3240
3241 with engine.begin() as conn:
3242 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
3243 conn.execute(text("my_special_procedure(5)"))
3244
3245 Upon successful operation, the :class:`.Transaction`
3246 is committed. If an error is raised, the :class:`.Transaction`
3247 is rolled back.
3248
3249 .. seealso::
3250
3251 :meth:`_engine.Engine.connect` - procure a
3252 :class:`_engine.Connection` from
3253 an :class:`_engine.Engine`.
3254
3255 :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
3256 for a particular :class:`_engine.Connection`.
3257
3258 """ # noqa: E501
3259 with self.connect() as conn:
3260 with conn.begin():
3261 yield conn
3262
3263 def _run_ddl_visitor(
3264 self,
3265 visitorcallable: Type[InvokeDDLBase],
3266 element: SchemaVisitable,
3267 **kwargs: Any,
3268 ) -> None:
3269 with self.begin() as conn:
3270 conn._run_ddl_visitor(visitorcallable, element, **kwargs)
3271
3272 def connect(self) -> Connection:
3273 """Return a new :class:`_engine.Connection` object.
3274
3275 The :class:`_engine.Connection` acts as a Python context manager, so
3276 the typical use of this method looks like::
3277
3278 with engine.connect() as connection:
3279 connection.execute(text("insert into table values ('foo')"))
3280 connection.commit()
3281
3282 Where above, after the block is completed, the connection is "closed"
3283 and its underlying DBAPI resources are returned to the connection pool.
3284 This also has the effect of rolling back any transaction that
3285 was explicitly begun or was begun via autobegin, and will
3286 emit the :meth:`_events.ConnectionEvents.rollback` event if one was
3287 started and is still in progress.
3288
3289 .. seealso::
3290
3291 :meth:`_engine.Engine.begin`
3292
3293 """
3294
3295 return self._connection_cls(self)
3296
3297 def raw_connection(self) -> PoolProxiedConnection:
3298 """Return a "raw" DBAPI connection from the connection pool.
3299
3300 The returned object is a proxied version of the DBAPI
3301 connection object used by the underlying driver in use.
3302 The object will have all the same behavior as the real DBAPI
3303 connection, except that its ``close()`` method will result in the
3304 connection being returned to the pool, rather than being closed
3305 for real.
3306
3307 This method provides direct DBAPI connection access for
3308 special situations when the API provided by
3309 :class:`_engine.Connection`
3310 is not needed. When a :class:`_engine.Connection` object is already
3311 present, the DBAPI connection is available using
3312 the :attr:`_engine.Connection.connection` accessor.
3313
3314 .. seealso::
3315
3316 :ref:`dbapi_connections`
3317
3318 """
3319 return self.pool.connect()
3320
3321
class OptionEngineMixin(log.Identified):
    """Mixin for an engine that proxies state of a parent :class:`.Engine`.

    Most attributes are copied from the proxied engine at construction
    time, while ``pool`` and ``_has_events`` are looked up on the proxied
    engine on each access.  Subclasses must supply
    ``update_execution_options()``.

    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Build an engine on top of ``proxied``.

        :param proxied: the parent :class:`.Engine` whose URL, dialect,
         logging settings, compiled cache, event dispatch and default
         execution options are carried over.
        :param execution_options: additional options, applied on top of
         the parent's via ``update_execution_options()``.

        """
        parent = proxied
        self._proxied = parent

        # simple state taken over from the parent engine
        self.dialect = parent.dialect
        self.url = parent.url
        self.hide_parameters = parent.hide_parameters
        self._compiled_cache = parent._compiled_cache

        # logging configuration; the name is assigned ahead of the echo flag
        self.logging_name = parent.logging_name
        self.echo = parent.echo
        log.instance_logger(self, echoflag=self.echo)

        # Joining the dispatchers means events added to the parent engine
        # *after* this object is constructed still propagate to it.  As the
        # parent's events are shared, class-level events are not allowed to
        # target the OptionEngine class directly.
        #
        # The alternative would be a one-time transfer of the events that
        # exist right now:
        #     self.dispatch._update(proxied.dispatch)
        #
        # That may arguably be more appropriate, but it would change
        # behavior for code that attaches events to the parent engine and
        # expects the already-created sub-engine to pick them up.
        self.dispatch = self.dispatch._join(parent.dispatch)

        # begin with the parent's options, then layer the new ones on top
        self._execution_options = parent._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Placeholder that always raises ``NotImplementedError``."""
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The pool of the proxied engine."""
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # writes go to the proxied engine, not to this object
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            """The proxied engine's flag if truthy, else the local one."""
            inherited = self._proxied._has_events
            if inherited:
                return inherited
            return self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # kept in the instance dict; the proxied engine is not modified
            self.__dict__["_has_events"] = value
3385
3386
class OptionEngine(OptionEngineMixin, Engine):
    """Concrete :class:`.Engine` combining :class:`.OptionEngineMixin`
    with :class:`.Engine`.

    """

    def update_execution_options(self, **opt: Any) -> None:
        """Update this engine's default execution options in place.

        Delegates to :meth:`.Engine.update_execution_options`.

        """
        # Engine is named explicitly: OptionEngineMixin comes first in the
        # bases and its update_execution_options() raises
        # NotImplementedError.
        Engine.update_execution_options(self, **opt)
3390
3391
# Engine only declares ``_option_cls`` as an annotation, and
# Engine.execution_options() instantiates it; the value is assigned here
# because OptionEngine is defined after Engine.
Engine._option_cls = OptionEngine