1# engine/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
8from __future__ import annotations
9
10import contextlib
11import sys
12import typing
13from typing import Any
14from typing import Callable
15from typing import cast
16from typing import Iterable
17from typing import Iterator
18from typing import List
19from typing import Mapping
20from typing import NoReturn
21from typing import Optional
22from typing import overload
23from typing import Tuple
24from typing import Type
25from typing import TypeVar
26from typing import Union
27
28from .interfaces import BindTyping
29from .interfaces import ConnectionEventsTarget
30from .interfaces import DBAPICursor
31from .interfaces import ExceptionContext
32from .interfaces import ExecuteStyle
33from .interfaces import ExecutionContext
34from .interfaces import IsolationLevel
35from .util import _distill_params_20
36from .util import _distill_raw_params
37from .util import TransactionalContext
38from .. import exc
39from .. import inspection
40from .. import log
41from .. import util
42from ..sql import compiler
43from ..sql import util as sql_util
44
45if typing.TYPE_CHECKING:
46 from . import CursorResult
47 from . import ScalarResult
48 from .interfaces import _AnyExecuteParams
49 from .interfaces import _AnyMultiExecuteParams
50 from .interfaces import _CoreAnyExecuteParams
51 from .interfaces import _CoreMultiExecuteParams
52 from .interfaces import _CoreSingleExecuteParams
53 from .interfaces import _DBAPIAnyExecuteParams
54 from .interfaces import _DBAPISingleExecuteParams
55 from .interfaces import _ExecuteOptions
56 from .interfaces import CompiledCacheType
57 from .interfaces import CoreExecuteOptionsParameter
58 from .interfaces import Dialect
59 from .interfaces import SchemaTranslateMapType
60 from .reflection import Inspector # noqa
61 from .url import URL
62 from ..event import dispatcher
63 from ..log import _EchoFlagType
64 from ..pool import _ConnectionFairy
65 from ..pool import Pool
66 from ..pool import PoolProxiedConnection
67 from ..sql import Executable
68 from ..sql._typing import _InfoType
69 from ..sql.compiler import Compiled
70 from ..sql.ddl import ExecutableDDLElement
71 from ..sql.ddl import InvokeDDLBase
72 from ..sql.functions import FunctionElement
73 from ..sql.schema import DefaultGenerator
74 from ..sql.schema import HasSchemaAttr
75 from ..sql.schema import SchemaVisitable
76 from ..sql.selectable import TypedReturnsRows
77
78
# Generic type variable for this module; bound to ``Any``.
_T = TypeVar("_T", bound=Any)
# Default "no execution options" value.  This is the shared
# ``util.EMPTY_DICT`` object rather than a newly allocated dict.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
# Default "no options" mapping; refers to the very same ``util.EMPTY_DICT``
# object as ``_EMPTY_EXECUTION_OPTS`` above.
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
82
83
84class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
85 """Provides high-level functionality for a wrapped DB-API connection.
86
87 The :class:`_engine.Connection` object is procured by calling the
88 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
89 object, and provides services for execution of SQL statements as well
90 as transaction control.
91
92 The Connection object is **not** thread-safe. While a Connection can be
93 shared among threads using properly synchronized access, it is still
94 possible that the underlying DBAPI connection may not support shared
95 access between threads. Check the DBAPI documentation for details.
96
97 The Connection object represents a single DBAPI connection checked out
    from the connection pool. In this state, the connection pool has no
    effect upon the connection, including its expiration or timeout state.
100 For the connection pool to properly manage connections, connections
101 should be returned to the connection pool (i.e. ``connection.close()``)
102 whenever the connection is not in use.
103
104 .. index::
105 single: thread safety; Connection
106
107 """
108
# the Dialect in use; copied from ``engine.dialect`` by __init__()
dialect: Dialect
# event dispatcher for this connection; __init__() joins it to
# ``engine.dispatch`` unless ``_has_events`` was passed explicitly
dispatch: dispatcher[ConnectionEventsTarget]

# NOTE(review): presumably the logger name consulted by the ``log``
# package for this class -- not used within this section, confirm there.
_sqla_logger_namespace = "sqlalchemy.engine.Connection"

# used by sqlalchemy.engine.util.TransactionalContext
_trans_context_manager: Optional[TransactionalContext] = None

# legacy as of 2.0, should be eventually deprecated and
# removed. was used in the "pre_ping" recipe that's been in the docs
# a long time
should_close_with_result = False

# the pool-proxied DBAPI connection currently in use; set to None by
# invalidate().  The ``closed`` / ``invalidated`` properties interpret a
# None value together with the private "can reconnect" flag.
_dbapi_connection: Optional[PoolProxiedConnection]

# current execution options; starts out as the engine's options and is
# replaced with a merged collection by execution_options()
_execution_options: _ExecuteOptions

# the root transaction created by begin(), and the current nested
# (SAVEPOINT) transaction; both are None after __init__()
_transaction: Optional[RootTransaction]
_nested_transaction: Optional[NestedTransaction]
128
def __init__(
    self,
    engine: Engine,
    connection: Optional[PoolProxiedConnection] = None,
    _has_events: Optional[bool] = None,
    _allow_revalidate: bool = True,
    _allow_autobegin: bool = True,
):
    """Construct a new Connection.

    :param engine: the :class:`_engine.Engine` this connection belongs
      to.  Its dialect and execution options are adopted by the new
      connection.
    :param connection: an existing pool-proxied connection to wrap.  When
      omitted, a connection is procured with ``engine.raw_connection()``.
    :param _has_events: when left at ``None``, the connection joins the
      engine's event dispatch and inherits the engine's ``_has_events``
      flag.  When passed explicitly, the dispatch is not joined and the
      given value is used.
    :param _allow_revalidate: stored as the private "can reconnect" flag
      that is consulted by :attr:`.closed`, :attr:`.invalidated` and
      ``_revalidate_connection()``.
    :param _allow_autobegin: when ``False``, ``_autobegin()`` is a no-op.

    """
    self.engine = engine
    dialect = engine.dialect
    self.dialect = dialect

    if connection is not None:
        self._dbapi_connection = connection
    else:
        try:
            self._dbapi_connection = engine.raw_connection()
        except dialect.loaded_dbapi.Error as dbapi_err:
            # there is no Connection-level state to clean up yet, so the
            # class-level handler is used; if it returns, the original
            # DBAPI error is re-raised unchanged
            Connection._handle_dbapi_exception_noconnection(
                dbapi_err, dialect, engine
            )
            raise

    # transactional bookkeeping starts out empty
    self._transaction = None
    self._nested_transaction = None
    self.__savepoint_seq = 0
    self.__in_begin = False

    self.__can_reconnect = _allow_revalidate
    self._allow_autobegin = _allow_autobegin
    self._echo = self.engine._should_log_info()

    # only an unspecified (None) _has_events joins the engine's dispatch;
    # an explicit value means the caller decides and the engine's events
    # are not inherited
    inherit_engine_events = _has_events is None
    if inherit_engine_events:
        self.dispatch = self.dispatch._join(engine.dispatch)
    self._has_events = _has_events or (
        inherit_engine_events and engine._has_events
    )

    self._execution_options = engine._execution_options

    if self._has_events or self.engine._has_events:
        self.dispatch.engine_connect(self)
173
# optional callable applied to each message by _log_info() and
# _log_debug() before it is handed to the logger; None (the default)
# leaves messages untouched.
# this can be assigned differently via
# characteristics.LoggingTokenCharacteristic
_message_formatter: Any = None
177
def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` via ``self.engine.logger.info()``.

    If a ``_message_formatter`` is set, the message is passed through it
    first.  When ``log.STACKLEVEL`` is enabled, a ``stacklevel`` keyword
    of ``1 + log.STACKLEVEL_OFFSET`` is added to (or overrides one in)
    ``kw``.  Positional and keyword arguments are otherwise forwarded to
    the logger as given.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        # NOTE(review): presumably makes the log record point at the
        # caller rather than at this helper -- confirm against ``log``
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.info(text, *arg, **kw)
188
def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` via ``self.engine.logger.debug()``.

    If a ``_message_formatter`` is set, the message is passed through it
    first.  When ``log.STACKLEVEL`` is enabled, a ``stacklevel`` keyword
    of ``1 + log.STACKLEVEL_OFFSET`` is added to (or overrides one in)
    ``kw``.  Positional and keyword arguments are otherwise forwarded to
    the logger as given.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        # NOTE(review): presumably makes the log record point at the
        # caller rather than at this helper -- confirm against ``log``
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.debug(text, *arg, **kw)
199
200 @property
201 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
202 schema_translate_map: Optional[SchemaTranslateMapType] = (
203 self._execution_options.get("schema_translate_map", None)
204 )
205
206 return schema_translate_map
207
def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the schema name for the given schema item, taking the
    current schema translate map into account.

    :param obj: an object providing ``.schema`` and ``._use_schema_map``.
    :return: the translated schema name when a non-empty
      ``schema_translate_map`` execution option is set, it contains the
      object's schema name as a key, and the object opts in via
      ``_use_schema_map``; otherwise the object's own ``.schema`` value.

    """
    schema = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    # no (or empty) map, or this schema has no entry: nothing to translate
    if not translate_map or schema not in translate_map:
        return schema

    # the object itself may opt out of translation
    if not obj._use_schema_map:
        return schema

    return translate_map[schema]
227
228 def __enter__(self) -> Connection:
229 return self
230
231 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
232 self.close()
233
# typing-only overload that spells out the options SQLAlchemy itself
# recognizes; additional user-defined names are accepted through ``**opt``
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    **opt: Any,
) -> Connection: ...

# typing-only overload for fully arbitrary keyword options
@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called.   Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor.  Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`. Number of rows to format into an
        INSERT statement when the statement uses "insertmanyvalues" mode,
        which is a paged form of bulk insert that is used for many backends
        when using :term:`executemany` execution typically in conjunction
        with RETURNING. Defaults to 1000. May also be modified on a
        per-engine basis using the
        :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

        .. versionadded:: 2.0

        .. seealso::

            :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPI's rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements.  See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    """  # noqa
    # event listeners are handed the ``opt`` dict itself before it is
    # merged, so the merge below uses whatever that dict contains once
    # they have run
    if self._has_events or self.engine._has_events:
        self.dispatch.set_connection_execution_options(self, opt)
    # merge via union() and rebind the attribute; the previous options
    # object is replaced rather than updated in place
    self._execution_options = self._execution_options.union(opt)
    # the dialect hook runs last and receives only the newly passed
    # options, not the merged collection
    self.dialect.set_connection_execution_options(self, opt)
    return self
516
def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL options currently in effect for executions on
    this connection.

    The collection returned is the connection's own options object,
    not a copy of it.

    .. versionadded:: 1.3

    .. seealso::

        :meth:`_engine.Connection.execution_options`

    """
    current_options = self._execution_options
    return current_options
527
528 @property
529 def _still_open_and_dbapi_connection_is_valid(self) -> bool:
530 pool_proxied_connection = self._dbapi_connection
531 return (
532 pool_proxied_connection is not None
533 and pool_proxied_connection.is_valid
534 )
535
@property
def closed(self) -> bool:
    """Return True if this connection is closed."""

    if self._dbapi_connection is not None:
        return False
    # no DBAPI connection: "closed" only when reconnecting is not allowed;
    # otherwise this same state is reported as "invalidated"
    return not self.__can_reconnect
541
@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    This does not indicate whether or not the connection was
    invalidated at the pool level, however

    """

    # Before 1.4 an "invalid" flag was tracked separately from "closed":
    # an invalidated connection could subsequently be closed, leaving
    # _dbapi_connection as None and closed=True while the "invalid" flag
    # remained True.  That produced three distinct states (open/valid,
    # closed/valid, closed/invalid) for no good reason, since a connection
    # that is "closed" has no need to also be "invalid".  The state is
    # therefore derived from just two facts: whether a DBAPI connection
    # is present, and whether reconnecting is allowed.

    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect
562
@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    This is a SQLAlchemy connection-pool proxied connection
    which then has the attribute
    :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
    actual driver connection.

    If no DB-API connection is currently present, a revalidation
    (reconnect) is attempted first.

    .. seealso::

        :ref:`dbapi_connections`

    """

    proxied = self._dbapi_connection
    if proxied is not None:
        return proxied

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # state errors raised by revalidation itself propagate untouched
        raise
    except BaseException as e:
        # anything else goes through the common DBAPI error handler
        self._handle_dbapi_exception(e, None, None, None, None)
588
def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    This method will perform a live SQL operation against the database
    in order to procure the current isolation level, so the value returned
    is the actual level on the underlying DBAPI connection regardless of
    how this state was set. This will be one of the four actual isolation
    modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
    ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
    level setting. Third party dialects may also feature additional
    isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    # going through ``self.connection`` revalidates first if necessary
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None
    try:
        level = self.dialect.get_isolation_level(driver_connection)
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)
    else:
        return level
632
@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    This value is independent of the
    :paramref:`.Connection.execution_options.isolation_level` and
    :paramref:`.Engine.execution_options.isolation_level` execution
    options, and is determined by the :class:`_engine.Dialect` when the
    first connection is created, by performing a SQL query against the
    database for the current isolation level before any additional commands
    have been emitted.

    Calling this accessor does not invoke any new SQL queries; the value
    is read from the dialect.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level
661
def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`~sqlalchemy.exc.PendingRollbackError` for a
    connection whose transaction has not been rolled back yet.

    The message refers to a "savepoint transaction" when a nested
    transaction is present and to a plain "transaction" otherwise.

    """
    if self._nested_transaction is not None:
        qualifier = "savepoint "
    else:
        qualifier = ""

    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (qualifier,),
        code="8s2b",
    )
669
670 def _revalidate_connection(self) -> PoolProxiedConnection:
671 if self.__can_reconnect and self.invalidated:
672 if self._transaction is not None:
673 self._invalid_transaction()
674 self._dbapi_connection = self.engine.raw_connection()
675 return self._dbapi_connection
676 raise exc.ResourceClosedError("This Connection is closed")
677
@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data here will follow along with the DBAPI connection including
    after it is returned to the connection pool and used again
    in subsequent instances of :class:`_engine.Connection`.

    The dictionary is obtained from :attr:`.connection`, so accessing it
    on an invalidated connection goes through the same revalidation step
    as any other use of that attribute.

    """

    proxied = self.connection
    return proxied.info
691
def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt will be made to close the underlying DBAPI connection
    immediately; however if this operation fails, the error is logged
    but not raised.  The connection is then discarded whether or not
    close() succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar),
    this :class:`_engine.Connection` will attempt to
    procure a new DBAPI connection using the services of the
    :class:`_pool.Pool` as a source of connectivity (e.g.
    a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    the :meth:`_engine.Connection.invalidate` method is called, at the
    DBAPI level all state associated with this transaction is lost, as
    the DBAPI connection is closed.  The :class:`_engine.Connection`
    will not allow a reconnection to proceed until the
    :class:`.Transaction` object is ended, by calling the
    :meth:`.Transaction.rollback` method; until that point, any attempt at
    continuing to use the :class:`_engine.Connection` will raise an
    :class:`~sqlalchemy.exc.InvalidRequestError`.
    This is to prevent applications from accidentally
    continuing ongoing transactional operations despite the
    fact that the transaction has been lost due to an
    invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on a connection that is already invalidated
    has no effect.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    :raises ResourceClosedError: if the connection is closed.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """

    if self.invalidated:
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        proxied = self._dbapi_connection
        assert proxied is not None
        proxied.invalidate(exception)

    # dropping the reference is what puts this Connection into the
    # "invalidated" state (or "closed", when reconnecting is not allowed)
    self._dbapi_connection = None
750
def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance will remain usable.
    When closed
    (or exited from a context manager context as above),
    the DB-API connection will be literally closed and not
    returned to its originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    :raises ResourceClosedError: if the connection is closed.
    :raises InvalidRequestError: if the connection has been invalidated,
      i.e. there is no DB-API connection to detach.

    """

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    proxied = self._dbapi_connection
    if proxied is None:
        raise exc.InvalidRequestError(
            "Can't detach an invalidated Connection"
        )

    # the pool-proxied connection carries out the actual detachment
    proxied.detach()
786
787 def _autobegin(self) -> None:
788 if self._allow_autobegin and not self.__in_begin:
789 self.begin()
790
def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of :class:`_engine.RootTransaction`.
    This object represents the "scope" of the transaction,
    which completes when either the :meth:`_engine.Transaction.rollback`
    or :meth:`_engine.Transaction.commit` method is called; the object
    also works as a context manager as illustrated above.

    The :meth:`_engine.Connection.begin` method begins a
    transaction that normally will be begun in any case when the connection
    is first used to execute a statement.  The reason this method might be
    used would be to invoke the :meth:`_events.ConnectionEvents.begin`
    event at a specific time, or to organize code within the scope of a
    connection checkout in terms of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    The above code is not fundamentally any different in its behavior than
    the following code which does not use
    :meth:`_engine.Connection.begin`; the below style is known
    as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, the :meth:`_engine.Connection.begin`
    method does not emit any SQL or change the state of the underlying
    DBAPI connection in any way; the Python DBAPI does not have any
    concept of explicit transaction begin.

    :raises InvalidRequestError: if a transaction object is already
      present on this connection, whether from an earlier ``begin()`` or
      from autobegin.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    # only one root transaction may exist at a time
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction
865
866 def begin_nested(self) -> NestedTransaction:
867 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
868 handle that controls the scope of the SAVEPOINT.
869
870 E.g.::
871
872 with engine.begin() as connection:
873 with connection.begin_nested():
874 connection.execute(table.insert(), {"username": "sandy"})
875
876 The returned object is an instance of
877 :class:`_engine.NestedTransaction`, which includes transactional
878 methods :meth:`_engine.NestedTransaction.commit` and
879 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
880 these methods correspond to the operations "RELEASE SAVEPOINT <name>"
881 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
882 to the :class:`_engine.NestedTransaction` object and is generated
883 automatically. Like any other :class:`_engine.Transaction`, the
884 :class:`_engine.NestedTransaction` may be used as a context manager as
885 illustrated above which will "release" or "rollback" corresponding to
886 if the operation within the block were successful or raised an
887 exception.
888
889 Nested transactions require SAVEPOINT support in the underlying
890 database, else the behavior is undefined. SAVEPOINT is commonly used to
891 run operations within a transaction that may fail, while continuing the
892 outer transaction. E.g.::
893
894 from sqlalchemy import exc
895
896 with engine.begin() as connection:
897 trans = connection.begin_nested()
898 try:
899 connection.execute(table.insert(), {"username": "sandy"})
900 trans.commit()
901 except exc.IntegrityError: # catch for duplicate username
902 trans.rollback() # rollback to savepoint
903
904 # outer transaction continues
905 connection.execute(...)
906
907 If :meth:`_engine.Connection.begin_nested` is called without first
908 calling :meth:`_engine.Connection.begin` or
909 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
910 will "autobegin" the outer transaction first. This outer transaction
911 may be committed using "commit-as-you-go" style, e.g.::
912
913 with engine.connect() as connection: # begin() wasn't called
914
915 with connection.begin_nested(): # will auto-"begin()" first
916 connection.execute(...)
917 # savepoint is released
918
919 connection.execute(...)
920
921 # explicitly commit outer transaction
922 connection.commit()
923
924 # can continue working with connection here
925
926 .. versionchanged:: 2.0
927
928 :meth:`_engine.Connection.begin_nested` will now participate
929 in the connection "autobegin" behavior that is new as of
930 2.0 / "future" style connections in 1.4.
931
932 .. seealso::
933
934 :meth:`_engine.Connection.begin`
935
936 :ref:`session_begin_nested` - ORM support for SAVEPOINT
937
938 """
939 if self._transaction is None:
940 self._autobegin()
941
942 return NestedTransaction(self)
943
944 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
945 """Begin a two-phase or XA transaction and return a transaction
946 handle.
947
948 The returned object is an instance of :class:`.TwoPhaseTransaction`,
949 which in addition to the methods provided by
950 :class:`.Transaction`, also provides a
951 :meth:`~.TwoPhaseTransaction.prepare` method.
952
953 :param xid: the two phase transaction id. If not supplied, a
954 random id will be generated.
955
956 .. seealso::
957
958 :meth:`_engine.Connection.begin`
959
960 :meth:`_engine.Connection.begin_twophase`
961
962 """
963
964 if self._transaction is not None:
965 raise exc.InvalidRequestError(
966 "Cannot start a two phase transaction when a transaction "
967 "is already in progress."
968 )
969 if xid is None:
970 xid = self.engine.dialect.create_xid()
971 return TwoPhaseTransaction(self, xid)
972
973 def commit(self) -> None:
974 """Commit the transaction that is currently in progress.
975
976 This method commits the current transaction if one has been started.
977 If no transaction was started, the method has no effect, assuming
978 the connection is in a non-invalidated state.
979
980 A transaction is begun on a :class:`_engine.Connection` automatically
981 whenever a statement is first executed, or when the
982 :meth:`_engine.Connection.begin` method is called.
983
984 .. note:: The :meth:`_engine.Connection.commit` method only acts upon
985 the primary database transaction that is linked to the
986 :class:`_engine.Connection` object. It does not operate upon a
987 SAVEPOINT that would have been invoked from the
988 :meth:`_engine.Connection.begin_nested` method; for control of a
989 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
990 :class:`_engine.NestedTransaction` that is returned by the
991 :meth:`_engine.Connection.begin_nested` method itself.
992
993
994 """
995 if self._transaction:
996 self._transaction.commit()
997
998 def rollback(self) -> None:
999 """Roll back the transaction that is currently in progress.
1000
1001 This method rolls back the current transaction if one has been started.
1002 If no transaction was started, the method has no effect. If a
1003 transaction was started and the connection is in an invalidated state,
1004 the transaction is cleared using this method.
1005
1006 A transaction is begun on a :class:`_engine.Connection` automatically
1007 whenever a statement is first executed, or when the
1008 :meth:`_engine.Connection.begin` method is called.
1009
1010 .. note:: The :meth:`_engine.Connection.rollback` method only acts
1011 upon the primary database transaction that is linked to the
1012 :class:`_engine.Connection` object. It does not operate upon a
1013 SAVEPOINT that would have been invoked from the
1014 :meth:`_engine.Connection.begin_nested` method; for control of a
1015 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
1016 :class:`_engine.NestedTransaction` that is returned by the
1017 :meth:`_engine.Connection.begin_nested` method itself.
1018
1019
1020 """
1021 if self._transaction:
1022 self._transaction.rollback()
1023
1024 def recover_twophase(self) -> List[Any]:
1025 return self.engine.dialect.do_recover_twophase(self)
1026
1027 def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
1028 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
1029
1030 def commit_prepared(self, xid: Any, recover: bool = False) -> None:
1031 self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
1032
1033 def in_transaction(self) -> bool:
1034 """Return True if a transaction is in progress."""
1035 return self._transaction is not None and self._transaction.is_active
1036
1037 def in_nested_transaction(self) -> bool:
1038 """Return True if a transaction is in progress."""
1039 return (
1040 self._nested_transaction is not None
1041 and self._nested_transaction.is_active
1042 )
1043
1044 def _is_autocommit_isolation(self) -> bool:
1045 opt_iso = self._execution_options.get("isolation_level", None)
1046 return bool(
1047 opt_iso == "AUTOCOMMIT"
1048 or (
1049 opt_iso is None
1050 and self.engine.dialect._on_connect_isolation_level
1051 == "AUTOCOMMIT"
1052 )
1053 )
1054
1055 def _get_required_transaction(self) -> RootTransaction:
1056 trans = self._transaction
1057 if trans is None:
1058 raise exc.InvalidRequestError("connection is not in a transaction")
1059 return trans
1060
1061 def _get_required_nested_transaction(self) -> NestedTransaction:
1062 trans = self._nested_transaction
1063 if trans is None:
1064 raise exc.InvalidRequestError(
1065 "connection is not in a nested transaction"
1066 )
1067 return trans
1068
1069 def get_transaction(self) -> Optional[RootTransaction]:
1070 """Return the current root transaction in progress, if any.
1071
1072 .. versionadded:: 1.4
1073
1074 """
1075
1076 return self._transaction
1077
1078 def get_nested_transaction(self) -> Optional[NestedTransaction]:
1079 """Return the current nested transaction in progress, if any.
1080
1081 .. versionadded:: 1.4
1082
1083 """
1084 return self._nested_transaction
1085
1086 def _begin_impl(self, transaction: RootTransaction) -> None:
1087 if self._echo:
1088 if self._is_autocommit_isolation():
1089 self._log_info(
1090 "BEGIN (implicit; DBAPI should not BEGIN due to "
1091 "autocommit mode)"
1092 )
1093 else:
1094 self._log_info("BEGIN (implicit)")
1095
1096 self.__in_begin = True
1097
1098 if self._has_events or self.engine._has_events:
1099 self.dispatch.begin(self)
1100
1101 try:
1102 self.engine.dialect.do_begin(self.connection)
1103 except BaseException as e:
1104 self._handle_dbapi_exception(e, None, None, None, None)
1105 finally:
1106 self.__in_begin = False
1107
1108 def _rollback_impl(self) -> None:
1109 if self._has_events or self.engine._has_events:
1110 self.dispatch.rollback(self)
1111
1112 if self._still_open_and_dbapi_connection_is_valid:
1113 if self._echo:
1114 if self._is_autocommit_isolation():
1115 if self.dialect.skip_autocommit_rollback:
1116 self._log_info(
1117 "ROLLBACK will be skipped by "
1118 "skip_autocommit_rollback"
1119 )
1120 else:
1121 self._log_info(
1122 "ROLLBACK using DBAPI connection.rollback(); "
1123 "set skip_autocommit_rollback to prevent fully"
1124 )
1125 else:
1126 self._log_info("ROLLBACK")
1127 try:
1128 self.engine.dialect.do_rollback(self.connection)
1129 except BaseException as e:
1130 self._handle_dbapi_exception(e, None, None, None, None)
1131
1132 def _commit_impl(self) -> None:
1133 if self._has_events or self.engine._has_events:
1134 self.dispatch.commit(self)
1135
1136 if self._echo:
1137 if self._is_autocommit_isolation():
1138 self._log_info(
1139 "COMMIT using DBAPI connection.commit(), "
1140 "has no effect due to autocommit mode"
1141 )
1142 else:
1143 self._log_info("COMMIT")
1144 try:
1145 self.engine.dialect.do_commit(self.connection)
1146 except BaseException as e:
1147 self._handle_dbapi_exception(e, None, None, None, None)
1148
1149 def _savepoint_impl(self, name: Optional[str] = None) -> str:
1150 if self._has_events or self.engine._has_events:
1151 self.dispatch.savepoint(self, name)
1152
1153 if name is None:
1154 self.__savepoint_seq += 1
1155 name = "sa_savepoint_%s" % self.__savepoint_seq
1156 self.engine.dialect.do_savepoint(self, name)
1157 return name
1158
1159 def _rollback_to_savepoint_impl(self, name: str) -> None:
1160 if self._has_events or self.engine._has_events:
1161 self.dispatch.rollback_savepoint(self, name, None)
1162
1163 if self._still_open_and_dbapi_connection_is_valid:
1164 self.engine.dialect.do_rollback_to_savepoint(self, name)
1165
1166 def _release_savepoint_impl(self, name: str) -> None:
1167 if self._has_events or self.engine._has_events:
1168 self.dispatch.release_savepoint(self, name, None)
1169
1170 self.engine.dialect.do_release_savepoint(self, name)
1171
1172 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
1173 if self._echo:
1174 self._log_info("BEGIN TWOPHASE (implicit)")
1175 if self._has_events or self.engine._has_events:
1176 self.dispatch.begin_twophase(self, transaction.xid)
1177
1178 self.__in_begin = True
1179 try:
1180 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1181 except BaseException as e:
1182 self._handle_dbapi_exception(e, None, None, None, None)
1183 finally:
1184 self.__in_begin = False
1185
1186 def _prepare_twophase_impl(self, xid: Any) -> None:
1187 if self._has_events or self.engine._has_events:
1188 self.dispatch.prepare_twophase(self, xid)
1189
1190 assert isinstance(self._transaction, TwoPhaseTransaction)
1191 try:
1192 self.engine.dialect.do_prepare_twophase(self, xid)
1193 except BaseException as e:
1194 self._handle_dbapi_exception(e, None, None, None, None)
1195
1196 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1197 if self._has_events or self.engine._has_events:
1198 self.dispatch.rollback_twophase(self, xid, is_prepared)
1199
1200 if self._still_open_and_dbapi_connection_is_valid:
1201 assert isinstance(self._transaction, TwoPhaseTransaction)
1202 try:
1203 self.engine.dialect.do_rollback_twophase(
1204 self, xid, is_prepared
1205 )
1206 except BaseException as e:
1207 self._handle_dbapi_exception(e, None, None, None, None)
1208
1209 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1210 if self._has_events or self.engine._has_events:
1211 self.dispatch.commit_twophase(self, xid, is_prepared)
1212
1213 assert isinstance(self._transaction, TwoPhaseTransaction)
1214 try:
1215 self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
1216 except BaseException as e:
1217 self._handle_dbapi_exception(e, None, None, None, None)
1218
1219 def close(self) -> None:
1220 """Close this :class:`_engine.Connection`.
1221
1222 This results in a release of the underlying database
1223 resources, that is, the DBAPI connection referenced
1224 internally. The DBAPI connection is typically restored
1225 back to the connection-holding :class:`_pool.Pool` referenced
1226 by the :class:`_engine.Engine` that produced this
1227 :class:`_engine.Connection`. Any transactional state present on
1228 the DBAPI connection is also unconditionally released via
1229 the DBAPI connection's ``rollback()`` method, regardless
1230 of any :class:`.Transaction` object that may be
1231 outstanding with regards to this :class:`_engine.Connection`.
1232
1233 This has the effect of also calling :meth:`_engine.Connection.rollback`
1234 if any transaction is in place.
1235
1236 After :meth:`_engine.Connection.close` is called, the
1237 :class:`_engine.Connection` is permanently in a closed state,
1238 and will allow no further operations.
1239
1240 """
1241
1242 if self._transaction:
1243 self._transaction.close()
1244 skip_reset = True
1245 else:
1246 skip_reset = False
1247
1248 if self._dbapi_connection is not None:
1249 conn = self._dbapi_connection
1250
1251 # as we just closed the transaction, close the connection
1252 # pool connection without doing an additional reset
1253 if skip_reset:
1254 cast("_ConnectionFairy", conn)._close_special(
1255 transaction_reset=True
1256 )
1257 else:
1258 conn.close()
1259
1260 # There is a slight chance that conn.close() may have
1261 # triggered an invalidation here in which case
1262 # _dbapi_connection would already be None, however usually
1263 # it will be non-None here and in a "closed" state.
1264 self._dbapi_connection = None
1265 self.__can_reconnect = False
1266
1267 @overload
1268 def scalar(
1269 self,
1270 statement: TypedReturnsRows[Tuple[_T]],
1271 parameters: Optional[_CoreSingleExecuteParams] = None,
1272 *,
1273 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1274 ) -> Optional[_T]: ...
1275
1276 @overload
1277 def scalar(
1278 self,
1279 statement: Executable,
1280 parameters: Optional[_CoreSingleExecuteParams] = None,
1281 *,
1282 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1283 ) -> Any: ...
1284
1285 def scalar(
1286 self,
1287 statement: Executable,
1288 parameters: Optional[_CoreSingleExecuteParams] = None,
1289 *,
1290 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1291 ) -> Any:
1292 r"""Executes a SQL statement construct and returns a scalar object.
1293
1294 This method is shorthand for invoking the
1295 :meth:`_engine.Result.scalar` method after invoking the
1296 :meth:`_engine.Connection.execute` method. Parameters are equivalent.
1297
1298 :return: a scalar Python value representing the first column of the
1299 first row returned.
1300
1301 """
1302 distilled_parameters = _distill_params_20(parameters)
1303 try:
1304 meth = statement._execute_on_scalar
1305 except AttributeError as err:
1306 raise exc.ObjectNotExecutableError(statement) from err
1307 else:
1308 return meth(
1309 self,
1310 distilled_parameters,
1311 execution_options or NO_OPTIONS,
1312 )
1313
1314 @overload
1315 def scalars(
1316 self,
1317 statement: TypedReturnsRows[Tuple[_T]],
1318 parameters: Optional[_CoreAnyExecuteParams] = None,
1319 *,
1320 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1321 ) -> ScalarResult[_T]: ...
1322
1323 @overload
1324 def scalars(
1325 self,
1326 statement: Executable,
1327 parameters: Optional[_CoreAnyExecuteParams] = None,
1328 *,
1329 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1330 ) -> ScalarResult[Any]: ...
1331
1332 def scalars(
1333 self,
1334 statement: Executable,
1335 parameters: Optional[_CoreAnyExecuteParams] = None,
1336 *,
1337 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1338 ) -> ScalarResult[Any]:
1339 """Executes and returns a scalar result set, which yields scalar values
1340 from the first column of each row.
1341
1342 This method is equivalent to calling :meth:`_engine.Connection.execute`
1343 to receive a :class:`_result.Result` object, then invoking the
1344 :meth:`_result.Result.scalars` method to produce a
1345 :class:`_result.ScalarResult` instance.
1346
1347 :return: a :class:`_result.ScalarResult`
1348
1349 .. versionadded:: 1.4.24
1350
1351 """
1352
1353 return self.execute(
1354 statement, parameters, execution_options=execution_options
1355 ).scalars()
1356
1357 @overload
1358 def execute(
1359 self,
1360 statement: TypedReturnsRows[_T],
1361 parameters: Optional[_CoreAnyExecuteParams] = None,
1362 *,
1363 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1364 ) -> CursorResult[_T]: ...
1365
1366 @overload
1367 def execute(
1368 self,
1369 statement: Executable,
1370 parameters: Optional[_CoreAnyExecuteParams] = None,
1371 *,
1372 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1373 ) -> CursorResult[Any]: ...
1374
1375 def execute(
1376 self,
1377 statement: Executable,
1378 parameters: Optional[_CoreAnyExecuteParams] = None,
1379 *,
1380 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1381 ) -> CursorResult[Any]:
1382 r"""Executes a SQL statement construct and returns a
1383 :class:`_engine.CursorResult`.
1384
1385 :param statement: The statement to be executed. This is always
1386 an object that is in both the :class:`_expression.ClauseElement` and
1387 :class:`_expression.Executable` hierarchies, including:
1388
1389 * :class:`_expression.Select`
1390 * :class:`_expression.Insert`, :class:`_expression.Update`,
1391 :class:`_expression.Delete`
1392 * :class:`_expression.TextClause` and
1393 :class:`_expression.TextualSelect`
1394 * :class:`_schema.DDL` and objects which inherit from
1395 :class:`_schema.ExecutableDDLElement`
1396
1397 :param parameters: parameters which will be bound into the statement.
1398 This may be either a dictionary of parameter names to values,
1399 or a mutable sequence (e.g. a list) of dictionaries. When a
1400 list of dictionaries is passed, the underlying statement execution
1401 will make use of the DBAPI ``cursor.executemany()`` method.
1402 When a single dictionary is passed, the DBAPI ``cursor.execute()``
1403 method will be used.
1404
1405 :param execution_options: optional dictionary of execution options,
1406 which will be associated with the statement execution. This
1407 dictionary can provide a subset of the options that are accepted
1408 by :meth:`_engine.Connection.execution_options`.
1409
1410 :return: a :class:`_engine.Result` object.
1411
1412 """
1413 distilled_parameters = _distill_params_20(parameters)
1414 try:
1415 meth = statement._execute_on_connection
1416 except AttributeError as err:
1417 raise exc.ObjectNotExecutableError(statement) from err
1418 else:
1419 return meth(
1420 self,
1421 distilled_parameters,
1422 execution_options or NO_OPTIONS,
1423 )
1424
1425 def _execute_function(
1426 self,
1427 func: FunctionElement[Any],
1428 distilled_parameters: _CoreMultiExecuteParams,
1429 execution_options: CoreExecuteOptionsParameter,
1430 ) -> CursorResult[Any]:
1431 """Execute a sql.FunctionElement object."""
1432
1433 return self._execute_clauseelement(
1434 func.select(), distilled_parameters, execution_options
1435 )
1436
1437 def _execute_default(
1438 self,
1439 default: DefaultGenerator,
1440 distilled_parameters: _CoreMultiExecuteParams,
1441 execution_options: CoreExecuteOptionsParameter,
1442 ) -> Any:
1443 """Execute a schema.ColumnDefault object."""
1444
1445 execution_options = self._execution_options.merge_with(
1446 execution_options
1447 )
1448
1449 event_multiparams: Optional[_CoreMultiExecuteParams]
1450 event_params: Optional[_CoreAnyExecuteParams]
1451
1452 # note for event handlers, the "distilled parameters" which is always
1453 # a list of dicts is broken out into separate "multiparams" and
1454 # "params" collections, which allows the handler to distinguish
1455 # between an executemany and execute style set of parameters.
1456 if self._has_events or self.engine._has_events:
1457 (
1458 default,
1459 distilled_parameters,
1460 event_multiparams,
1461 event_params,
1462 ) = self._invoke_before_exec_event(
1463 default, distilled_parameters, execution_options
1464 )
1465 else:
1466 event_multiparams = event_params = None
1467
1468 try:
1469 conn = self._dbapi_connection
1470 if conn is None:
1471 conn = self._revalidate_connection()
1472
1473 dialect = self.dialect
1474 ctx = dialect.execution_ctx_cls._init_default(
1475 dialect, self, conn, execution_options
1476 )
1477 except (exc.PendingRollbackError, exc.ResourceClosedError):
1478 raise
1479 except BaseException as e:
1480 self._handle_dbapi_exception(e, None, None, None, None)
1481
1482 ret = ctx._exec_default(None, default, None)
1483
1484 if self._has_events or self.engine._has_events:
1485 self.dispatch.after_execute(
1486 self,
1487 default,
1488 event_multiparams,
1489 event_params,
1490 execution_options,
1491 ret,
1492 )
1493
1494 return ret
1495
1496 def _execute_ddl(
1497 self,
1498 ddl: ExecutableDDLElement,
1499 distilled_parameters: _CoreMultiExecuteParams,
1500 execution_options: CoreExecuteOptionsParameter,
1501 ) -> CursorResult[Any]:
1502 """Execute a schema.DDL object."""
1503
1504 exec_opts = ddl._execution_options.merge_with(
1505 self._execution_options, execution_options
1506 )
1507
1508 event_multiparams: Optional[_CoreMultiExecuteParams]
1509 event_params: Optional[_CoreSingleExecuteParams]
1510
1511 if self._has_events or self.engine._has_events:
1512 (
1513 ddl,
1514 distilled_parameters,
1515 event_multiparams,
1516 event_params,
1517 ) = self._invoke_before_exec_event(
1518 ddl, distilled_parameters, exec_opts
1519 )
1520 else:
1521 event_multiparams = event_params = None
1522
1523 schema_translate_map = exec_opts.get("schema_translate_map", None)
1524
1525 dialect = self.dialect
1526
1527 compiled = ddl.compile(
1528 dialect=dialect, schema_translate_map=schema_translate_map
1529 )
1530 ret = self._execute_context(
1531 dialect,
1532 dialect.execution_ctx_cls._init_ddl,
1533 compiled,
1534 None,
1535 exec_opts,
1536 compiled,
1537 )
1538 if self._has_events or self.engine._has_events:
1539 self.dispatch.after_execute(
1540 self,
1541 ddl,
1542 event_multiparams,
1543 event_params,
1544 exec_opts,
1545 ret,
1546 )
1547 return ret
1548
1549 def _invoke_before_exec_event(
1550 self,
1551 elem: Any,
1552 distilled_params: _CoreMultiExecuteParams,
1553 execution_options: _ExecuteOptions,
1554 ) -> Tuple[
1555 Any,
1556 _CoreMultiExecuteParams,
1557 _CoreMultiExecuteParams,
1558 _CoreSingleExecuteParams,
1559 ]:
1560 event_multiparams: _CoreMultiExecuteParams
1561 event_params: _CoreSingleExecuteParams
1562
1563 if len(distilled_params) == 1:
1564 event_multiparams, event_params = [], distilled_params[0]
1565 else:
1566 event_multiparams, event_params = distilled_params, {}
1567
1568 for fn in self.dispatch.before_execute:
1569 elem, event_multiparams, event_params = fn(
1570 self,
1571 elem,
1572 event_multiparams,
1573 event_params,
1574 execution_options,
1575 )
1576
1577 if event_multiparams:
1578 distilled_params = list(event_multiparams)
1579 if event_params:
1580 raise exc.InvalidRequestError(
1581 "Event handler can't return non-empty multiparams "
1582 "and params at the same time"
1583 )
1584 elif event_params:
1585 distilled_params = [event_params]
1586 else:
1587 distilled_params = []
1588
1589 return elem, distilled_params, event_multiparams, event_params
1590
1591 def _execute_clauseelement(
1592 self,
1593 elem: Executable,
1594 distilled_parameters: _CoreMultiExecuteParams,
1595 execution_options: CoreExecuteOptionsParameter,
1596 ) -> CursorResult[Any]:
1597 """Execute a sql.ClauseElement object."""
1598
1599 execution_options = elem._execution_options.merge_with(
1600 self._execution_options, execution_options
1601 )
1602
1603 has_events = self._has_events or self.engine._has_events
1604 if has_events:
1605 (
1606 elem,
1607 distilled_parameters,
1608 event_multiparams,
1609 event_params,
1610 ) = self._invoke_before_exec_event(
1611 elem, distilled_parameters, execution_options
1612 )
1613
1614 if distilled_parameters:
1615 # ensure we don't retain a link to the view object for keys()
1616 # which links to the values, which we don't want to cache
1617 keys = sorted(distilled_parameters[0])
1618 for_executemany = len(distilled_parameters) > 1
1619 else:
1620 keys = []
1621 for_executemany = False
1622
1623 dialect = self.dialect
1624
1625 schema_translate_map = execution_options.get(
1626 "schema_translate_map", None
1627 )
1628
1629 compiled_cache: Optional[CompiledCacheType] = execution_options.get(
1630 "compiled_cache", self.engine._compiled_cache
1631 )
1632
1633 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
1634 dialect=dialect,
1635 compiled_cache=compiled_cache,
1636 column_keys=keys,
1637 for_executemany=for_executemany,
1638 schema_translate_map=schema_translate_map,
1639 linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
1640 )
1641 ret = self._execute_context(
1642 dialect,
1643 dialect.execution_ctx_cls._init_compiled,
1644 compiled_sql,
1645 distilled_parameters,
1646 execution_options,
1647 compiled_sql,
1648 distilled_parameters,
1649 elem,
1650 extracted_params,
1651 cache_hit=cache_hit,
1652 )
1653 if has_events:
1654 self.dispatch.after_execute(
1655 self,
1656 elem,
1657 event_multiparams,
1658 event_params,
1659 execution_options,
1660 ret,
1661 )
1662 return ret
1663
1664 def _execute_compiled(
1665 self,
1666 compiled: Compiled,
1667 distilled_parameters: _CoreMultiExecuteParams,
1668 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
1669 ) -> CursorResult[Any]:
1670 """Execute a sql.Compiled object.
1671
1672 TODO: why do we have this? likely deprecate or remove
1673
1674 """
1675
1676 execution_options = compiled.execution_options.merge_with(
1677 self._execution_options, execution_options
1678 )
1679
1680 if self._has_events or self.engine._has_events:
1681 (
1682 compiled,
1683 distilled_parameters,
1684 event_multiparams,
1685 event_params,
1686 ) = self._invoke_before_exec_event(
1687 compiled, distilled_parameters, execution_options
1688 )
1689
1690 dialect = self.dialect
1691
1692 ret = self._execute_context(
1693 dialect,
1694 dialect.execution_ctx_cls._init_compiled,
1695 compiled,
1696 distilled_parameters,
1697 execution_options,
1698 compiled,
1699 distilled_parameters,
1700 None,
1701 None,
1702 )
1703 if self._has_events or self.engine._has_events:
1704 self.dispatch.after_execute(
1705 self,
1706 compiled,
1707 event_multiparams,
1708 event_params,
1709 execution_options,
1710 ret,
1711 )
1712 return ret
1713
1714 def exec_driver_sql(
1715 self,
1716 statement: str,
1717 parameters: Optional[_DBAPIAnyExecuteParams] = None,
1718 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1719 ) -> CursorResult[Any]:
1720 r"""Executes a string SQL statement on the DBAPI cursor directly,
1721 without any SQL compilation steps.
1722
1723 This can be used to pass any string directly to the
1724 ``cursor.execute()`` method of the DBAPI in use.
1725
1726 :param statement: The statement str to be executed. Bound parameters
1727 must use the underlying DBAPI's paramstyle, such as "qmark",
1728 "pyformat", "format", etc.
1729
1730 :param parameters: represent bound parameter values to be used in the
1731 execution. The format is one of: a dictionary of named parameters,
1732 a tuple of positional parameters, or a list containing either
1733 dictionaries or tuples for multiple-execute support.
1734
1735 :return: a :class:`_engine.CursorResult`.
1736
1737 E.g. multiple dictionaries::
1738
1739
1740 conn.exec_driver_sql(
1741 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1742 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
1743 )
1744
1745 Single dictionary::
1746
1747 conn.exec_driver_sql(
1748 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1749 dict(id=1, value="v1"),
1750 )
1751
1752 Single tuple::
1753
1754 conn.exec_driver_sql(
1755 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
1756 )
1757
1758 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
1759 not participate in the
1760 :meth:`_events.ConnectionEvents.before_execute` and
1761 :meth:`_events.ConnectionEvents.after_execute` events. To
1762 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
1763 :meth:`_events.ConnectionEvents.before_cursor_execute` and
1764 :meth:`_events.ConnectionEvents.after_cursor_execute`.
1765
1766 .. seealso::
1767
1768 :pep:`249`
1769
1770 """
1771
1772 distilled_parameters = _distill_raw_params(parameters)
1773
1774 execution_options = self._execution_options.merge_with(
1775 execution_options
1776 )
1777
1778 dialect = self.dialect
1779 ret = self._execute_context(
1780 dialect,
1781 dialect.execution_ctx_cls._init_statement,
1782 statement,
1783 None,
1784 execution_options,
1785 statement,
1786 distilled_parameters,
1787 )
1788
1789 return ret
1790
1791 def _execute_context(
1792 self,
1793 dialect: Dialect,
1794 constructor: Callable[..., ExecutionContext],
1795 statement: Union[str, Compiled],
1796 parameters: Optional[_AnyMultiExecuteParams],
1797 execution_options: _ExecuteOptions,
1798 *args: Any,
1799 **kw: Any,
1800 ) -> CursorResult[Any]:
1801 """Create an :class:`.ExecutionContext` and execute, returning
1802 a :class:`_engine.CursorResult`."""
1803
1804 if execution_options:
1805 yp = execution_options.get("yield_per", None)
1806 if yp:
1807 execution_options = execution_options.union(
1808 {"stream_results": True, "max_row_buffer": yp}
1809 )
1810 try:
1811 conn = self._dbapi_connection
1812 if conn is None:
1813 conn = self._revalidate_connection()
1814
1815 context = constructor(
1816 dialect, self, conn, execution_options, *args, **kw
1817 )
1818 except (exc.PendingRollbackError, exc.ResourceClosedError):
1819 raise
1820 except BaseException as e:
1821 self._handle_dbapi_exception(
1822 e, str(statement), parameters, None, None
1823 )
1824
1825 if (
1826 self._transaction
1827 and not self._transaction.is_active
1828 or (
1829 self._nested_transaction
1830 and not self._nested_transaction.is_active
1831 )
1832 ):
1833 self._invalid_transaction()
1834
1835 elif self._trans_context_manager:
1836 TransactionalContext._trans_ctx_check(self)
1837
1838 if self._transaction is None:
1839 self._autobegin()
1840
1841 context.pre_exec()
1842
1843 if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
1844 return self._exec_insertmany_context(dialect, context)
1845 else:
1846 return self._exec_single_context(
1847 dialect, context, statement, parameters
1848 )
1849
def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Any]:
    """continue the _execute_context() method for a single DBAPI
    cursor.execute() or cursor.executemany() call.

    The steps, in order: optional ``setinputsizes`` handling, the
    ``before_cursor_execute`` hooks, SQL / parameter logging when echo
    is on, the dialect-level execute call (or a dialect event handler
    that takes its place), the ``after_cursor_execute`` hook,
    ``context.post_exec()`` and construction of the result.  Any
    exception raised along the way is passed to
    :meth:`._handle_dbapi_exception`, which is annotated as never
    returning.

    :param dialect: dialect consulted for ``bind_typing`` and
     ``do_set_input_sizes()``.
    :param context: the execution context; supplies the cursor, the
     string statement and the parameters that are sent to the DBAPI.
    :param statement: statement as given to _execute_context(); used
     here only, stringified, for error reporting during setinputsizes.
    :param parameters: parameters as given to _execute_context(); used
     here only for error reporting during setinputsizes.
    :return: the object produced by ``context._setup_result_proxy()``.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()

        if generic_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, generic_setinputsizes, context
                )
            except BaseException as e:
                # reported against the caller-supplied statement and
                # parameters, with no cursor
                self._handle_dbapi_exception(
                    e, str(statement), parameters, None, context
                )

    # from here on, work with what the context will actually send;
    # note that ``parameters`` is rebound
    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters: Optional[_AnyExecuteParams]

    # a non-executemany call uses only the first parameter set
    if not context.executemany:
        effective_parameters = parameters[0]
    else:
        effective_parameters = parameters

    if self._has_events or self.engine._has_events:
        # each hook returns the (statement, parameters) pair that is
        # passed on to the next hook and finally to the DBAPI
        for fn in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = fn(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if not self.engine.hide_parameters:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )
        else:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )

    # set to True when a dialect-level event handler returns a true
    # value, in which case the default dialect method is not called
    evt_handled: bool = False
    try:
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_executemany:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            # nothing to bind and the context asked for the
            # "no parameters" form of execute
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute_no_params:
                    if fn(cursor, str_statement, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as e:
        # covers the execute call, the after-execute hook, post_exec()
        # and result construction alike
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result
1991
def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Any]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    The batches come from
    ``dialect._deliver_insertmanyvalues_batches()``; for every batch
    the ``before_cursor_execute`` hooks, logging, the execute call and
    the ``after_cursor_execute`` hook run separately.  Once all batches
    are done, ``context.post_exec()`` is called and the result is
    built.  Exceptions are passed to :meth:`._handle_dbapi_exception`;
    those raised for an individual batch are flagged with
    ``is_sub_exec=True``.

    :param dialect: dialect that delivers the batches and performs
     ``do_set_input_sizes()`` / ``do_execute()``.
    :param context: the execution context supplying cursor, statement,
     parameters and execution options.
    :return: the object produced by ``context._setup_result_proxy()``.

    """

    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # evaluated once up front and reused for every batch
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # ``stats`` is only bound, and only read, when echo is on
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        # statement and parameters for this batch only
        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            # hooks may replace the per-batch statement / parameters;
            # the final argument (the "executemany" flag) is always True
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            # renders as " <batchnum>/<total> (ordered|unordered[; batch
            # not supported])"; every line break sits inside a
            # replacement field, so none reaches the resulting string
            imv_stats = f""" {imv_batch.batchnum}/{
                        imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch keeps the cache stats prefix; later
            # batches get a shorter label
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # the for/else runs dialect.do_execute() only when no
            # dialect event handler returned a true value
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            # note: receives the overall statement and parameters, not
            # the per-batch ones
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            # total of the batch sizes accumulated in the loop above
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result
2162
2163 def _cursor_execute(
2164 self,
2165 cursor: DBAPICursor,
2166 statement: str,
2167 parameters: _DBAPISingleExecuteParams,
2168 context: Optional[ExecutionContext] = None,
2169 ) -> None:
2170 """Execute a statement + params on the given cursor.
2171
2172 Adds appropriate logging and exception handling.
2173
2174 This method is used by DefaultDialect for special-case
2175 executions, such as for sequences and column defaults.
2176 The path of statement execution in the majority of cases
2177 terminates at _execute_context().
2178
2179 """
2180 if self._has_events or self.engine._has_events:
2181 for fn in self.dispatch.before_cursor_execute:
2182 statement, parameters = fn(
2183 self, cursor, statement, parameters, context, False
2184 )
2185
2186 if self._echo:
2187 self._log_info(statement)
2188 self._log_info("[raw sql] %r", parameters)
2189 try:
2190 for fn in (
2191 ()
2192 if not self.dialect._has_events
2193 else self.dialect.dispatch.do_execute
2194 ):
2195 if fn(cursor, statement, parameters, context):
2196 break
2197 else:
2198 self.dialect.do_execute(cursor, statement, parameters, context)
2199 except BaseException as e:
2200 self._handle_dbapi_exception(
2201 e, statement, parameters, cursor, context
2202 )
2203
2204 if self._has_events or self.engine._has_events:
2205 self.dispatch.after_cursor_execute(
2206 self, cursor, statement, parameters, context, False
2207 )
2208
2209 def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
2210 """Close the given cursor, catching exceptions
2211 and turning into log warnings.
2212
2213 """
2214 try:
2215 cursor.close()
2216 except Exception:
2217 # log the error through the connection pool's logger.
2218 self.engine.pool.logger.error(
2219 "Error closing cursor", exc_info=True
2220 )
2221
# class-level defaults for state used by _handle_dbapi_exception().
# That method assigns these names on the instance and removes the
# instance values again with ``del``, which re-exposes the ``False``
# defaults defined here.

# set to True on the instance while _handle_dbapi_exception() is
# running, so that an error raised during error handling is detected
_reentrant_error = False
# set to a true value on the instance when the error being handled was
# determined to be a disconnect
_is_disconnect = False
2224
def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised while using this connection and
    re-raise it; this method never returns normally.

    The exception is wrapped via ``exc.DBAPIError.instance()`` when it
    is an instance of the dialect's DBAPI ``Error`` class, or when a
    statement is present, no execution context exists and the
    exception is not an "exit" exception.  Dialect-level
    ``handle_error`` listeners may substitute the exception that is
    raised, and may change the disconnect / pool invalidation flags.
    When the error is considered a disconnect the connection is
    invalidated in the ``finally`` block.

    Intended to be called from within an ``except`` block, as the
    traceback is taken from ``sys.exc_info()``.

    :param e: the exception being handled.
    :param statement: string statement for the error message, if any.
    :param parameters: parameters for the error message, if any.
    :param cursor: DBAPI cursor in use, if any; it is passed to
     ``dialect.is_disconnect()`` and is closed when the error is not
     a disconnect.
    :param context: execution context, if one was created.
    :param is_sub_exec: when True, the "ismulti" flag given to the
     wrapping exception is forced to False.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    # a disconnect is either reported by the dialect for a DBAPI error
    # or assumed for an "exit" exception, in both cases only while
    # this connection is not closed
    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    # default: the pool is left alone for "exit" exceptions
    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    if self._reentrant_error:
        # already inside this method: raise the wrapped error right
        # away, without event handlers or cleanup
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        # exception supplied by a handle_error listener, if any
        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # a listener may have changed the disconnect flag; keep the
            # connection state and the wrapped exception in sync
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        # precedence: listener-supplied exception, then the wrapped
        # exception, then the original exception unchanged
        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # drop the instance-level flag so that the class-level default
        # is visible again
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)
2369
2370 @classmethod
2371 def _handle_dbapi_exception_noconnection(
2372 cls,
2373 e: BaseException,
2374 dialect: Dialect,
2375 engine: Optional[Engine] = None,
2376 is_disconnect: Optional[bool] = None,
2377 invalidate_pool_on_disconnect: bool = True,
2378 is_pre_ping: bool = False,
2379 ) -> NoReturn:
2380 exc_info = sys.exc_info()
2381
2382 if is_disconnect is None:
2383 is_disconnect = isinstance(
2384 e, dialect.loaded_dbapi.Error
2385 ) and dialect.is_disconnect(e, None, None)
2386
2387 should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
2388
2389 if should_wrap:
2390 sqlalchemy_exception = exc.DBAPIError.instance(
2391 None,
2392 None,
2393 cast(Exception, e),
2394 dialect.loaded_dbapi.Error,
2395 hide_parameters=(
2396 engine.hide_parameters if engine is not None else False
2397 ),
2398 connection_invalidated=is_disconnect,
2399 dialect=dialect,
2400 )
2401 else:
2402 sqlalchemy_exception = None
2403
2404 newraise = None
2405
2406 if dialect._has_events:
2407 ctx = ExceptionContextImpl(
2408 e,
2409 sqlalchemy_exception,
2410 engine,
2411 dialect,
2412 None,
2413 None,
2414 None,
2415 None,
2416 None,
2417 is_disconnect,
2418 invalidate_pool_on_disconnect,
2419 is_pre_ping,
2420 )
2421 for fn in dialect.dispatch.handle_error:
2422 try:
2423 # handler returns an exception;
2424 # call next handler in a chain
2425 per_fn = fn(ctx)
2426 if per_fn is not None:
2427 ctx.chained_exception = newraise = per_fn
2428 except Exception as _raised:
2429 # handler raises an exception - stop processing
2430 newraise = _raised
2431 break
2432
2433 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
2434 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect
2435
2436 if newraise:
2437 raise newraise.with_traceback(exc_info[2]) from e
2438 elif should_wrap:
2439 assert sqlalchemy_exception is not None
2440 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2441 else:
2442 assert exc_info[1] is not None
2443 raise exc_info[1].with_traceback(exc_info[2])
2444
2445 def _run_ddl_visitor(
2446 self,
2447 visitorcallable: Type[InvokeDDLBase],
2448 element: SchemaVisitable,
2449 **kwargs: Any,
2450 ) -> None:
2451 """run a DDL visitor.
2452
2453 This method is only here so that the MockConnection can change the
2454 options given to the visitor so that "checkfirst" is skipped.
2455
2456 """
2457 visitorcallable(
2458 dialect=self.dialect, connection=self, **kwargs
2459 ).traverse_single(element)
2460
2461
class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    A plain value object: every constructor argument is stored on the
    attribute of the corresponding name (``exception`` is stored as
    ``original_exception`` and ``context`` as ``execution_context``).

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the details of the error being handled.

        :param exception: the exception that was originally raised.
        :param sqlalchemy_exception: the wrapping exception, or None
         when the original exception is not being wrapped.
        :param engine: engine in use, if known.
        :param dialect: dialect in use.
        :param connection: connection in use, if any.
        :param cursor: DBAPI cursor in use, if any.
        :param statement: string statement being executed, if any.
        :param parameters: parameters being executed, if any.
        :param context: execution context, if one was created.
        :param is_disconnect: whether the error is considered a
         disconnect.
        :param invalidate_pool_on_disconnect: whether the pool should
         be invalidated when the error is a disconnect.
        :param is_pre_ping: whether the error occurred during a
         "pre ping" operation.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # the cursor slot used to be left unset even though the cursor
        # is passed in, so that reading ``.cursor`` raised
        # AttributeError
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # no handler has supplied an exception yet; give the slot a
        # defined value so that handlers can read it unconditionally
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping
2507
2508
class Transaction(TransactionalContext):
    """A database transaction that is in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback`
    and :meth:`.commit` methods.  The object is also a context
    manager, so the Python ``with`` statement may be combined with
    :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # concrete transaction types provide their own constructor
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True once this transaction is fully detached from the
        connection, so that it cannot influence the connection's state
        any longer.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        raise NotImplementedError()

    def _do_commit(self) -> None:
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        # active, and the connection has not been invalidated
        active = self.is_active
        return active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this is the base transaction of a begin/commit nesting,
        it is rolled back; in any other case the method just returns.

        Use this to cancel a Transaction while leaving the scope of an
        enclosing transaction untouched.

        """
        try:
            self._do_close()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the kind of transaction:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` performs a
          "ROLLBACK TO SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may make use of DBAPI-specific
          methods for two phase transactions.

        """
        try:
            self._do_rollback()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the kind of transaction:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` performs a
          "RELEASE SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may make use of DBAPI-specific
          methods for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def _get_subject(self) -> Connection:
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        detached = self._deactivated_from_connection
        return not detached

    def _rollback_can_be_called(self) -> bool:
        # RootTransaction / NestedTransaction tolerate rollback() on a
        # transaction that is already deactive; no warning is emitted.
        # covered by
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True
2651
2652
class RootTransaction(Transaction):
    """The "root" transaction of a :class:`_engine.Connection`.

    It stands for the "BEGIN/COMMIT/ROLLBACK" currently taking place
    on the :class:`_engine.Connection`.  A
    :class:`_engine.RootTransaction` comes into being through the
    :meth:`_engine.Connection.begin` method and stays associated with
    the :class:`_engine.Connection` for as long as it is active.  The
    :class:`_engine.RootTransaction` currently in use is available
    from the :meth:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also has
    "autobegin" behavior, which creates a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI
    connection.  In 2.0 style use, the scope of the
    :class:`_engine.RootTransaction` is controlled with the
    :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.

    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # begin first; the connection only learns about this
        # transaction once that has succeeded
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        if not self.is_active:
            if self.connection._transaction is not self:
                util.warn("transaction already deassociated from connection")
            return

        assert self.connection._transaction is self
        self.is_active = False

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        conn = self.connection
        try:
            if self.is_active:
                self._connection_rollback_impl()

            nested = conn._nested_transaction
            if nested:
                nested._cancel()
        finally:
            # runs whether or not the rollback worked
            if try_deactivate or self.is_active:
                self._deactivate_from_connection()
            if conn._transaction is self:
                conn._transaction = None

        assert not self.is_active
        assert conn._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        conn = self.connection

        if not self.is_active:
            if conn._transaction is self:
                conn._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")
        else:
            assert conn._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # regardless of the outcome of the commit: cancel nested
                # transactions and mark this transaction "inactive"
                nested = conn._nested_transaction
                if nested:
                    nested._cancel()

                self._deactivate_from_connection()

            # reached only if the commit succeeded; after a failed
            # commit this stays the connection's current transaction,
            # so that a rollback has to take place.
            conn._transaction = None

        assert not self.is_active
        assert conn._transaction is not self
2759
2760
class NestedTransaction(Transaction):
    """A 'nested', or SAVEPOINT, transaction.

    A :class:`.NestedTransaction` is obtained from the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    "begin" / "commit" / "rollback" have the following meaning for a
    :class:`.NestedTransaction`:

    * "begin" corresponds to the "BEGIN SAVEPOINT" command; the
      savepoint receives an explicit name which is kept as part of the
      state of this object.

    * :meth:`.NestedTransaction.commit` corresponds to a
      "RELEASE SAVEPOINT" operation for the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    * :meth:`.NestedTransaction.rollback` corresponds to a
      "ROLLBACK TO SAVEPOINT" operation for the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    The rationale for mimicking the semantics of an outer transaction
    in terms of savepoints is that code may then deal with a
    "savepoint" transaction and an "outer" transaction in an agnostic
    way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    _savepoint: str

    def __init__(self, connection: Connection):
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = self.connection._savepoint_impl()
        self.is_active = True
        # remember the nested transaction that was current before this
        # one, then take its place on the connection
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        conn = self.connection
        if conn._nested_transaction is self:
            # hand the slot back to the previous nested transaction
            conn._nested_transaction = self._previous_nested
            return

        if warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._nested_transaction is not self

    def _cancel(self) -> None:
        # RootTransaction calls this when the outer transaction is
        # committed, rolled back, or closed; all savepoints are
        # cancelled and no other action is taken
        self.is_active = False
        self._deactivate_from_connection()
        outer_nested = self._previous_nested
        if outer_nested:
            outer_nested._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        conn = self.connection
        try:
            # roll back to the savepoint only while both this
            # transaction and the root transaction are active
            if self.is_active:
                root = conn._transaction
                if root and root.is_active:
                    conn._rollback_to_savepoint_impl(self._savepoint)
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert conn._nested_transaction is not self

    def _do_close(self) -> None:
        self._close_impl(True, False)

    def _do_rollback(self) -> None:
        self._close_impl(True, True)

    def _do_commit(self) -> None:
        conn = self.connection

        if not self.is_active:
            if conn._nested_transaction is self:
                conn._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
            return

        try:
            conn._release_savepoint_impl(self._savepoint)
        finally:
            # the nested trans turns inactive even if the release
            # failed, so that it does not try to emit SQL when it
            # is rolled back.
            self.is_active = False

        # reached only if the release succeeded
        self._deactivate_from_connection()
2873
2874
class TwoPhaseTransaction(RootTransaction):
    """A two-phase transaction.

    Use the :meth:`_engine.Connection.begin_twophase` method to obtain
    a new :class:`.TwoPhaseTransaction` object.

    It offers the interface of :class:`.Transaction` plus the
    :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both attributes are assigned before the base constructor
        # begins the transaction
        self.xid = xid
        self._is_prepared = False
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        Once a PREPARE has taken place, the transaction can be
        committed.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

    def _connection_begin_impl(self) -> None:
        self.connection._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        xid, prepared = self.xid, self._is_prepared
        self.connection._rollback_twophase_impl(xid, prepared)

    def _connection_commit_impl(self) -> None:
        xid, prepared = self.xid, self._is_prepared
        self.connection._commit_twophase_impl(xid, prepared)
2914
2915
2916class Engine(
2917 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
2918):
2919 """
2920 Connects a :class:`~sqlalchemy.pool.Pool` and
2921 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
2922 source of database connectivity and behavior.
2923
2924 An :class:`_engine.Engine` object is instantiated publicly using the
2925 :func:`~sqlalchemy.create_engine` function.
2926
2927 .. seealso::
2928
2929 :doc:`/core/engines`
2930
2931 :ref:`connections_toplevel`
2932
2933 """
2934
2935 dispatch: dispatcher[ConnectionEventsTarget]
2936
2937 _compiled_cache: Optional[CompiledCacheType]
2938
2939 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
2940 _has_events: bool = False
2941 _connection_cls: Type[Connection] = Connection
2942 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
2943 _is_future: bool = False
2944
2945 _schema_translate_map: Optional[SchemaTranslateMapType] = None
2946 _option_cls: Type[OptionEngine]
2947
2948 dialect: Dialect
2949 pool: Pool
2950 url: URL
2951 hide_parameters: bool
2952
def __init__(
    self,
    pool: Pool,
    dialect: Dialect,
    url: URL,
    logging_name: Optional[str] = None,
    echo: Optional[_EchoFlagType] = None,
    query_cache_size: int = 500,
    execution_options: Optional[Mapping[str, Any]] = None,
    hide_parameters: bool = False,
):
    # construct the engine from its pool, dialect and URL;
    # ``query_cache_size=0`` turns the compiled cache off altogether
    self.pool = pool
    self.url = url
    self.dialect = dialect
    if logging_name:
        self.logging_name = logging_name
    self.echo = echo
    self.hide_parameters = hide_parameters

    if query_cache_size == 0:
        compiled_cache = None
    else:
        compiled_cache = util.LRUCache(
            query_cache_size, size_alert=self._lru_size_alert
        )
    self._compiled_cache = compiled_cache

    log.instance_logger(self, echoflag=echo)

    if execution_options:
        self.update_execution_options(**execution_options)
2980
2981 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
2982 if self._should_log_info():
2983 self.logger.info(
2984 "Compiled cache size pruning from %d items to %d. "
2985 "Increase cache size to reduce the frequency of pruning.",
2986 len(cache),
2987 cache.capacity,
2988 )
2989
2990 @property
2991 def engine(self) -> Engine:
2992 """Returns this :class:`.Engine`.
2993
2994 Used for legacy schemes that accept :class:`.Connection` /
2995 :class:`.Engine` objects within the same variable.
2996
2997 """
2998 return self
2999
3000 def clear_compiled_cache(self) -> None:
3001 """Clear the compiled cache associated with the dialect.
3002
3003 This applies **only** to the built-in cache that is established
3004 via the :paramref:`_engine.create_engine.query_cache_size` parameter.
3005 It will not impact any dictionary caches that were passed via the
3006 :paramref:`.Connection.execution_options.compiled_cache` parameter.
3007
3008 .. versionadded:: 1.4
3009
3010 """
3011 if self._compiled_cache:
3012 self._compiled_cache.clear()
3013
3014 def update_execution_options(self, **opt: Any) -> None:
3015 r"""Update the default execution_options dictionary
3016 of this :class:`_engine.Engine`.
3017
3018 The given keys/values in \**opt are added to the
3019 default execution options that will be used for
3020 all connections. The initial contents of this dictionary
3021 can be sent via the ``execution_options`` parameter
3022 to :func:`_sa.create_engine`.
3023
3024 .. seealso::
3025
3026 :meth:`_engine.Connection.execution_options`
3027
3028 :meth:`_engine.Engine.execution_options`
3029
3030 """
3031 self.dispatch.set_engine_execution_options(self, opt)
3032 self._execution_options = self._execution_options.union(opt)
3033 self.dialect.set_engine_execution_options(self, opt)
3034
3035 @overload
3036 def execution_options(
3037 self,
3038 *,
3039 compiled_cache: Optional[CompiledCacheType] = ...,
3040 logging_token: str = ...,
3041 isolation_level: IsolationLevel = ...,
3042 insertmanyvalues_page_size: int = ...,
3043 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
3044 **opt: Any,
3045 ) -> OptionEngine: ...
3046
3047 @overload
3048 def execution_options(self, **opt: Any) -> OptionEngine: ...
3049
3050 def execution_options(self, **opt: Any) -> OptionEngine:
3051 """Return a new :class:`_engine.Engine` that will provide
3052 :class:`_engine.Connection` objects with the given execution options.
3053
3054 The returned :class:`_engine.Engine` remains related to the original
3055 :class:`_engine.Engine` in that it shares the same connection pool and
3056 other state:
3057
3058 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
3059 is the
3060 same instance. The :meth:`_engine.Engine.dispose`
3061 method will replace
3062 the connection pool instance for the parent engine as well
3063 as this one.
3064 * Event listeners are "cascaded" - meaning, the new
3065 :class:`_engine.Engine`
3066 inherits the events of the parent, and new events can be associated
3067 with the new :class:`_engine.Engine` individually.
3068 * The logging configuration and logging_name is copied from the parent
3069 :class:`_engine.Engine`.
3070
3071 The intent of the :meth:`_engine.Engine.execution_options` method is
3072 to implement schemes where multiple :class:`_engine.Engine`
3073 objects refer to the same connection pool, but are differentiated
3074 by options that affect some execution-level behavior for each
3075 engine. One such example is breaking into separate "reader" and
3076 "writer" :class:`_engine.Engine` instances, where one
3077 :class:`_engine.Engine`
3078 has a lower :term:`isolation level` setting configured or is even
3079 transaction-disabled using "autocommit". An example of this
3080 configuration is at :ref:`dbapi_autocommit_multiple`.
3081
3082 Another example is one that
3083 uses a custom option ``shard_id`` which is consumed by an event
3084 to change the current schema on a database connection::
3085
3086 from sqlalchemy import event
3087 from sqlalchemy.engine import Engine
3088
3089 primary_engine = create_engine("mysql+mysqldb://")
3090 shard1 = primary_engine.execution_options(shard_id="shard1")
3091 shard2 = primary_engine.execution_options(shard_id="shard2")
3092
3093 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
3094
3095
3096 @event.listens_for(Engine, "before_cursor_execute")
3097 def _switch_shard(conn, cursor, stmt, params, context, executemany):
3098 shard_id = conn.get_execution_options().get("shard_id", "default")
3099 current_shard = conn.info.get("current_shard", None)
3100
3101 if current_shard != shard_id:
3102 cursor.execute("use %s" % shards[shard_id])
3103 conn.info["current_shard"] = shard_id
3104
3105 The above recipe illustrates two :class:`_engine.Engine` objects that
3106 will each serve as factories for :class:`_engine.Connection` objects
3107 that have pre-established "shard_id" execution options present. A
3108 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
3109 then interprets this execution option to emit a MySQL ``use`` statement
3110 to switch databases before a statement execution, while at the same
3111 time keeping track of which database we've established using the
3112 :attr:`_engine.Connection.info` dictionary.
3113
3114 .. seealso::
3115
3116 :meth:`_engine.Connection.execution_options`
3117 - update execution options
3118 on a :class:`_engine.Connection` object.
3119
3120 :meth:`_engine.Engine.update_execution_options`
3121 - update the execution
3122 options for a given :class:`_engine.Engine` in place.
3123
3124 :meth:`_engine.Engine.get_execution_options`
3125
3126
3127 """ # noqa: E501
3128 return self._option_cls(self, opt)
3129
3130 def get_execution_options(self) -> _ExecuteOptions:
3131 """Get the non-SQL options which will take effect during execution.
3132
3133 .. versionadded: 1.3
3134
3135 .. seealso::
3136
3137 :meth:`_engine.Engine.execution_options`
3138 """
3139 return self._execution_options
3140
3141 @property
3142 def name(self) -> str:
3143 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3144 in use by this :class:`Engine`.
3145
3146 """
3147
3148 return self.dialect.name
3149
3150 @property
3151 def driver(self) -> str:
3152 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3153 in use by this :class:`Engine`.
3154
3155 """
3156
3157 return self.dialect.driver
3158
3159 echo = log.echo_property()
3160
3161 def __repr__(self) -> str:
3162 return "Engine(%r)" % (self.url,)
3163
3164 def dispose(self, close: bool = True) -> None:
3165 """Dispose of the connection pool used by this
3166 :class:`_engine.Engine`.
3167
3168 A new connection pool is created immediately after the old one has been
3169 disposed. The previous connection pool is disposed either actively, by
3170 closing out all currently checked-in connections in that pool, or
3171 passively, by losing references to it but otherwise not closing any
3172 connections. The latter strategy is more appropriate for an initializer
3173 in a forked Python process.
3174
3175 :param close: if left at its default of ``True``, has the
3176 effect of fully closing all **currently checked in**
3177 database connections. Connections that are still checked out
3178 will **not** be closed, however they will no longer be associated
3179 with this :class:`_engine.Engine`,
3180 so when they are closed individually, eventually the
3181 :class:`_pool.Pool` which they are associated with will
3182 be garbage collected and they will be closed out fully, if
3183 not already closed on checkin.
3184
3185 If set to ``False``, the previous connection pool is de-referenced,
3186 and otherwise not touched in any way.
3187
3188 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
3189 parameter to allow the replacement of a connection pool in a child
3190 process without interfering with the connections used by the parent
3191 process.
3192
3193
3194 .. seealso::
3195
3196 :ref:`engine_disposal`
3197
3198 :ref:`pooling_multiprocessing`
3199
3200 """
3201 if close:
3202 self.pool.dispose()
3203 self.pool = self.pool.recreate()
3204 self.dispatch.engine_disposed(self)
3205
3206 @contextlib.contextmanager
3207 def _optional_conn_ctx_manager(
3208 self, connection: Optional[Connection] = None
3209 ) -> Iterator[Connection]:
3210 if connection is None:
3211 with self.connect() as conn:
3212 yield conn
3213 else:
3214 yield connection
3215
3216 @contextlib.contextmanager
3217 def begin(self) -> Iterator[Connection]:
3218 """Return a context manager delivering a :class:`_engine.Connection`
3219 with a :class:`.Transaction` established.
3220
3221 E.g.::
3222
3223 with engine.begin() as conn:
3224 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
3225 conn.execute(text("my_special_procedure(5)"))
3226
3227 Upon successful operation, the :class:`.Transaction`
3228 is committed. If an error is raised, the :class:`.Transaction`
3229 is rolled back.
3230
3231 .. seealso::
3232
3233 :meth:`_engine.Engine.connect` - procure a
3234 :class:`_engine.Connection` from
3235 an :class:`_engine.Engine`.
3236
3237 :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
3238 for a particular :class:`_engine.Connection`.
3239
3240 """ # noqa: E501
3241 with self.connect() as conn:
3242 with conn.begin():
3243 yield conn
3244
3245 def _run_ddl_visitor(
3246 self,
3247 visitorcallable: Type[InvokeDDLBase],
3248 element: SchemaVisitable,
3249 **kwargs: Any,
3250 ) -> None:
3251 with self.begin() as conn:
3252 conn._run_ddl_visitor(visitorcallable, element, **kwargs)
3253
3254 def connect(self) -> Connection:
3255 """Return a new :class:`_engine.Connection` object.
3256
3257 The :class:`_engine.Connection` acts as a Python context manager, so
3258 the typical use of this method looks like::
3259
3260 with engine.connect() as connection:
3261 connection.execute(text("insert into table values ('foo')"))
3262 connection.commit()
3263
3264 Where above, after the block is completed, the connection is "closed"
3265 and its underlying DBAPI resources are returned to the connection pool.
3266 This also has the effect of rolling back any transaction that
3267 was explicitly begun or was begun via autobegin, and will
3268 emit the :meth:`_events.ConnectionEvents.rollback` event if one was
3269 started and is still in progress.
3270
3271 .. seealso::
3272
3273 :meth:`_engine.Engine.begin`
3274
3275 """
3276
3277 return self._connection_cls(self)
3278
3279 def raw_connection(self) -> PoolProxiedConnection:
3280 """Return a "raw" DBAPI connection from the connection pool.
3281
3282 The returned object is a proxied version of the DBAPI
3283 connection object used by the underlying driver in use.
3284 The object will have all the same behavior as the real DBAPI
3285 connection, except that its ``close()`` method will result in the
3286 connection being returned to the pool, rather than being closed
3287 for real.
3288
3289 This method provides direct DBAPI connection access for
3290 special situations when the API provided by
3291 :class:`_engine.Connection`
3292 is not needed. When a :class:`_engine.Connection` object is already
3293 present, the DBAPI connection is available using
3294 the :attr:`_engine.Connection.connection` accessor.
3295
3296 .. seealso::
3297
3298 :ref:`dbapi_connections`
3299
3300 """
3301 return self.pool.connect()
3302
3303
class OptionEngineMixin(log.Identified):
    """Mixin holding the state an option-carrying engine shares with the
    :class:`_engine.Engine` it was derived from.

    URL, dialect, compiled cache, logging settings and event dispatch are
    taken from the proxied engine at construction time; the connection pool
    is not copied but read from / written to the proxied engine on every
    access.

    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Derive a new engine from ``proxied``.

        :param proxied: the parent :class:`_engine.Engine` whose state is
         shared.
        :param execution_options: options applied on top of the parent's
         execution options via ``update_execution_options()``.

        """
        self._proxied = proxied

        # plain state taken over from the parent engine
        self.url = proxied.url
        self.dialect = proxied.dialect
        self._compiled_cache = proxied._compiled_cache
        self.hide_parameters = proxied.hide_parameters

        # logging setup mirrors the parent's name and echo flag
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        log.instance_logger(self, echoflag=self.echo)

        # Joining (rather than copying) the dispatch means that events
        # attached to the parent engine *after* this object is created
        # still reach it.  Because the parent's events are shared this way,
        # class-level events are not allowed to apply to the OptionEngine
        # class directly.
        #
        # The alternative would be a one-time transfer of the events that
        # exist right now:
        #     self.dispatch._update(proxied.dispatch)
        #
        # That may be the more appropriate approach, but it would change
        # behavior for code that adds events to the parent engine and
        # expects an already-created sub-engine to pick them up.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # begin with the parent's options, then layer the new ones on top
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Placeholder that always raises; the concrete class is expected
        to supply the real implementation.

        """
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            # the pool always lives on the proxied engine
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assigning a pool here replaces the proxied engine's pool
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            # events on the proxied engine count as events here; otherwise
            # fall back to the flag stored on this instance, if any
            inherited = self._proxied._has_events
            if inherited:
                return inherited
            return self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # written straight into the instance dict, since the property
            # defined on the class shadows normal attribute assignment
            self.__dict__["_has_events"] = value
3367
3368
class OptionEngine(OptionEngineMixin, Engine):
    """An :class:`_engine.Engine` derived from a parent engine.

    Instances are produced by :meth:`_engine.Engine.execution_options`,
    which instantiates ``Engine._option_cls``.
    """

    def update_execution_options(self, **opt: Any) -> None:
        # Call Engine's implementation explicitly: OptionEngineMixin is
        # listed first in the bases and its update_execution_options()
        # only raises NotImplementedError.
        Engine.update_execution_options(self, **opt)
3372
3373
# Assigned after both classes exist: Engine.execution_options() builds its
# return value through ``self._option_cls``, and OptionEngine itself derives
# from Engine, so it cannot be referenced inside the Engine class body.
Engine._option_cls = OptionEngine