1# engine/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.
8
9"""
10from __future__ import annotations
11
12import contextlib
13import sys
14import typing
15from typing import Any
16from typing import Callable
17from typing import cast
18from typing import Iterable
19from typing import Iterator
20from typing import List
21from typing import Mapping
22from typing import NoReturn
23from typing import Optional
24from typing import overload
25from typing import Tuple
26from typing import Type
27from typing import TypeVar
28from typing import Union
29
30from .interfaces import BindTyping
31from .interfaces import ConnectionEventsTarget
32from .interfaces import DBAPICursor
33from .interfaces import ExceptionContext
34from .interfaces import ExecuteStyle
35from .interfaces import ExecutionContext
36from .interfaces import IsolationLevel
37from .util import _distill_params_20
38from .util import _distill_raw_params
39from .util import TransactionalContext
40from .. import exc
41from .. import inspection
42from .. import log
43from .. import util
44from ..sql import compiler
45from ..sql import util as sql_util
46
47if typing.TYPE_CHECKING:
48 from . import CursorResult
49 from . import ScalarResult
50 from .interfaces import _AnyExecuteParams
51 from .interfaces import _AnyMultiExecuteParams
52 from .interfaces import _CoreAnyExecuteParams
53 from .interfaces import _CoreMultiExecuteParams
54 from .interfaces import _CoreSingleExecuteParams
55 from .interfaces import _DBAPIAnyExecuteParams
56 from .interfaces import _DBAPISingleExecuteParams
57 from .interfaces import _ExecuteOptions
58 from .interfaces import CompiledCacheType
59 from .interfaces import CoreExecuteOptionsParameter
60 from .interfaces import Dialect
61 from .interfaces import SchemaTranslateMapType
62 from .reflection import Inspector # noqa
63 from .url import URL
64 from ..event import dispatcher
65 from ..log import _EchoFlagType
66 from ..pool import _ConnectionFairy
67 from ..pool import Pool
68 from ..pool import PoolProxiedConnection
69 from ..sql import Executable
70 from ..sql._typing import _InfoType
71 from ..sql.compiler import Compiled
72 from ..sql.ddl import ExecutableDDLElement
73 from ..sql.ddl import InvokeDDLBase
74 from ..sql.functions import FunctionElement
75 from ..sql.schema import DefaultGenerator
76 from ..sql.schema import HasSchemaAttr
77 from ..sql.schema import SchemaVisitable
78 from ..sql.selectable import TypedReturnsRows
79
80
81_T = TypeVar("_T", bound=Any)
82_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
83NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
84
85
86class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
87 """Provides high-level functionality for a wrapped DB-API connection.
88
89 The :class:`_engine.Connection` object is procured by calling the
90 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
91 object, and provides services for execution of SQL statements as well
92 as transaction control.
93
94 The Connection object is **not** thread-safe. While a Connection can be
95 shared among threads using properly synchronized access, it is still
96 possible that the underlying DBAPI connection may not support shared
97 access between threads. Check the DBAPI documentation for details.
98
99 The Connection object represents a single DBAPI connection checked out
100 from the connection pool. In this state, the connection pool has no
101 affect upon the connection, including its expiration or timeout state.
102 For the connection pool to properly manage connections, connections
103 should be returned to the connection pool (i.e. ``connection.close()``)
104 whenever the connection is not in use.
105
106 .. index::
107 single: thread safety; Connection
108
109 """
110
111 dialect: Dialect
112 dispatch: dispatcher[ConnectionEventsTarget]
113
114 _sqla_logger_namespace = "sqlalchemy.engine.Connection"
115
116 # used by sqlalchemy.engine.util.TransactionalContext
117 _trans_context_manager: Optional[TransactionalContext] = None
118
119 # legacy as of 2.0, should be eventually deprecated and
120 # removed. was used in the "pre_ping" recipe that's been in the docs
121 # a long time
122 should_close_with_result = False
123
124 _dbapi_connection: Optional[PoolProxiedConnection]
125
126 _execution_options: _ExecuteOptions
127
128 _transaction: Optional[RootTransaction]
129 _nested_transaction: Optional[NestedTransaction]
130
131 def __init__(
132 self,
133 engine: Engine,
134 connection: Optional[PoolProxiedConnection] = None,
135 _has_events: Optional[bool] = None,
136 _allow_revalidate: bool = True,
137 _allow_autobegin: bool = True,
138 ):
139 """Construct a new Connection."""
140 self.engine = engine
141 self.dialect = dialect = engine.dialect
142
143 if connection is None:
144 try:
145 self._dbapi_connection = engine.raw_connection()
146 except dialect.loaded_dbapi.Error as err:
147 Connection._handle_dbapi_exception_noconnection(
148 err, dialect, engine
149 )
150 raise
151 else:
152 self._dbapi_connection = connection
153
154 self._transaction = self._nested_transaction = None
155 self.__savepoint_seq = 0
156 self.__in_begin = False
157
158 self.__can_reconnect = _allow_revalidate
159 self._allow_autobegin = _allow_autobegin
160 self._echo = self.engine._should_log_info()
161
162 if _has_events is None:
163 # if _has_events is sent explicitly as False,
164 # then don't join the dispatch of the engine; we don't
165 # want to handle any of the engine's events in that case.
166 self.dispatch = self.dispatch._join(engine.dispatch)
167 self._has_events = _has_events or (
168 _has_events is None and engine._has_events
169 )
170
171 self._execution_options = engine._execution_options
172
173 if self._has_events or self.engine._has_events:
174 self.dispatch.engine_connect(self)
175
176 # this can be assigned differently via
177 # characteristics.LoggingTokenCharacteristic
178 _message_formatter: Any = None
179
180 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
181 fmt = self._message_formatter
182
183 if fmt:
184 message = fmt(message)
185
186 if log.STACKLEVEL:
187 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
188
189 self.engine.logger.info(message, *arg, **kw)
190
191 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
192 fmt = self._message_formatter
193
194 if fmt:
195 message = fmt(message)
196
197 if log.STACKLEVEL:
198 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
199
200 self.engine.logger.debug(message, *arg, **kw)
201
202 @property
203 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
204 schema_translate_map: Optional[SchemaTranslateMapType] = (
205 self._execution_options.get("schema_translate_map", None)
206 )
207
208 return schema_translate_map
209
210 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
211 """Return the schema name for the given schema item taking into
212 account current schema translate map.
213
214 """
215
216 name = obj.schema
217 schema_translate_map: Optional[SchemaTranslateMapType] = (
218 self._execution_options.get("schema_translate_map", None)
219 )
220
221 if (
222 schema_translate_map
223 and name in schema_translate_map
224 and obj._use_schema_map
225 ):
226 return schema_translate_map[name]
227 else:
228 return name
229
230 def __enter__(self) -> Connection:
231 return self
232
233 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
234 self.close()
235
236 @overload
237 def execution_options(
238 self,
239 *,
240 compiled_cache: Optional[CompiledCacheType] = ...,
241 logging_token: str = ...,
242 isolation_level: IsolationLevel = ...,
243 no_parameters: bool = False,
244 stream_results: bool = False,
245 max_row_buffer: int = ...,
246 yield_per: int = ...,
247 insertmanyvalues_page_size: int = ...,
248 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
249 preserve_rowcount: bool = False,
250 **opt: Any,
251 ) -> Connection: ...
252
253 @overload
254 def execution_options(self, **opt: Any) -> Connection: ...
255
256 def execution_options(self, **opt: Any) -> Connection:
257 r"""Set non-SQL options for the connection which take effect
258 during execution.
259
260 This method modifies this :class:`_engine.Connection` **in-place**;
261 the return value is the same :class:`_engine.Connection` object
262 upon which the method is called. Note that this is in contrast
263 to the behavior of the ``execution_options`` methods on other
264 objects such as :meth:`_engine.Engine.execution_options` and
265 :meth:`_sql.Executable.execution_options`. The rationale is that many
266 such execution options necessarily modify the state of the base
267 DBAPI connection in any case so there is no feasible means of
268 keeping the effect of such an option localized to a "sub" connection.
269
270 .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options`
271 method, in contrast to other objects with this method, modifies
272 the connection in-place without creating copy of it.
273
274 As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
275 method accepts any arbitrary parameters including user defined names.
276 All parameters given are consumable in a number of ways including
277 by using the :meth:`_engine.Connection.get_execution_options` method.
278 See the examples at :meth:`_sql.Executable.execution_options`
279 and :meth:`_engine.Engine.execution_options`.
280
281 The keywords that are currently recognized by SQLAlchemy itself
282 include all those listed under :meth:`.Executable.execution_options`,
283 as well as others that are specific to :class:`_engine.Connection`.
284
285 :param compiled_cache: Available on: :class:`_engine.Connection`,
286 :class:`_engine.Engine`.
287
288 A dictionary where :class:`.Compiled` objects
289 will be cached when the :class:`_engine.Connection`
290 compiles a clause
291 expression into a :class:`.Compiled` object. This dictionary will
292 supersede the statement cache that may be configured on the
293 :class:`_engine.Engine` itself. If set to None, caching
294 is disabled, even if the engine has a configured cache size.
295
296 Note that the ORM makes use of its own "compiled" caches for
297 some operations, including flush operations. The caching
298 used by the ORM internally supersedes a cache dictionary
299 specified here.
300
301 :param logging_token: Available on: :class:`_engine.Connection`,
302 :class:`_engine.Engine`, :class:`_sql.Executable`.
303
304 Adds the specified string token surrounded by brackets in log
305 messages logged by the connection, i.e. the logging that's enabled
306 either via the :paramref:`_sa.create_engine.echo` flag or via the
307 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
308 per-connection or per-sub-engine token to be available which is
309 useful for debugging concurrent connection scenarios.
310
311 .. versionadded:: 1.4.0b2
312
313 .. seealso::
314
315 :ref:`dbengine_logging_tokens` - usage example
316
317 :paramref:`_sa.create_engine.logging_name` - adds a name to the
318 name used by the Python logger object itself.
319
320 :param isolation_level: Available on: :class:`_engine.Connection`,
321 :class:`_engine.Engine`.
322
323 Set the transaction isolation level for the lifespan of this
324 :class:`_engine.Connection` object.
325 Valid values include those string
326 values accepted by the :paramref:`_sa.create_engine.isolation_level`
327 parameter passed to :func:`_sa.create_engine`. These levels are
328 semi-database specific; see individual dialect documentation for
329 valid levels.
330
331 The isolation level option applies the isolation level by emitting
332 statements on the DBAPI connection, and **necessarily affects the
333 original Connection object overall**. The isolation level will remain
334 at the given setting until explicitly changed, or when the DBAPI
335 connection itself is :term:`released` to the connection pool, i.e. the
336 :meth:`_engine.Connection.close` method is called, at which time an
337 event handler will emit additional statements on the DBAPI connection
338 in order to revert the isolation level change.
339
340 .. note:: The ``isolation_level`` execution option may only be
341 established before the :meth:`_engine.Connection.begin` method is
342 called, as well as before any SQL statements are emitted which
343 would otherwise trigger "autobegin", or directly after a call to
344 :meth:`_engine.Connection.commit` or
345 :meth:`_engine.Connection.rollback`. A database cannot change the
346 isolation level on a transaction in progress.
347
348 .. note:: The ``isolation_level`` execution option is implicitly
349 reset if the :class:`_engine.Connection` is invalidated, e.g. via
350 the :meth:`_engine.Connection.invalidate` method, or if a
351 disconnection error occurs. The new connection produced after the
352 invalidation will **not** have the selected isolation level
353 re-applied to it automatically.
354
355 .. seealso::
356
357 :ref:`dbapi_autocommit`
358
359 :meth:`_engine.Connection.get_isolation_level`
360 - view current actual level
361
362 :param no_parameters: Available on: :class:`_engine.Connection`,
363 :class:`_sql.Executable`.
364
365 When ``True``, if the final parameter
366 list or dictionary is totally empty, will invoke the
367 statement on the cursor as ``cursor.execute(statement)``,
368 not passing the parameter collection at all.
369 Some DBAPIs such as psycopg2 and mysql-python consider
370 percent signs as significant only when parameters are
371 present; this option allows code to generate SQL
372 containing percent signs (and possibly other characters)
373 that is neutral regarding whether it's executed by the DBAPI
374 or piped into a script that's later invoked by
375 command line tools.
376
377 :param stream_results: Available on: :class:`_engine.Connection`,
378 :class:`_sql.Executable`.
379
380 Indicate to the dialect that results should be "streamed" and not
381 pre-buffered, if possible. For backends such as PostgreSQL, MySQL
382 and MariaDB, this indicates the use of a "server side cursor" as
383 opposed to a client side cursor. Other backends such as that of
384 Oracle Database may already use server side cursors by default.
385
386 The usage of
387 :paramref:`_engine.Connection.execution_options.stream_results` is
388 usually combined with setting a fixed number of rows to to be fetched
389 in batches, to allow for efficient iteration of database rows while
390 at the same time not loading all result rows into memory at once;
391 this can be configured on a :class:`_engine.Result` object using the
392 :meth:`_engine.Result.yield_per` method, after execution has
393 returned a new :class:`_engine.Result`. If
394 :meth:`_engine.Result.yield_per` is not used,
395 the :paramref:`_engine.Connection.execution_options.stream_results`
396 mode of operation will instead use a dynamically sized buffer
397 which buffers sets of rows at a time, growing on each batch
398 based on a fixed growth size up until a limit which may
399 be configured using the
400 :paramref:`_engine.Connection.execution_options.max_row_buffer`
401 parameter.
402
403 When using the ORM to fetch ORM mapped objects from a result,
404 :meth:`_engine.Result.yield_per` should always be used with
405 :paramref:`_engine.Connection.execution_options.stream_results`,
406 so that the ORM does not fetch all rows into new ORM objects at once.
407
408 For typical use, the
409 :paramref:`_engine.Connection.execution_options.yield_per` execution
410 option should be preferred, which sets up both
411 :paramref:`_engine.Connection.execution_options.stream_results` and
412 :meth:`_engine.Result.yield_per` at once. This option is supported
413 both at a core level by :class:`_engine.Connection` as well as by the
414 ORM :class:`_engine.Session`; the latter is described at
415 :ref:`orm_queryguide_yield_per`.
416
417 .. seealso::
418
419 :ref:`engine_stream_results` - background on
420 :paramref:`_engine.Connection.execution_options.stream_results`
421
422 :paramref:`_engine.Connection.execution_options.max_row_buffer`
423
424 :paramref:`_engine.Connection.execution_options.yield_per`
425
426 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
427 describing the ORM version of ``yield_per``
428
429 :param max_row_buffer: Available on: :class:`_engine.Connection`,
430 :class:`_sql.Executable`. Sets a maximum
431 buffer size to use when the
432 :paramref:`_engine.Connection.execution_options.stream_results`
433 execution option is used on a backend that supports server side
434 cursors. The default value if not specified is 1000.
435
436 .. seealso::
437
438 :paramref:`_engine.Connection.execution_options.stream_results`
439
440 :ref:`engine_stream_results`
441
442
443 :param yield_per: Available on: :class:`_engine.Connection`,
444 :class:`_sql.Executable`. Integer value applied which will
445 set the :paramref:`_engine.Connection.execution_options.stream_results`
446 execution option and invoke :meth:`_engine.Result.yield_per`
447 automatically at once. Allows equivalent functionality as
448 is present when using this parameter with the ORM.
449
450 .. versionadded:: 1.4.40
451
452 .. seealso::
453
454 :ref:`engine_stream_results` - background and examples
455 on using server side cursors with Core.
456
457 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
458 describing the ORM version of ``yield_per``
459
460 :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
461 :class:`_engine.Engine`. Number of rows to format into an
462 INSERT statement when the statement uses "insertmanyvalues" mode,
463 which is a paged form of bulk insert that is used for many backends
464 when using :term:`executemany` execution typically in conjunction
465 with RETURNING. Defaults to 1000. May also be modified on a
466 per-engine basis using the
467 :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.
468
469 .. versionadded:: 2.0
470
471 .. seealso::
472
473 :ref:`engine_insertmanyvalues`
474
475 :param schema_translate_map: Available on: :class:`_engine.Connection`,
476 :class:`_engine.Engine`, :class:`_sql.Executable`.
477
478 A dictionary mapping schema names to schema names, that will be
479 applied to the :paramref:`_schema.Table.schema` element of each
480 :class:`_schema.Table`
481 encountered when SQL or DDL expression elements
482 are compiled into strings; the resulting schema name will be
483 converted based on presence in the map of the original name.
484
485 .. seealso::
486
487 :ref:`schema_translating`
488
489 :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
490 attribute will be unconditionally memoized within the result and
491 made available via the :attr:`.CursorResult.rowcount` attribute.
492 Normally, this attribute is only preserved for UPDATE and DELETE
493 statements. Using this option, the DBAPIs rowcount value can
494 be accessed for other kinds of statements such as INSERT and SELECT,
495 to the degree that the DBAPI supports these statements. See
496 :attr:`.CursorResult.rowcount` for notes regarding the behavior
497 of this attribute.
498
499 .. versionadded:: 2.0.28
500
501 .. seealso::
502
503 :meth:`_engine.Engine.execution_options`
504
505 :meth:`.Executable.execution_options`
506
507 :meth:`_engine.Connection.get_execution_options`
508
509 :ref:`orm_queryguide_execution_options` - documentation on all
510 ORM-specific execution options
511
512 """ # noqa
513 if self._has_events or self.engine._has_events:
514 self.dispatch.set_connection_execution_options(self, opt)
515 self._execution_options = self._execution_options.union(opt)
516 self.dialect.set_connection_execution_options(self, opt)
517 return self
518
519 def get_execution_options(self) -> _ExecuteOptions:
520 """Get the non-SQL options which will take effect during execution.
521
522 .. versionadded:: 1.3
523
524 .. seealso::
525
526 :meth:`_engine.Connection.execution_options`
527 """
528 return self._execution_options
529
530 @property
531 def _still_open_and_dbapi_connection_is_valid(self) -> bool:
532 pool_proxied_connection = self._dbapi_connection
533 return (
534 pool_proxied_connection is not None
535 and pool_proxied_connection.is_valid
536 )
537
538 @property
539 def closed(self) -> bool:
540 """Return True if this connection is closed."""
541
542 return self._dbapi_connection is None and not self.__can_reconnect
543
544 @property
545 def invalidated(self) -> bool:
546 """Return True if this connection was invalidated.
547
548 This does not indicate whether or not the connection was
549 invalidated at the pool level, however
550
551 """
552
553 # prior to 1.4, "invalid" was stored as a state independent of
554 # "closed", meaning an invalidated connection could be "closed",
555 # the _dbapi_connection would be None and closed=True, yet the
556 # "invalid" flag would stay True. This meant that there were
557 # three separate states (open/valid, closed/valid, closed/invalid)
558 # when there is really no reason for that; a connection that's
559 # "closed" does not need to be "invalid". So the state is now
560 # represented by the two facts alone.
561
562 pool_proxied_connection = self._dbapi_connection
563 return pool_proxied_connection is None and self.__can_reconnect
564
565 @property
566 def connection(self) -> PoolProxiedConnection:
567 """The underlying DB-API connection managed by this Connection.
568
569 This is a SQLAlchemy connection-pool proxied connection
570 which then has the attribute
571 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
572 actual driver connection.
573
574 .. seealso::
575
576
577 :ref:`dbapi_connections`
578
579 """
580
581 if self._dbapi_connection is None:
582 try:
583 return self._revalidate_connection()
584 except (exc.PendingRollbackError, exc.ResourceClosedError):
585 raise
586 except BaseException as e:
587 self._handle_dbapi_exception(e, None, None, None, None)
588 else:
589 return self._dbapi_connection
590
591 def get_isolation_level(self) -> IsolationLevel:
592 """Return the current **actual** isolation level that's present on
593 the database within the scope of this connection.
594
595 This attribute will perform a live SQL operation against the database
596 in order to procure the current isolation level, so the value returned
597 is the actual level on the underlying DBAPI connection regardless of
598 how this state was set. This will be one of the four actual isolation
599 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
600 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
601 level setting. Third party dialects may also feature additional
602 isolation level settings.
603
604 .. note:: This method **will not report** on the ``AUTOCOMMIT``
605 isolation level, which is a separate :term:`dbapi` setting that's
606 independent of **actual** isolation level. When ``AUTOCOMMIT`` is
607 in use, the database connection still has a "traditional" isolation
608 mode in effect, that is typically one of the four values
609 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
610 ``SERIALIZABLE``.
611
612 Compare to the :attr:`_engine.Connection.default_isolation_level`
613 accessor which returns the isolation level that is present on the
614 database at initial connection time.
615
616 .. seealso::
617
618 :attr:`_engine.Connection.default_isolation_level`
619 - view default level
620
621 :paramref:`_sa.create_engine.isolation_level`
622 - set per :class:`_engine.Engine` isolation level
623
624 :paramref:`.Connection.execution_options.isolation_level`
625 - set per :class:`_engine.Connection` isolation level
626
627 """
628 dbapi_connection = self.connection.dbapi_connection
629 assert dbapi_connection is not None
630 try:
631 return self.dialect.get_isolation_level(dbapi_connection)
632 except BaseException as e:
633 self._handle_dbapi_exception(e, None, None, None, None)
634
635 @property
636 def default_isolation_level(self) -> Optional[IsolationLevel]:
637 """The initial-connection time isolation level associated with the
638 :class:`_engine.Dialect` in use.
639
640 This value is independent of the
641 :paramref:`.Connection.execution_options.isolation_level` and
642 :paramref:`.Engine.execution_options.isolation_level` execution
643 options, and is determined by the :class:`_engine.Dialect` when the
644 first connection is created, by performing a SQL query against the
645 database for the current isolation level before any additional commands
646 have been emitted.
647
648 Calling this accessor does not invoke any new SQL queries.
649
650 .. seealso::
651
652 :meth:`_engine.Connection.get_isolation_level`
653 - view current actual isolation level
654
655 :paramref:`_sa.create_engine.isolation_level`
656 - set per :class:`_engine.Engine` isolation level
657
658 :paramref:`.Connection.execution_options.isolation_level`
659 - set per :class:`_engine.Connection` isolation level
660
661 """
662 return self.dialect.default_isolation_level
663
664 def _invalid_transaction(self) -> NoReturn:
665 raise exc.PendingRollbackError(
666 "Can't reconnect until invalid %stransaction is rolled "
667 "back. Please rollback() fully before proceeding"
668 % ("savepoint " if self._nested_transaction is not None else ""),
669 code="8s2b",
670 )
671
672 def _revalidate_connection(self) -> PoolProxiedConnection:
673 if self.__can_reconnect and self.invalidated:
674 if self._transaction is not None:
675 self._invalid_transaction()
676 self._dbapi_connection = self.engine.raw_connection()
677 return self._dbapi_connection
678 raise exc.ResourceClosedError("This Connection is closed")
679
680 @property
681 def info(self) -> _InfoType:
682 """Info dictionary associated with the underlying DBAPI connection
683 referred to by this :class:`_engine.Connection`, allowing user-defined
684 data to be associated with the connection.
685
686 The data here will follow along with the DBAPI connection including
687 after it is returned to the connection pool and used again
688 in subsequent instances of :class:`_engine.Connection`.
689
690 """
691
692 return self.connection.info
693
694 def invalidate(self, exception: Optional[BaseException] = None) -> None:
695 """Invalidate the underlying DBAPI connection associated with
696 this :class:`_engine.Connection`.
697
698 An attempt will be made to close the underlying DBAPI connection
699 immediately; however if this operation fails, the error is logged
700 but not raised. The connection is then discarded whether or not
701 close() succeeded.
702
703 Upon the next use (where "use" typically means using the
704 :meth:`_engine.Connection.execute` method or similar),
705 this :class:`_engine.Connection` will attempt to
706 procure a new DBAPI connection using the services of the
707 :class:`_pool.Pool` as a source of connectivity (e.g.
708 a "reconnection").
709
710 If a transaction was in progress (e.g. the
711 :meth:`_engine.Connection.begin` method has been called) when
712 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
713 level all state associated with this transaction is lost, as
714 the DBAPI connection is closed. The :class:`_engine.Connection`
715 will not allow a reconnection to proceed until the
716 :class:`.Transaction` object is ended, by calling the
717 :meth:`.Transaction.rollback` method; until that point, any attempt at
718 continuing to use the :class:`_engine.Connection` will raise an
719 :class:`~sqlalchemy.exc.InvalidRequestError`.
720 This is to prevent applications from accidentally
721 continuing an ongoing transactional operations despite the
722 fact that the transaction has been lost due to an
723 invalidation.
724
725 The :meth:`_engine.Connection.invalidate` method,
726 just like auto-invalidation,
727 will at the connection pool level invoke the
728 :meth:`_events.PoolEvents.invalidate` event.
729
730 :param exception: an optional ``Exception`` instance that's the
731 reason for the invalidation. is passed along to event handlers
732 and logging functions.
733
734 .. seealso::
735
736 :ref:`pool_connection_invalidation`
737
738 """
739
740 if self.invalidated:
741 return
742
743 if self.closed:
744 raise exc.ResourceClosedError("This Connection is closed")
745
746 if self._still_open_and_dbapi_connection_is_valid:
747 pool_proxied_connection = self._dbapi_connection
748 assert pool_proxied_connection is not None
749 pool_proxied_connection.invalidate(exception)
750
751 self._dbapi_connection = None
752
753 def detach(self) -> None:
754 """Detach the underlying DB-API connection from its connection pool.
755
756 E.g.::
757
758 with engine.connect() as conn:
759 conn.detach()
760 conn.execute(text("SET search_path TO schema1, schema2"))
761
762 # work with connection
763
764 # connection is fully closed (since we used "with:", can
765 # also call .close())
766
767 This :class:`_engine.Connection` instance will remain usable.
768 When closed
769 (or exited from a context manager context as above),
770 the DB-API connection will be literally closed and not
771 returned to its originating pool.
772
773 This method can be used to insulate the rest of an application
774 from a modified state on a connection (such as a transaction
775 isolation level or similar).
776
777 """
778
779 if self.closed:
780 raise exc.ResourceClosedError("This Connection is closed")
781
782 pool_proxied_connection = self._dbapi_connection
783 if pool_proxied_connection is None:
784 raise exc.InvalidRequestError(
785 "Can't detach an invalidated Connection"
786 )
787 pool_proxied_connection.detach()
788
789 def _autobegin(self) -> None:
790 if self._allow_autobegin and not self.__in_begin:
791 self.begin()
792
793 def begin(self) -> RootTransaction:
794 """Begin a transaction prior to autobegin occurring.
795
796 E.g.::
797
798 with engine.connect() as conn:
799 with conn.begin() as trans:
800 conn.execute(table.insert(), {"username": "sandy"})
801
802 The returned object is an instance of :class:`_engine.RootTransaction`.
803 This object represents the "scope" of the transaction,
804 which completes when either the :meth:`_engine.Transaction.rollback`
805 or :meth:`_engine.Transaction.commit` method is called; the object
806 also works as a context manager as illustrated above.
807
808 The :meth:`_engine.Connection.begin` method begins a
809 transaction that normally will be begun in any case when the connection
810 is first used to execute a statement. The reason this method might be
811 used would be to invoke the :meth:`_events.ConnectionEvents.begin`
812 event at a specific time, or to organize code within the scope of a
813 connection checkout in terms of context managed blocks, such as::
814
815 with engine.connect() as conn:
816 with conn.begin():
817 conn.execute(...)
818 conn.execute(...)
819
820 with conn.begin():
821 conn.execute(...)
822 conn.execute(...)
823
824 The above code is not fundamentally any different in its behavior than
825 the following code which does not use
826 :meth:`_engine.Connection.begin`; the below style is known
827 as "commit as you go" style::
828
829 with engine.connect() as conn:
830 conn.execute(...)
831 conn.execute(...)
832 conn.commit()
833
834 conn.execute(...)
835 conn.execute(...)
836 conn.commit()
837
838 From a database point of view, the :meth:`_engine.Connection.begin`
839 method does not emit any SQL or change the state of the underlying
840 DBAPI connection in any way; the Python DBAPI does not have any
841 concept of explicit transaction begin.
842
843 .. seealso::
844
845 :ref:`tutorial_working_with_transactions` - in the
846 :ref:`unified_tutorial`
847
848 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT
849
850 :meth:`_engine.Connection.begin_twophase` -
851 use a two phase /XID transaction
852
853 :meth:`_engine.Engine.begin` - context manager available from
854 :class:`_engine.Engine`
855
856 """
857 if self._transaction is None:
858 self._transaction = RootTransaction(self)
859 return self._transaction
860 else:
861 raise exc.InvalidRequestError(
862 "This connection has already initialized a SQLAlchemy "
863 "Transaction() object via begin() or autobegin; can't "
864 "call begin() here unless rollback() or commit() "
865 "is called first."
866 )
867
868 def begin_nested(self) -> NestedTransaction:
869 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
870 handle that controls the scope of the SAVEPOINT.
871
872 E.g.::
873
874 with engine.begin() as connection:
875 with connection.begin_nested():
876 connection.execute(table.insert(), {"username": "sandy"})
877
878 The returned object is an instance of
879 :class:`_engine.NestedTransaction`, which includes transactional
880 methods :meth:`_engine.NestedTransaction.commit` and
881 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
882 these methods correspond to the operations "RELEASE SAVEPOINT <name>"
883 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
884 to the :class:`_engine.NestedTransaction` object and is generated
885 automatically. Like any other :class:`_engine.Transaction`, the
886 :class:`_engine.NestedTransaction` may be used as a context manager as
887 illustrated above which will "release" or "rollback" corresponding to
888 if the operation within the block were successful or raised an
889 exception.
890
891 Nested transactions require SAVEPOINT support in the underlying
892 database, else the behavior is undefined. SAVEPOINT is commonly used to
893 run operations within a transaction that may fail, while continuing the
894 outer transaction. E.g.::
895
896 from sqlalchemy import exc
897
898 with engine.begin() as connection:
899 trans = connection.begin_nested()
900 try:
901 connection.execute(table.insert(), {"username": "sandy"})
902 trans.commit()
903 except exc.IntegrityError: # catch for duplicate username
904 trans.rollback() # rollback to savepoint
905
906 # outer transaction continues
907 connection.execute(...)
908
909 If :meth:`_engine.Connection.begin_nested` is called without first
910 calling :meth:`_engine.Connection.begin` or
911 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
912 will "autobegin" the outer transaction first. This outer transaction
913 may be committed using "commit-as-you-go" style, e.g.::
914
915 with engine.connect() as connection: # begin() wasn't called
916
917 with connection.begin_nested(): # will auto-"begin()" first
918 connection.execute(...)
919 # savepoint is released
920
921 connection.execute(...)
922
923 # explicitly commit outer transaction
924 connection.commit()
925
926 # can continue working with connection here
927
928 .. versionchanged:: 2.0
929
930 :meth:`_engine.Connection.begin_nested` will now participate
931 in the connection "autobegin" behavior that is new as of
932 2.0 / "future" style connections in 1.4.
933
934 .. seealso::
935
936 :meth:`_engine.Connection.begin`
937
938 :ref:`session_begin_nested` - ORM support for SAVEPOINT
939
940 """
941 if self._transaction is None:
942 self._autobegin()
943
944 return NestedTransaction(self)
945
946 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
947 """Begin a two-phase or XA transaction and return a transaction
948 handle.
949
950 The returned object is an instance of :class:`.TwoPhaseTransaction`,
951 which in addition to the methods provided by
952 :class:`.Transaction`, also provides a
953 :meth:`~.TwoPhaseTransaction.prepare` method.
954
955 :param xid: the two phase transaction id. If not supplied, a
956 random id will be generated.
957
958 .. seealso::
959
960 :meth:`_engine.Connection.begin`
961
962 :meth:`_engine.Connection.begin_twophase`
963
964 """
965
966 if self._transaction is not None:
967 raise exc.InvalidRequestError(
968 "Cannot start a two phase transaction when a transaction "
969 "is already in progress."
970 )
971 if xid is None:
972 xid = self.engine.dialect.create_xid()
973 return TwoPhaseTransaction(self, xid)
974
975 def commit(self) -> None:
976 """Commit the transaction that is currently in progress.
977
978 This method commits the current transaction if one has been started.
979 If no transaction was started, the method has no effect, assuming
980 the connection is in a non-invalidated state.
981
982 A transaction is begun on a :class:`_engine.Connection` automatically
983 whenever a statement is first executed, or when the
984 :meth:`_engine.Connection.begin` method is called.
985
986 .. note:: The :meth:`_engine.Connection.commit` method only acts upon
987 the primary database transaction that is linked to the
988 :class:`_engine.Connection` object. It does not operate upon a
989 SAVEPOINT that would have been invoked from the
990 :meth:`_engine.Connection.begin_nested` method; for control of a
991 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
992 :class:`_engine.NestedTransaction` that is returned by the
993 :meth:`_engine.Connection.begin_nested` method itself.
994
995
996 """
997 if self._transaction:
998 self._transaction.commit()
999
1000 def rollback(self) -> None:
1001 """Roll back the transaction that is currently in progress.
1002
1003 This method rolls back the current transaction if one has been started.
1004 If no transaction was started, the method has no effect. If a
1005 transaction was started and the connection is in an invalidated state,
1006 the transaction is cleared using this method.
1007
1008 A transaction is begun on a :class:`_engine.Connection` automatically
1009 whenever a statement is first executed, or when the
1010 :meth:`_engine.Connection.begin` method is called.
1011
1012 .. note:: The :meth:`_engine.Connection.rollback` method only acts
1013 upon the primary database transaction that is linked to the
1014 :class:`_engine.Connection` object. It does not operate upon a
1015 SAVEPOINT that would have been invoked from the
1016 :meth:`_engine.Connection.begin_nested` method; for control of a
1017 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
1018 :class:`_engine.NestedTransaction` that is returned by the
1019 :meth:`_engine.Connection.begin_nested` method itself.
1020
1021
1022 """
1023 if self._transaction:
1024 self._transaction.rollback()
1025
1026 def recover_twophase(self) -> List[Any]:
1027 return self.engine.dialect.do_recover_twophase(self)
1028
1029 def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
1030 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
1031
1032 def commit_prepared(self, xid: Any, recover: bool = False) -> None:
1033 self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
1034
1035 def in_transaction(self) -> bool:
1036 """Return True if a transaction is in progress."""
1037 return self._transaction is not None and self._transaction.is_active
1038
1039 def in_nested_transaction(self) -> bool:
1040 """Return True if a transaction is in progress."""
1041 return (
1042 self._nested_transaction is not None
1043 and self._nested_transaction.is_active
1044 )
1045
1046 def _is_autocommit_isolation(self) -> bool:
1047 opt_iso = self._execution_options.get("isolation_level", None)
1048 return bool(
1049 opt_iso == "AUTOCOMMIT"
1050 or (
1051 opt_iso is None
1052 and self.engine.dialect._on_connect_isolation_level
1053 == "AUTOCOMMIT"
1054 )
1055 )
1056
1057 def _get_required_transaction(self) -> RootTransaction:
1058 trans = self._transaction
1059 if trans is None:
1060 raise exc.InvalidRequestError("connection is not in a transaction")
1061 return trans
1062
1063 def _get_required_nested_transaction(self) -> NestedTransaction:
1064 trans = self._nested_transaction
1065 if trans is None:
1066 raise exc.InvalidRequestError(
1067 "connection is not in a nested transaction"
1068 )
1069 return trans
1070
1071 def get_transaction(self) -> Optional[RootTransaction]:
1072 """Return the current root transaction in progress, if any.
1073
1074 .. versionadded:: 1.4
1075
1076 """
1077
1078 return self._transaction
1079
1080 def get_nested_transaction(self) -> Optional[NestedTransaction]:
1081 """Return the current nested transaction in progress, if any.
1082
1083 .. versionadded:: 1.4
1084
1085 """
1086 return self._nested_transaction
1087
1088 def _begin_impl(self, transaction: RootTransaction) -> None:
1089 if self._echo:
1090 if self._is_autocommit_isolation():
1091 self._log_info(
1092 "BEGIN (implicit; DBAPI should not BEGIN due to "
1093 "autocommit mode)"
1094 )
1095 else:
1096 self._log_info("BEGIN (implicit)")
1097
1098 self.__in_begin = True
1099
1100 if self._has_events or self.engine._has_events:
1101 self.dispatch.begin(self)
1102
1103 try:
1104 self.engine.dialect.do_begin(self.connection)
1105 except BaseException as e:
1106 self._handle_dbapi_exception(e, None, None, None, None)
1107 finally:
1108 self.__in_begin = False
1109
1110 def _rollback_impl(self) -> None:
1111 if self._has_events or self.engine._has_events:
1112 self.dispatch.rollback(self)
1113
1114 if self._still_open_and_dbapi_connection_is_valid:
1115 if self._echo:
1116 if self._is_autocommit_isolation():
1117 self._log_info(
1118 "ROLLBACK using DBAPI connection.rollback(), "
1119 "DBAPI should ignore due to autocommit mode"
1120 )
1121 else:
1122 self._log_info("ROLLBACK")
1123 try:
1124 self.engine.dialect.do_rollback(self.connection)
1125 except BaseException as e:
1126 self._handle_dbapi_exception(e, None, None, None, None)
1127
1128 def _commit_impl(self) -> None:
1129 if self._has_events or self.engine._has_events:
1130 self.dispatch.commit(self)
1131
1132 if self._echo:
1133 if self._is_autocommit_isolation():
1134 self._log_info(
1135 "COMMIT using DBAPI connection.commit(), "
1136 "DBAPI should ignore due to autocommit mode"
1137 )
1138 else:
1139 self._log_info("COMMIT")
1140 try:
1141 self.engine.dialect.do_commit(self.connection)
1142 except BaseException as e:
1143 self._handle_dbapi_exception(e, None, None, None, None)
1144
1145 def _savepoint_impl(self, name: Optional[str] = None) -> str:
1146 if self._has_events or self.engine._has_events:
1147 self.dispatch.savepoint(self, name)
1148
1149 if name is None:
1150 self.__savepoint_seq += 1
1151 name = "sa_savepoint_%s" % self.__savepoint_seq
1152 self.engine.dialect.do_savepoint(self, name)
1153 return name
1154
1155 def _rollback_to_savepoint_impl(self, name: str) -> None:
1156 if self._has_events or self.engine._has_events:
1157 self.dispatch.rollback_savepoint(self, name, None)
1158
1159 if self._still_open_and_dbapi_connection_is_valid:
1160 self.engine.dialect.do_rollback_to_savepoint(self, name)
1161
1162 def _release_savepoint_impl(self, name: str) -> None:
1163 if self._has_events or self.engine._has_events:
1164 self.dispatch.release_savepoint(self, name, None)
1165
1166 self.engine.dialect.do_release_savepoint(self, name)
1167
1168 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
1169 if self._echo:
1170 self._log_info("BEGIN TWOPHASE (implicit)")
1171 if self._has_events or self.engine._has_events:
1172 self.dispatch.begin_twophase(self, transaction.xid)
1173
1174 self.__in_begin = True
1175 try:
1176 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1177 except BaseException as e:
1178 self._handle_dbapi_exception(e, None, None, None, None)
1179 finally:
1180 self.__in_begin = False
1181
1182 def _prepare_twophase_impl(self, xid: Any) -> None:
1183 if self._has_events or self.engine._has_events:
1184 self.dispatch.prepare_twophase(self, xid)
1185
1186 assert isinstance(self._transaction, TwoPhaseTransaction)
1187 try:
1188 self.engine.dialect.do_prepare_twophase(self, xid)
1189 except BaseException as e:
1190 self._handle_dbapi_exception(e, None, None, None, None)
1191
1192 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1193 if self._has_events or self.engine._has_events:
1194 self.dispatch.rollback_twophase(self, xid, is_prepared)
1195
1196 if self._still_open_and_dbapi_connection_is_valid:
1197 assert isinstance(self._transaction, TwoPhaseTransaction)
1198 try:
1199 self.engine.dialect.do_rollback_twophase(
1200 self, xid, is_prepared
1201 )
1202 except BaseException as e:
1203 self._handle_dbapi_exception(e, None, None, None, None)
1204
1205 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
1206 if self._has_events or self.engine._has_events:
1207 self.dispatch.commit_twophase(self, xid, is_prepared)
1208
1209 assert isinstance(self._transaction, TwoPhaseTransaction)
1210 try:
1211 self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
1212 except BaseException as e:
1213 self._handle_dbapi_exception(e, None, None, None, None)
1214
1215 def close(self) -> None:
1216 """Close this :class:`_engine.Connection`.
1217
1218 This results in a release of the underlying database
1219 resources, that is, the DBAPI connection referenced
1220 internally. The DBAPI connection is typically restored
1221 back to the connection-holding :class:`_pool.Pool` referenced
1222 by the :class:`_engine.Engine` that produced this
1223 :class:`_engine.Connection`. Any transactional state present on
1224 the DBAPI connection is also unconditionally released via
1225 the DBAPI connection's ``rollback()`` method, regardless
1226 of any :class:`.Transaction` object that may be
1227 outstanding with regards to this :class:`_engine.Connection`.
1228
1229 This has the effect of also calling :meth:`_engine.Connection.rollback`
1230 if any transaction is in place.
1231
1232 After :meth:`_engine.Connection.close` is called, the
1233 :class:`_engine.Connection` is permanently in a closed state,
1234 and will allow no further operations.
1235
1236 """
1237
1238 if self._transaction:
1239 self._transaction.close()
1240 skip_reset = True
1241 else:
1242 skip_reset = False
1243
1244 if self._dbapi_connection is not None:
1245 conn = self._dbapi_connection
1246
1247 # as we just closed the transaction, close the connection
1248 # pool connection without doing an additional reset
1249 if skip_reset:
1250 cast("_ConnectionFairy", conn)._close_special(
1251 transaction_reset=True
1252 )
1253 else:
1254 conn.close()
1255
1256 # There is a slight chance that conn.close() may have
1257 # triggered an invalidation here in which case
1258 # _dbapi_connection would already be None, however usually
1259 # it will be non-None here and in a "closed" state.
1260 self._dbapi_connection = None
1261 self.__can_reconnect = False
1262
1263 @overload
1264 def scalar(
1265 self,
1266 statement: TypedReturnsRows[Tuple[_T]],
1267 parameters: Optional[_CoreSingleExecuteParams] = None,
1268 *,
1269 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1270 ) -> Optional[_T]: ...
1271
1272 @overload
1273 def scalar(
1274 self,
1275 statement: Executable,
1276 parameters: Optional[_CoreSingleExecuteParams] = None,
1277 *,
1278 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1279 ) -> Any: ...
1280
1281 def scalar(
1282 self,
1283 statement: Executable,
1284 parameters: Optional[_CoreSingleExecuteParams] = None,
1285 *,
1286 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1287 ) -> Any:
1288 r"""Executes a SQL statement construct and returns a scalar object.
1289
1290 This method is shorthand for invoking the
1291 :meth:`_engine.Result.scalar` method after invoking the
1292 :meth:`_engine.Connection.execute` method. Parameters are equivalent.
1293
1294 :return: a scalar Python value representing the first column of the
1295 first row returned.
1296
1297 """
1298 distilled_parameters = _distill_params_20(parameters)
1299 try:
1300 meth = statement._execute_on_scalar
1301 except AttributeError as err:
1302 raise exc.ObjectNotExecutableError(statement) from err
1303 else:
1304 return meth(
1305 self,
1306 distilled_parameters,
1307 execution_options or NO_OPTIONS,
1308 )
1309
1310 @overload
1311 def scalars(
1312 self,
1313 statement: TypedReturnsRows[Tuple[_T]],
1314 parameters: Optional[_CoreAnyExecuteParams] = None,
1315 *,
1316 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1317 ) -> ScalarResult[_T]: ...
1318
1319 @overload
1320 def scalars(
1321 self,
1322 statement: Executable,
1323 parameters: Optional[_CoreAnyExecuteParams] = None,
1324 *,
1325 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1326 ) -> ScalarResult[Any]: ...
1327
1328 def scalars(
1329 self,
1330 statement: Executable,
1331 parameters: Optional[_CoreAnyExecuteParams] = None,
1332 *,
1333 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1334 ) -> ScalarResult[Any]:
1335 """Executes and returns a scalar result set, which yields scalar values
1336 from the first column of each row.
1337
1338 This method is equivalent to calling :meth:`_engine.Connection.execute`
1339 to receive a :class:`_result.Result` object, then invoking the
1340 :meth:`_result.Result.scalars` method to produce a
1341 :class:`_result.ScalarResult` instance.
1342
1343 :return: a :class:`_result.ScalarResult`
1344
1345 .. versionadded:: 1.4.24
1346
1347 """
1348
1349 return self.execute(
1350 statement, parameters, execution_options=execution_options
1351 ).scalars()
1352
1353 @overload
1354 def execute(
1355 self,
1356 statement: TypedReturnsRows[_T],
1357 parameters: Optional[_CoreAnyExecuteParams] = None,
1358 *,
1359 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1360 ) -> CursorResult[_T]: ...
1361
1362 @overload
1363 def execute(
1364 self,
1365 statement: Executable,
1366 parameters: Optional[_CoreAnyExecuteParams] = None,
1367 *,
1368 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1369 ) -> CursorResult[Any]: ...
1370
1371 def execute(
1372 self,
1373 statement: Executable,
1374 parameters: Optional[_CoreAnyExecuteParams] = None,
1375 *,
1376 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1377 ) -> CursorResult[Any]:
1378 r"""Executes a SQL statement construct and returns a
1379 :class:`_engine.CursorResult`.
1380
1381 :param statement: The statement to be executed. This is always
1382 an object that is in both the :class:`_expression.ClauseElement` and
1383 :class:`_expression.Executable` hierarchies, including:
1384
1385 * :class:`_expression.Select`
1386 * :class:`_expression.Insert`, :class:`_expression.Update`,
1387 :class:`_expression.Delete`
1388 * :class:`_expression.TextClause` and
1389 :class:`_expression.TextualSelect`
1390 * :class:`_schema.DDL` and objects which inherit from
1391 :class:`_schema.ExecutableDDLElement`
1392
1393 :param parameters: parameters which will be bound into the statement.
1394 This may be either a dictionary of parameter names to values,
1395 or a mutable sequence (e.g. a list) of dictionaries. When a
1396 list of dictionaries is passed, the underlying statement execution
1397 will make use of the DBAPI ``cursor.executemany()`` method.
1398 When a single dictionary is passed, the DBAPI ``cursor.execute()``
1399 method will be used.
1400
1401 :param execution_options: optional dictionary of execution options,
1402 which will be associated with the statement execution. This
1403 dictionary can provide a subset of the options that are accepted
1404 by :meth:`_engine.Connection.execution_options`.
1405
1406 :return: a :class:`_engine.Result` object.
1407
1408 """
1409 distilled_parameters = _distill_params_20(parameters)
1410 try:
1411 meth = statement._execute_on_connection
1412 except AttributeError as err:
1413 raise exc.ObjectNotExecutableError(statement) from err
1414 else:
1415 return meth(
1416 self,
1417 distilled_parameters,
1418 execution_options or NO_OPTIONS,
1419 )
1420
1421 def _execute_function(
1422 self,
1423 func: FunctionElement[Any],
1424 distilled_parameters: _CoreMultiExecuteParams,
1425 execution_options: CoreExecuteOptionsParameter,
1426 ) -> CursorResult[Any]:
1427 """Execute a sql.FunctionElement object."""
1428
1429 return self._execute_clauseelement(
1430 func.select(), distilled_parameters, execution_options
1431 )
1432
1433 def _execute_default(
1434 self,
1435 default: DefaultGenerator,
1436 distilled_parameters: _CoreMultiExecuteParams,
1437 execution_options: CoreExecuteOptionsParameter,
1438 ) -> Any:
1439 """Execute a schema.ColumnDefault object."""
1440
1441 execution_options = self._execution_options.merge_with(
1442 execution_options
1443 )
1444
1445 event_multiparams: Optional[_CoreMultiExecuteParams]
1446 event_params: Optional[_CoreAnyExecuteParams]
1447
1448 # note for event handlers, the "distilled parameters" which is always
1449 # a list of dicts is broken out into separate "multiparams" and
1450 # "params" collections, which allows the handler to distinguish
1451 # between an executemany and execute style set of parameters.
1452 if self._has_events or self.engine._has_events:
1453 (
1454 default,
1455 distilled_parameters,
1456 event_multiparams,
1457 event_params,
1458 ) = self._invoke_before_exec_event(
1459 default, distilled_parameters, execution_options
1460 )
1461 else:
1462 event_multiparams = event_params = None
1463
1464 try:
1465 conn = self._dbapi_connection
1466 if conn is None:
1467 conn = self._revalidate_connection()
1468
1469 dialect = self.dialect
1470 ctx = dialect.execution_ctx_cls._init_default(
1471 dialect, self, conn, execution_options
1472 )
1473 except (exc.PendingRollbackError, exc.ResourceClosedError):
1474 raise
1475 except BaseException as e:
1476 self._handle_dbapi_exception(e, None, None, None, None)
1477
1478 ret = ctx._exec_default(None, default, None)
1479
1480 if self._has_events or self.engine._has_events:
1481 self.dispatch.after_execute(
1482 self,
1483 default,
1484 event_multiparams,
1485 event_params,
1486 execution_options,
1487 ret,
1488 )
1489
1490 return ret
1491
1492 def _execute_ddl(
1493 self,
1494 ddl: ExecutableDDLElement,
1495 distilled_parameters: _CoreMultiExecuteParams,
1496 execution_options: CoreExecuteOptionsParameter,
1497 ) -> CursorResult[Any]:
1498 """Execute a schema.DDL object."""
1499
1500 exec_opts = ddl._execution_options.merge_with(
1501 self._execution_options, execution_options
1502 )
1503
1504 event_multiparams: Optional[_CoreMultiExecuteParams]
1505 event_params: Optional[_CoreSingleExecuteParams]
1506
1507 if self._has_events or self.engine._has_events:
1508 (
1509 ddl,
1510 distilled_parameters,
1511 event_multiparams,
1512 event_params,
1513 ) = self._invoke_before_exec_event(
1514 ddl, distilled_parameters, exec_opts
1515 )
1516 else:
1517 event_multiparams = event_params = None
1518
1519 schema_translate_map = exec_opts.get("schema_translate_map", None)
1520
1521 dialect = self.dialect
1522
1523 compiled = ddl.compile(
1524 dialect=dialect, schema_translate_map=schema_translate_map
1525 )
1526 ret = self._execute_context(
1527 dialect,
1528 dialect.execution_ctx_cls._init_ddl,
1529 compiled,
1530 None,
1531 exec_opts,
1532 compiled,
1533 )
1534 if self._has_events or self.engine._has_events:
1535 self.dispatch.after_execute(
1536 self,
1537 ddl,
1538 event_multiparams,
1539 event_params,
1540 exec_opts,
1541 ret,
1542 )
1543 return ret
1544
1545 def _invoke_before_exec_event(
1546 self,
1547 elem: Any,
1548 distilled_params: _CoreMultiExecuteParams,
1549 execution_options: _ExecuteOptions,
1550 ) -> Tuple[
1551 Any,
1552 _CoreMultiExecuteParams,
1553 _CoreMultiExecuteParams,
1554 _CoreSingleExecuteParams,
1555 ]:
1556 event_multiparams: _CoreMultiExecuteParams
1557 event_params: _CoreSingleExecuteParams
1558
1559 if len(distilled_params) == 1:
1560 event_multiparams, event_params = [], distilled_params[0]
1561 else:
1562 event_multiparams, event_params = distilled_params, {}
1563
1564 for fn in self.dispatch.before_execute:
1565 elem, event_multiparams, event_params = fn(
1566 self,
1567 elem,
1568 event_multiparams,
1569 event_params,
1570 execution_options,
1571 )
1572
1573 if event_multiparams:
1574 distilled_params = list(event_multiparams)
1575 if event_params:
1576 raise exc.InvalidRequestError(
1577 "Event handler can't return non-empty multiparams "
1578 "and params at the same time"
1579 )
1580 elif event_params:
1581 distilled_params = [event_params]
1582 else:
1583 distilled_params = []
1584
1585 return elem, distilled_params, event_multiparams, event_params
1586
1587 def _execute_clauseelement(
1588 self,
1589 elem: Executable,
1590 distilled_parameters: _CoreMultiExecuteParams,
1591 execution_options: CoreExecuteOptionsParameter,
1592 ) -> CursorResult[Any]:
1593 """Execute a sql.ClauseElement object."""
1594
1595 execution_options = elem._execution_options.merge_with(
1596 self._execution_options, execution_options
1597 )
1598
1599 has_events = self._has_events or self.engine._has_events
1600 if has_events:
1601 (
1602 elem,
1603 distilled_parameters,
1604 event_multiparams,
1605 event_params,
1606 ) = self._invoke_before_exec_event(
1607 elem, distilled_parameters, execution_options
1608 )
1609
1610 if distilled_parameters:
1611 # ensure we don't retain a link to the view object for keys()
1612 # which links to the values, which we don't want to cache
1613 keys = sorted(distilled_parameters[0])
1614 for_executemany = len(distilled_parameters) > 1
1615 else:
1616 keys = []
1617 for_executemany = False
1618
1619 dialect = self.dialect
1620
1621 schema_translate_map = execution_options.get(
1622 "schema_translate_map", None
1623 )
1624
1625 compiled_cache: Optional[CompiledCacheType] = execution_options.get(
1626 "compiled_cache", self.engine._compiled_cache
1627 )
1628
1629 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
1630 dialect=dialect,
1631 compiled_cache=compiled_cache,
1632 column_keys=keys,
1633 for_executemany=for_executemany,
1634 schema_translate_map=schema_translate_map,
1635 linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
1636 )
1637 ret = self._execute_context(
1638 dialect,
1639 dialect.execution_ctx_cls._init_compiled,
1640 compiled_sql,
1641 distilled_parameters,
1642 execution_options,
1643 compiled_sql,
1644 distilled_parameters,
1645 elem,
1646 extracted_params,
1647 cache_hit=cache_hit,
1648 )
1649 if has_events:
1650 self.dispatch.after_execute(
1651 self,
1652 elem,
1653 event_multiparams,
1654 event_params,
1655 execution_options,
1656 ret,
1657 )
1658 return ret
1659
1660 def _execute_compiled(
1661 self,
1662 compiled: Compiled,
1663 distilled_parameters: _CoreMultiExecuteParams,
1664 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
1665 ) -> CursorResult[Any]:
1666 """Execute a sql.Compiled object.
1667
1668 TODO: why do we have this? likely deprecate or remove
1669
1670 """
1671
1672 execution_options = compiled.execution_options.merge_with(
1673 self._execution_options, execution_options
1674 )
1675
1676 if self._has_events or self.engine._has_events:
1677 (
1678 compiled,
1679 distilled_parameters,
1680 event_multiparams,
1681 event_params,
1682 ) = self._invoke_before_exec_event(
1683 compiled, distilled_parameters, execution_options
1684 )
1685
1686 dialect = self.dialect
1687
1688 ret = self._execute_context(
1689 dialect,
1690 dialect.execution_ctx_cls._init_compiled,
1691 compiled,
1692 distilled_parameters,
1693 execution_options,
1694 compiled,
1695 distilled_parameters,
1696 None,
1697 None,
1698 )
1699 if self._has_events or self.engine._has_events:
1700 self.dispatch.after_execute(
1701 self,
1702 compiled,
1703 event_multiparams,
1704 event_params,
1705 execution_options,
1706 ret,
1707 )
1708 return ret
1709
1710 def exec_driver_sql(
1711 self,
1712 statement: str,
1713 parameters: Optional[_DBAPIAnyExecuteParams] = None,
1714 execution_options: Optional[CoreExecuteOptionsParameter] = None,
1715 ) -> CursorResult[Any]:
1716 r"""Executes a string SQL statement on the DBAPI cursor directly,
1717 without any SQL compilation steps.
1718
1719 This can be used to pass any string directly to the
1720 ``cursor.execute()`` method of the DBAPI in use.
1721
1722 :param statement: The statement str to be executed. Bound parameters
1723 must use the underlying DBAPI's paramstyle, such as "qmark",
1724 "pyformat", "format", etc.
1725
1726 :param parameters: represent bound parameter values to be used in the
1727 execution. The format is one of: a dictionary of named parameters,
1728 a tuple of positional parameters, or a list containing either
1729 dictionaries or tuples for multiple-execute support.
1730
1731 :return: a :class:`_engine.CursorResult`.
1732
1733 E.g. multiple dictionaries::
1734
1735
1736 conn.exec_driver_sql(
1737 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1738 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
1739 )
1740
1741 Single dictionary::
1742
1743 conn.exec_driver_sql(
1744 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1745 dict(id=1, value="v1"),
1746 )
1747
1748 Single tuple::
1749
1750 conn.exec_driver_sql(
1751 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
1752 )
1753
1754 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
1755 not participate in the
1756 :meth:`_events.ConnectionEvents.before_execute` and
1757 :meth:`_events.ConnectionEvents.after_execute` events. To
1758 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
1759 :meth:`_events.ConnectionEvents.before_cursor_execute` and
1760 :meth:`_events.ConnectionEvents.after_cursor_execute`.
1761
1762 .. seealso::
1763
1764 :pep:`249`
1765
1766 """
1767
1768 distilled_parameters = _distill_raw_params(parameters)
1769
1770 execution_options = self._execution_options.merge_with(
1771 execution_options
1772 )
1773
1774 dialect = self.dialect
1775 ret = self._execute_context(
1776 dialect,
1777 dialect.execution_ctx_cls._init_statement,
1778 statement,
1779 None,
1780 execution_options,
1781 statement,
1782 distilled_parameters,
1783 )
1784
1785 return ret
1786
1787 def _execute_context(
1788 self,
1789 dialect: Dialect,
1790 constructor: Callable[..., ExecutionContext],
1791 statement: Union[str, Compiled],
1792 parameters: Optional[_AnyMultiExecuteParams],
1793 execution_options: _ExecuteOptions,
1794 *args: Any,
1795 **kw: Any,
1796 ) -> CursorResult[Any]:
1797 """Create an :class:`.ExecutionContext` and execute, returning
1798 a :class:`_engine.CursorResult`."""
1799
1800 if execution_options:
1801 yp = execution_options.get("yield_per", None)
1802 if yp:
1803 execution_options = execution_options.union(
1804 {"stream_results": True, "max_row_buffer": yp}
1805 )
1806 try:
1807 conn = self._dbapi_connection
1808 if conn is None:
1809 conn = self._revalidate_connection()
1810
1811 context = constructor(
1812 dialect, self, conn, execution_options, *args, **kw
1813 )
1814 except (exc.PendingRollbackError, exc.ResourceClosedError):
1815 raise
1816 except BaseException as e:
1817 self._handle_dbapi_exception(
1818 e, str(statement), parameters, None, None
1819 )
1820
1821 if (
1822 self._transaction
1823 and not self._transaction.is_active
1824 or (
1825 self._nested_transaction
1826 and not self._nested_transaction.is_active
1827 )
1828 ):
1829 self._invalid_transaction()
1830
1831 elif self._trans_context_manager:
1832 TransactionalContext._trans_ctx_check(self)
1833
1834 if self._transaction is None:
1835 self._autobegin()
1836
1837 context.pre_exec()
1838
1839 if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
1840 return self._exec_insertmany_context(dialect, context)
1841 else:
1842 return self._exec_single_context(
1843 dialect, context, statement, parameters
1844 )
1845
1846 def _exec_single_context(
1847 self,
1848 dialect: Dialect,
1849 context: ExecutionContext,
1850 statement: Union[str, Compiled],
1851 parameters: Optional[_AnyMultiExecuteParams],
1852 ) -> CursorResult[Any]:
1853 """continue the _execute_context() method for a single DBAPI
1854 cursor.execute() or cursor.executemany() call.
1855
1856 """
1857 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
1858 generic_setinputsizes = context._prepare_set_input_sizes()
1859
1860 if generic_setinputsizes:
1861 try:
1862 dialect.do_set_input_sizes(
1863 context.cursor, generic_setinputsizes, context
1864 )
1865 except BaseException as e:
1866 self._handle_dbapi_exception(
1867 e, str(statement), parameters, None, context
1868 )
1869
1870 cursor, str_statement, parameters = (
1871 context.cursor,
1872 context.statement,
1873 context.parameters,
1874 )
1875
1876 effective_parameters: Optional[_AnyExecuteParams]
1877
1878 if not context.executemany:
1879 effective_parameters = parameters[0]
1880 else:
1881 effective_parameters = parameters
1882
1883 if self._has_events or self.engine._has_events:
1884 for fn in self.dispatch.before_cursor_execute:
1885 str_statement, effective_parameters = fn(
1886 self,
1887 cursor,
1888 str_statement,
1889 effective_parameters,
1890 context,
1891 context.executemany,
1892 )
1893
1894 if self._echo:
1895 self._log_info(str_statement)
1896
1897 stats = context._get_cache_stats()
1898
1899 if not self.engine.hide_parameters:
1900 self._log_info(
1901 "[%s] %r",
1902 stats,
1903 sql_util._repr_params(
1904 effective_parameters,
1905 batches=10,
1906 ismulti=context.executemany,
1907 ),
1908 )
1909 else:
1910 self._log_info(
1911 "[%s] [SQL parameters hidden due to hide_parameters=True]",
1912 stats,
1913 )
1914
1915 evt_handled: bool = False
1916 try:
1917 if context.execute_style is ExecuteStyle.EXECUTEMANY:
1918 effective_parameters = cast(
1919 "_CoreMultiExecuteParams", effective_parameters
1920 )
1921 if self.dialect._has_events:
1922 for fn in self.dialect.dispatch.do_executemany:
1923 if fn(
1924 cursor,
1925 str_statement,
1926 effective_parameters,
1927 context,
1928 ):
1929 evt_handled = True
1930 break
1931 if not evt_handled:
1932 self.dialect.do_executemany(
1933 cursor,
1934 str_statement,
1935 effective_parameters,
1936 context,
1937 )
1938 elif not effective_parameters and context.no_parameters:
1939 if self.dialect._has_events:
1940 for fn in self.dialect.dispatch.do_execute_no_params:
1941 if fn(cursor, str_statement, context):
1942 evt_handled = True
1943 break
1944 if not evt_handled:
1945 self.dialect.do_execute_no_params(
1946 cursor, str_statement, context
1947 )
1948 else:
1949 effective_parameters = cast(
1950 "_CoreSingleExecuteParams", effective_parameters
1951 )
1952 if self.dialect._has_events:
1953 for fn in self.dialect.dispatch.do_execute:
1954 if fn(
1955 cursor,
1956 str_statement,
1957 effective_parameters,
1958 context,
1959 ):
1960 evt_handled = True
1961 break
1962 if not evt_handled:
1963 self.dialect.do_execute(
1964 cursor, str_statement, effective_parameters, context
1965 )
1966
1967 if self._has_events or self.engine._has_events:
1968 self.dispatch.after_cursor_execute(
1969 self,
1970 cursor,
1971 str_statement,
1972 effective_parameters,
1973 context,
1974 context.executemany,
1975 )
1976
1977 context.post_exec()
1978
1979 result = context._setup_result_proxy()
1980
1981 except BaseException as e:
1982 self._handle_dbapi_exception(
1983 e, str_statement, effective_parameters, cursor, context
1984 )
1985
1986 return result
1987
1988 def _exec_insertmany_context(
1989 self,
1990 dialect: Dialect,
1991 context: ExecutionContext,
1992 ) -> CursorResult[Any]:
1993 """continue the _execute_context() method for an "insertmanyvalues"
1994 operation, which will invoke DBAPI
1995 cursor.execute() one or more times with individual log and
1996 event hook calls.
1997
1998 """
1999
2000 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
2001 generic_setinputsizes = context._prepare_set_input_sizes()
2002 else:
2003 generic_setinputsizes = None
2004
2005 cursor, str_statement, parameters = (
2006 context.cursor,
2007 context.statement,
2008 context.parameters,
2009 )
2010
2011 effective_parameters = parameters
2012
2013 engine_events = self._has_events or self.engine._has_events
2014 if self.dialect._has_events:
2015 do_execute_dispatch: Iterable[Any] = (
2016 self.dialect.dispatch.do_execute
2017 )
2018 else:
2019 do_execute_dispatch = ()
2020
2021 if self._echo:
2022 stats = context._get_cache_stats() + " (insertmanyvalues)"
2023
2024 preserve_rowcount = context.execution_options.get(
2025 "preserve_rowcount", False
2026 )
2027 rowcount = 0
2028
2029 for imv_batch in dialect._deliver_insertmanyvalues_batches(
2030 self,
2031 cursor,
2032 str_statement,
2033 effective_parameters,
2034 generic_setinputsizes,
2035 context,
2036 ):
2037 if imv_batch.processed_setinputsizes:
2038 try:
2039 dialect.do_set_input_sizes(
2040 context.cursor,
2041 imv_batch.processed_setinputsizes,
2042 context,
2043 )
2044 except BaseException as e:
2045 self._handle_dbapi_exception(
2046 e,
2047 sql_util._long_statement(imv_batch.replaced_statement),
2048 imv_batch.replaced_parameters,
2049 None,
2050 context,
2051 is_sub_exec=True,
2052 )
2053
2054 sub_stmt = imv_batch.replaced_statement
2055 sub_params = imv_batch.replaced_parameters
2056
2057 if engine_events:
2058 for fn in self.dispatch.before_cursor_execute:
2059 sub_stmt, sub_params = fn(
2060 self,
2061 cursor,
2062 sub_stmt,
2063 sub_params,
2064 context,
2065 True,
2066 )
2067
2068 if self._echo:
2069 self._log_info(sql_util._long_statement(sub_stmt))
2070
2071 imv_stats = f""" {imv_batch.batchnum}/{
2072 imv_batch.total_batches
2073 } ({
2074 'ordered'
2075 if imv_batch.rows_sorted else 'unordered'
2076 }{
2077 '; batch not supported'
2078 if imv_batch.is_downgraded
2079 else ''
2080 })"""
2081
2082 if imv_batch.batchnum == 1:
2083 stats += imv_stats
2084 else:
2085 stats = f"insertmanyvalues{imv_stats}"
2086
2087 if not self.engine.hide_parameters:
2088 self._log_info(
2089 "[%s] %r",
2090 stats,
2091 sql_util._repr_params(
2092 sub_params,
2093 batches=10,
2094 ismulti=False,
2095 ),
2096 )
2097 else:
2098 self._log_info(
2099 "[%s] [SQL parameters hidden due to "
2100 "hide_parameters=True]",
2101 stats,
2102 )
2103
2104 try:
2105 for fn in do_execute_dispatch:
2106 if fn(
2107 cursor,
2108 sub_stmt,
2109 sub_params,
2110 context,
2111 ):
2112 break
2113 else:
2114 dialect.do_execute(
2115 cursor,
2116 sub_stmt,
2117 sub_params,
2118 context,
2119 )
2120
2121 except BaseException as e:
2122 self._handle_dbapi_exception(
2123 e,
2124 sql_util._long_statement(sub_stmt),
2125 sub_params,
2126 cursor,
2127 context,
2128 is_sub_exec=True,
2129 )
2130
2131 if engine_events:
2132 self.dispatch.after_cursor_execute(
2133 self,
2134 cursor,
2135 str_statement,
2136 effective_parameters,
2137 context,
2138 context.executemany,
2139 )
2140
2141 if preserve_rowcount:
2142 rowcount += imv_batch.current_batch_size
2143
2144 try:
2145 context.post_exec()
2146
2147 if preserve_rowcount:
2148 context._rowcount = rowcount # type: ignore[attr-defined]
2149
2150 result = context._setup_result_proxy()
2151
2152 except BaseException as e:
2153 self._handle_dbapi_exception(
2154 e, str_statement, effective_parameters, cursor, context
2155 )
2156
2157 return result
2158
2159 def _cursor_execute(
2160 self,
2161 cursor: DBAPICursor,
2162 statement: str,
2163 parameters: _DBAPISingleExecuteParams,
2164 context: Optional[ExecutionContext] = None,
2165 ) -> None:
2166 """Execute a statement + params on the given cursor.
2167
2168 Adds appropriate logging and exception handling.
2169
2170 This method is used by DefaultDialect for special-case
2171 executions, such as for sequences and column defaults.
2172 The path of statement execution in the majority of cases
2173 terminates at _execute_context().
2174
2175 """
2176 if self._has_events or self.engine._has_events:
2177 for fn in self.dispatch.before_cursor_execute:
2178 statement, parameters = fn(
2179 self, cursor, statement, parameters, context, False
2180 )
2181
2182 if self._echo:
2183 self._log_info(statement)
2184 self._log_info("[raw sql] %r", parameters)
2185 try:
2186 for fn in (
2187 ()
2188 if not self.dialect._has_events
2189 else self.dialect.dispatch.do_execute
2190 ):
2191 if fn(cursor, statement, parameters, context):
2192 break
2193 else:
2194 self.dialect.do_execute(cursor, statement, parameters, context)
2195 except BaseException as e:
2196 self._handle_dbapi_exception(
2197 e, statement, parameters, cursor, context
2198 )
2199
2200 if self._has_events or self.engine._has_events:
2201 self.dispatch.after_cursor_execute(
2202 self, cursor, statement, parameters, context, False
2203 )
2204
2205 def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
2206 """Close the given cursor, catching exceptions
2207 and turning into log warnings.
2208
2209 """
2210 try:
2211 cursor.close()
2212 except Exception:
2213 # log the error through the connection pool's logger.
2214 self.engine.pool.logger.error(
2215 "Error closing cursor", exc_info=True
2216 )
2217
2218 _reentrant_error = False
2219 _is_disconnect = False
2220
2221 def _handle_dbapi_exception(
2222 self,
2223 e: BaseException,
2224 statement: Optional[str],
2225 parameters: Optional[_AnyExecuteParams],
2226 cursor: Optional[DBAPICursor],
2227 context: Optional[ExecutionContext],
2228 is_sub_exec: bool = False,
2229 ) -> NoReturn:
2230 exc_info = sys.exc_info()
2231
2232 is_exit_exception = util.is_exit_exception(e)
2233
2234 if not self._is_disconnect:
2235 self._is_disconnect = (
2236 isinstance(e, self.dialect.loaded_dbapi.Error)
2237 and not self.closed
2238 and self.dialect.is_disconnect(
2239 e,
2240 self._dbapi_connection if not self.invalidated else None,
2241 cursor,
2242 )
2243 ) or (is_exit_exception and not self.closed)
2244
2245 invalidate_pool_on_disconnect = not is_exit_exception
2246
2247 ismulti: bool = (
2248 not is_sub_exec and context.executemany
2249 if context is not None
2250 else False
2251 )
2252 if self._reentrant_error:
2253 raise exc.DBAPIError.instance(
2254 statement,
2255 parameters,
2256 e,
2257 self.dialect.loaded_dbapi.Error,
2258 hide_parameters=self.engine.hide_parameters,
2259 dialect=self.dialect,
2260 ismulti=ismulti,
2261 ).with_traceback(exc_info[2]) from e
2262 self._reentrant_error = True
2263 try:
2264 # non-DBAPI error - if we already got a context,
2265 # or there's no string statement, don't wrap it
2266 should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
2267 statement is not None
2268 and context is None
2269 and not is_exit_exception
2270 )
2271
2272 if should_wrap:
2273 sqlalchemy_exception = exc.DBAPIError.instance(
2274 statement,
2275 parameters,
2276 cast(Exception, e),
2277 self.dialect.loaded_dbapi.Error,
2278 hide_parameters=self.engine.hide_parameters,
2279 connection_invalidated=self._is_disconnect,
2280 dialect=self.dialect,
2281 ismulti=ismulti,
2282 )
2283 else:
2284 sqlalchemy_exception = None
2285
2286 newraise = None
2287
2288 if (self.dialect._has_events) and not self._execution_options.get(
2289 "skip_user_error_events", False
2290 ):
2291 ctx = ExceptionContextImpl(
2292 e,
2293 sqlalchemy_exception,
2294 self.engine,
2295 self.dialect,
2296 self,
2297 cursor,
2298 statement,
2299 parameters,
2300 context,
2301 self._is_disconnect,
2302 invalidate_pool_on_disconnect,
2303 False,
2304 )
2305
2306 for fn in self.dialect.dispatch.handle_error:
2307 try:
2308 # handler returns an exception;
2309 # call next handler in a chain
2310 per_fn = fn(ctx)
2311 if per_fn is not None:
2312 ctx.chained_exception = newraise = per_fn
2313 except Exception as _raised:
2314 # handler raises an exception - stop processing
2315 newraise = _raised
2316 break
2317
2318 if self._is_disconnect != ctx.is_disconnect:
2319 self._is_disconnect = ctx.is_disconnect
2320 if sqlalchemy_exception:
2321 sqlalchemy_exception.connection_invalidated = (
2322 ctx.is_disconnect
2323 )
2324
2325 # set up potentially user-defined value for
2326 # invalidate pool.
2327 invalidate_pool_on_disconnect = (
2328 ctx.invalidate_pool_on_disconnect
2329 )
2330
2331 if should_wrap and context:
2332 context.handle_dbapi_exception(e)
2333
2334 if not self._is_disconnect:
2335 if cursor:
2336 self._safe_close_cursor(cursor)
2337 # "autorollback" was mostly relevant in 1.x series.
2338 # It's very unlikely to reach here, as the connection
2339 # does autobegin so when we are here, we are usually
2340 # in an explicit / semi-explicit transaction.
2341 # however we have a test which manufactures this
2342 # scenario in any case using an event handler.
2343 # test/engine/test_execute.py-> test_actual_autorollback
2344 if not self.in_transaction():
2345 self._rollback_impl()
2346
2347 if newraise:
2348 raise newraise.with_traceback(exc_info[2]) from e
2349 elif should_wrap:
2350 assert sqlalchemy_exception is not None
2351 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2352 else:
2353 assert exc_info[1] is not None
2354 raise exc_info[1].with_traceback(exc_info[2])
2355 finally:
2356 del self._reentrant_error
2357 if self._is_disconnect:
2358 del self._is_disconnect
2359 if not self.invalidated:
2360 dbapi_conn_wrapper = self._dbapi_connection
2361 assert dbapi_conn_wrapper is not None
2362 if invalidate_pool_on_disconnect:
2363 self.engine.pool._invalidate(dbapi_conn_wrapper, e)
2364 self.invalidate(e)
2365
2366 @classmethod
2367 def _handle_dbapi_exception_noconnection(
2368 cls,
2369 e: BaseException,
2370 dialect: Dialect,
2371 engine: Optional[Engine] = None,
2372 is_disconnect: Optional[bool] = None,
2373 invalidate_pool_on_disconnect: bool = True,
2374 is_pre_ping: bool = False,
2375 ) -> NoReturn:
2376 exc_info = sys.exc_info()
2377
2378 if is_disconnect is None:
2379 is_disconnect = isinstance(
2380 e, dialect.loaded_dbapi.Error
2381 ) and dialect.is_disconnect(e, None, None)
2382
2383 should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
2384
2385 if should_wrap:
2386 sqlalchemy_exception = exc.DBAPIError.instance(
2387 None,
2388 None,
2389 cast(Exception, e),
2390 dialect.loaded_dbapi.Error,
2391 hide_parameters=(
2392 engine.hide_parameters if engine is not None else False
2393 ),
2394 connection_invalidated=is_disconnect,
2395 dialect=dialect,
2396 )
2397 else:
2398 sqlalchemy_exception = None
2399
2400 newraise = None
2401
2402 if dialect._has_events:
2403 ctx = ExceptionContextImpl(
2404 e,
2405 sqlalchemy_exception,
2406 engine,
2407 dialect,
2408 None,
2409 None,
2410 None,
2411 None,
2412 None,
2413 is_disconnect,
2414 invalidate_pool_on_disconnect,
2415 is_pre_ping,
2416 )
2417 for fn in dialect.dispatch.handle_error:
2418 try:
2419 # handler returns an exception;
2420 # call next handler in a chain
2421 per_fn = fn(ctx)
2422 if per_fn is not None:
2423 ctx.chained_exception = newraise = per_fn
2424 except Exception as _raised:
2425 # handler raises an exception - stop processing
2426 newraise = _raised
2427 break
2428
2429 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
2430 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect
2431
2432 if newraise:
2433 raise newraise.with_traceback(exc_info[2]) from e
2434 elif should_wrap:
2435 assert sqlalchemy_exception is not None
2436 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2437 else:
2438 assert exc_info[1] is not None
2439 raise exc_info[1].with_traceback(exc_info[2])
2440
2441 def _run_ddl_visitor(
2442 self,
2443 visitorcallable: Type[InvokeDDLBase],
2444 element: SchemaVisitable,
2445 **kwargs: Any,
2446 ) -> None:
2447 """run a DDL visitor.
2448
2449 This method is only here so that the MockConnection can change the
2450 options given to the visitor so that "checkfirst" is skipped.
2451
2452 """
2453 visitorcallable(
2454 dialect=self.dialect, connection=self, **kwargs
2455 ).traverse_single(element)
2456
2457
2458class ExceptionContextImpl(ExceptionContext):
2459 """Implement the :class:`.ExceptionContext` interface."""
2460
2461 __slots__ = (
2462 "connection",
2463 "engine",
2464 "dialect",
2465 "cursor",
2466 "statement",
2467 "parameters",
2468 "original_exception",
2469 "sqlalchemy_exception",
2470 "chained_exception",
2471 "execution_context",
2472 "is_disconnect",
2473 "invalidate_pool_on_disconnect",
2474 "is_pre_ping",
2475 )
2476
2477 def __init__(
2478 self,
2479 exception: BaseException,
2480 sqlalchemy_exception: Optional[exc.StatementError],
2481 engine: Optional[Engine],
2482 dialect: Dialect,
2483 connection: Optional[Connection],
2484 cursor: Optional[DBAPICursor],
2485 statement: Optional[str],
2486 parameters: Optional[_DBAPIAnyExecuteParams],
2487 context: Optional[ExecutionContext],
2488 is_disconnect: bool,
2489 invalidate_pool_on_disconnect: bool,
2490 is_pre_ping: bool,
2491 ):
2492 self.engine = engine
2493 self.dialect = dialect
2494 self.connection = connection
2495 self.sqlalchemy_exception = sqlalchemy_exception
2496 self.original_exception = exception
2497 self.execution_context = context
2498 self.statement = statement
2499 self.parameters = parameters
2500 self.is_disconnect = is_disconnect
2501 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
2502 self.is_pre_ping = is_pre_ping
2503
2504
2505class Transaction(TransactionalContext):
2506 """Represent a database transaction in progress.
2507
2508 The :class:`.Transaction` object is procured by
2509 calling the :meth:`_engine.Connection.begin` method of
2510 :class:`_engine.Connection`::
2511
2512 from sqlalchemy import create_engine
2513
2514 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
2515 connection = engine.connect()
2516 trans = connection.begin()
2517 connection.execute(text("insert into x (a, b) values (1, 2)"))
2518 trans.commit()
2519
2520 The object provides :meth:`.rollback` and :meth:`.commit`
2521 methods in order to control transaction boundaries. It
2522 also implements a context manager interface so that
2523 the Python ``with`` statement can be used with the
2524 :meth:`_engine.Connection.begin` method::
2525
2526 with connection.begin():
2527 connection.execute(text("insert into x (a, b) values (1, 2)"))
2528
2529 The Transaction object is **not** threadsafe.
2530
2531 .. seealso::
2532
2533 :meth:`_engine.Connection.begin`
2534
2535 :meth:`_engine.Connection.begin_twophase`
2536
2537 :meth:`_engine.Connection.begin_nested`
2538
2539 .. index::
2540 single: thread safety; Transaction
2541 """ # noqa
2542
2543 __slots__ = ()
2544
2545 _is_root: bool = False
2546 is_active: bool
2547 connection: Connection
2548
2549 def __init__(self, connection: Connection):
2550 raise NotImplementedError()
2551
2552 @property
2553 def _deactivated_from_connection(self) -> bool:
2554 """True if this transaction is totally deactivated from the connection
2555 and therefore can no longer affect its state.
2556
2557 """
2558 raise NotImplementedError()
2559
2560 def _do_close(self) -> None:
2561 raise NotImplementedError()
2562
2563 def _do_rollback(self) -> None:
2564 raise NotImplementedError()
2565
2566 def _do_commit(self) -> None:
2567 raise NotImplementedError()
2568
2569 @property
2570 def is_valid(self) -> bool:
2571 return self.is_active and not self.connection.invalidated
2572
2573 def close(self) -> None:
2574 """Close this :class:`.Transaction`.
2575
2576 If this transaction is the base transaction in a begin/commit
2577 nesting, the transaction will rollback(). Otherwise, the
2578 method returns.
2579
2580 This is used to cancel a Transaction without affecting the scope of
2581 an enclosing transaction.
2582
2583 """
2584 try:
2585 self._do_close()
2586 finally:
2587 assert not self.is_active
2588
2589 def rollback(self) -> None:
2590 """Roll back this :class:`.Transaction`.
2591
2592 The implementation of this may vary based on the type of transaction in
2593 use:
2594
2595 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2596 it corresponds to a ROLLBACK.
2597
2598 * For a :class:`.NestedTransaction`, it corresponds to a
2599 "ROLLBACK TO SAVEPOINT" operation.
2600
2601 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2602 phase transactions may be used.
2603
2604
2605 """
2606 try:
2607 self._do_rollback()
2608 finally:
2609 assert not self.is_active
2610
2611 def commit(self) -> None:
2612 """Commit this :class:`.Transaction`.
2613
2614 The implementation of this may vary based on the type of transaction in
2615 use:
2616
2617 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2618 it corresponds to a COMMIT.
2619
2620 * For a :class:`.NestedTransaction`, it corresponds to a
2621 "RELEASE SAVEPOINT" operation.
2622
2623 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2624 phase transactions may be used.
2625
2626 """
2627 try:
2628 self._do_commit()
2629 finally:
2630 assert not self.is_active
2631
2632 def _get_subject(self) -> Connection:
2633 return self.connection
2634
2635 def _transaction_is_active(self) -> bool:
2636 return self.is_active
2637
2638 def _transaction_is_closed(self) -> bool:
2639 return not self._deactivated_from_connection
2640
2641 def _rollback_can_be_called(self) -> bool:
2642 # for RootTransaction / NestedTransaction, it's safe to call
2643 # rollback() even if the transaction is deactive and no warnings
2644 # will be emitted. tested in
2645 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
2646 return True
2647
2648
2649class RootTransaction(Transaction):
2650 """Represent the "root" transaction on a :class:`_engine.Connection`.
2651
2652 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
2653 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
2654 is created by calling upon the :meth:`_engine.Connection.begin` method, and
2655 remains associated with the :class:`_engine.Connection` throughout its
2656 active span. The current :class:`_engine.RootTransaction` in use is
2657 accessible via the :attr:`_engine.Connection.get_transaction` method of
2658 :class:`_engine.Connection`.
2659
2660 In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
2661 "autobegin" behavior that will create a new
2662 :class:`_engine.RootTransaction` whenever a connection in a
2663 non-transactional state is used to emit commands on the DBAPI connection.
2664 The scope of the :class:`_engine.RootTransaction` in 2.0 style
2665 use can be controlled using the :meth:`_engine.Connection.commit` and
2666 :meth:`_engine.Connection.rollback` methods.
2667
2668
2669 """
2670
2671 _is_root = True
2672
2673 __slots__ = ("connection", "is_active")
2674
2675 def __init__(self, connection: Connection):
2676 assert connection._transaction is None
2677 if connection._trans_context_manager:
2678 TransactionalContext._trans_ctx_check(connection)
2679 self.connection = connection
2680 self._connection_begin_impl()
2681 connection._transaction = self
2682
2683 self.is_active = True
2684
2685 def _deactivate_from_connection(self) -> None:
2686 if self.is_active:
2687 assert self.connection._transaction is self
2688 self.is_active = False
2689
2690 elif self.connection._transaction is not self:
2691 util.warn("transaction already deassociated from connection")
2692
2693 @property
2694 def _deactivated_from_connection(self) -> bool:
2695 return self.connection._transaction is not self
2696
2697 def _connection_begin_impl(self) -> None:
2698 self.connection._begin_impl(self)
2699
2700 def _connection_rollback_impl(self) -> None:
2701 self.connection._rollback_impl()
2702
2703 def _connection_commit_impl(self) -> None:
2704 self.connection._commit_impl()
2705
2706 def _close_impl(self, try_deactivate: bool = False) -> None:
2707 try:
2708 if self.is_active:
2709 self._connection_rollback_impl()
2710
2711 if self.connection._nested_transaction:
2712 self.connection._nested_transaction._cancel()
2713 finally:
2714 if self.is_active or try_deactivate:
2715 self._deactivate_from_connection()
2716 if self.connection._transaction is self:
2717 self.connection._transaction = None
2718
2719 assert not self.is_active
2720 assert self.connection._transaction is not self
2721
2722 def _do_close(self) -> None:
2723 self._close_impl()
2724
2725 def _do_rollback(self) -> None:
2726 self._close_impl(try_deactivate=True)
2727
2728 def _do_commit(self) -> None:
2729 if self.is_active:
2730 assert self.connection._transaction is self
2731
2732 try:
2733 self._connection_commit_impl()
2734 finally:
2735 # whether or not commit succeeds, cancel any
2736 # nested transactions, make this transaction "inactive"
2737 # and remove it as a reset agent
2738 if self.connection._nested_transaction:
2739 self.connection._nested_transaction._cancel()
2740
2741 self._deactivate_from_connection()
2742
2743 # ...however only remove as the connection's current transaction
2744 # if commit succeeded. otherwise it stays on so that a rollback
2745 # needs to occur.
2746 self.connection._transaction = None
2747 else:
2748 if self.connection._transaction is self:
2749 self.connection._invalid_transaction()
2750 else:
2751 raise exc.InvalidRequestError("This transaction is inactive")
2752
2753 assert not self.is_active
2754 assert self.connection._transaction is not self
2755
2756
2757class NestedTransaction(Transaction):
2758 """Represent a 'nested', or SAVEPOINT transaction.
2759
2760 The :class:`.NestedTransaction` object is created by calling the
2761 :meth:`_engine.Connection.begin_nested` method of
2762 :class:`_engine.Connection`.
2763
2764 When using :class:`.NestedTransaction`, the semantics of "begin" /
2765 "commit" / "rollback" are as follows:
2766
2767 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
2768 the savepoint is given an explicit name that is part of the state
2769 of this object.
2770
2771 * The :meth:`.NestedTransaction.commit` method corresponds to a
2772 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
2773 with this :class:`.NestedTransaction`.
2774
2775 * The :meth:`.NestedTransaction.rollback` method corresponds to a
2776 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
2777 associated with this :class:`.NestedTransaction`.
2778
2779 The rationale for mimicking the semantics of an outer transaction in
2780 terms of savepoints so that code may deal with a "savepoint" transaction
2781 and an "outer" transaction in an agnostic way.
2782
2783 .. seealso::
2784
2785 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.
2786
2787 """
2788
2789 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
2790
2791 _savepoint: str
2792
2793 def __init__(self, connection: Connection):
2794 assert connection._transaction is not None
2795 if connection._trans_context_manager:
2796 TransactionalContext._trans_ctx_check(connection)
2797 self.connection = connection
2798 self._savepoint = self.connection._savepoint_impl()
2799 self.is_active = True
2800 self._previous_nested = connection._nested_transaction
2801 connection._nested_transaction = self
2802
2803 def _deactivate_from_connection(self, warn: bool = True) -> None:
2804 if self.connection._nested_transaction is self:
2805 self.connection._nested_transaction = self._previous_nested
2806 elif warn:
2807 util.warn(
2808 "nested transaction already deassociated from connection"
2809 )
2810
2811 @property
2812 def _deactivated_from_connection(self) -> bool:
2813 return self.connection._nested_transaction is not self
2814
2815 def _cancel(self) -> None:
2816 # called by RootTransaction when the outer transaction is
2817 # committed, rolled back, or closed to cancel all savepoints
2818 # without any action being taken
2819 self.is_active = False
2820 self._deactivate_from_connection()
2821 if self._previous_nested:
2822 self._previous_nested._cancel()
2823
2824 def _close_impl(
2825 self, deactivate_from_connection: bool, warn_already_deactive: bool
2826 ) -> None:
2827 try:
2828 if (
2829 self.is_active
2830 and self.connection._transaction
2831 and self.connection._transaction.is_active
2832 ):
2833 self.connection._rollback_to_savepoint_impl(self._savepoint)
2834 finally:
2835 self.is_active = False
2836
2837 if deactivate_from_connection:
2838 self._deactivate_from_connection(warn=warn_already_deactive)
2839
2840 assert not self.is_active
2841 if deactivate_from_connection:
2842 assert self.connection._nested_transaction is not self
2843
2844 def _do_close(self) -> None:
2845 self._close_impl(True, False)
2846
2847 def _do_rollback(self) -> None:
2848 self._close_impl(True, True)
2849
2850 def _do_commit(self) -> None:
2851 if self.is_active:
2852 try:
2853 self.connection._release_savepoint_impl(self._savepoint)
2854 finally:
2855 # nested trans becomes inactive on failed release
2856 # unconditionally. this prevents it from trying to
2857 # emit SQL when it rolls back.
2858 self.is_active = False
2859
2860 # but only de-associate from connection if it succeeded
2861 self._deactivate_from_connection()
2862 else:
2863 if self.connection._nested_transaction is self:
2864 self.connection._invalid_transaction()
2865 else:
2866 raise exc.InvalidRequestError(
2867 "This nested transaction is inactive"
2868 )
2869
2870
2871class TwoPhaseTransaction(RootTransaction):
2872 """Represent a two-phase transaction.
2873
2874 A new :class:`.TwoPhaseTransaction` object may be procured
2875 using the :meth:`_engine.Connection.begin_twophase` method.
2876
2877 The interface is the same as that of :class:`.Transaction`
2878 with the addition of the :meth:`prepare` method.
2879
2880 """
2881
2882 __slots__ = ("xid", "_is_prepared")
2883
2884 xid: Any
2885
2886 def __init__(self, connection: Connection, xid: Any):
2887 self._is_prepared = False
2888 self.xid = xid
2889 super().__init__(connection)
2890
2891 def prepare(self) -> None:
2892 """Prepare this :class:`.TwoPhaseTransaction`.
2893
2894 After a PREPARE, the transaction can be committed.
2895
2896 """
2897 if not self.is_active:
2898 raise exc.InvalidRequestError("This transaction is inactive")
2899 self.connection._prepare_twophase_impl(self.xid)
2900 self._is_prepared = True
2901
2902 def _connection_begin_impl(self) -> None:
2903 self.connection._begin_twophase_impl(self)
2904
2905 def _connection_rollback_impl(self) -> None:
2906 self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
2907
2908 def _connection_commit_impl(self) -> None:
2909 self.connection._commit_twophase_impl(self.xid, self._is_prepared)
2910
2911
2912class Engine(
2913 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
2914):
2915 """
2916 Connects a :class:`~sqlalchemy.pool.Pool` and
2917 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
2918 source of database connectivity and behavior.
2919
2920 An :class:`_engine.Engine` object is instantiated publicly using the
2921 :func:`~sqlalchemy.create_engine` function.
2922
2923 .. seealso::
2924
2925 :doc:`/core/engines`
2926
2927 :ref:`connections_toplevel`
2928
2929 """
2930
2931 dispatch: dispatcher[ConnectionEventsTarget]
2932
2933 _compiled_cache: Optional[CompiledCacheType]
2934
2935 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
2936 _has_events: bool = False
2937 _connection_cls: Type[Connection] = Connection
2938 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
2939 _is_future: bool = False
2940
2941 _schema_translate_map: Optional[SchemaTranslateMapType] = None
2942 _option_cls: Type[OptionEngine]
2943
2944 dialect: Dialect
2945 pool: Pool
2946 url: URL
2947 hide_parameters: bool
2948
2949 def __init__(
2950 self,
2951 pool: Pool,
2952 dialect: Dialect,
2953 url: URL,
2954 logging_name: Optional[str] = None,
2955 echo: Optional[_EchoFlagType] = None,
2956 query_cache_size: int = 500,
2957 execution_options: Optional[Mapping[str, Any]] = None,
2958 hide_parameters: bool = False,
2959 ):
2960 self.pool = pool
2961 self.url = url
2962 self.dialect = dialect
2963 if logging_name:
2964 self.logging_name = logging_name
2965 self.echo = echo
2966 self.hide_parameters = hide_parameters
2967 if query_cache_size != 0:
2968 self._compiled_cache = util.LRUCache(
2969 query_cache_size, size_alert=self._lru_size_alert
2970 )
2971 else:
2972 self._compiled_cache = None
2973 log.instance_logger(self, echoflag=echo)
2974 if execution_options:
2975 self.update_execution_options(**execution_options)
2976
2977 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
2978 if self._should_log_info():
2979 self.logger.info(
2980 "Compiled cache size pruning from %d items to %d. "
2981 "Increase cache size to reduce the frequency of pruning.",
2982 len(cache),
2983 cache.capacity,
2984 )
2985
2986 @property
2987 def engine(self) -> Engine:
2988 """Returns this :class:`.Engine`.
2989
2990 Used for legacy schemes that accept :class:`.Connection` /
2991 :class:`.Engine` objects within the same variable.
2992
2993 """
2994 return self
2995
2996 def clear_compiled_cache(self) -> None:
2997 """Clear the compiled cache associated with the dialect.
2998
2999 This applies **only** to the built-in cache that is established
3000 via the :paramref:`_engine.create_engine.query_cache_size` parameter.
3001 It will not impact any dictionary caches that were passed via the
3002 :paramref:`.Connection.execution_options.compiled_cache` parameter.
3003
3004 .. versionadded:: 1.4
3005
3006 """
3007 if self._compiled_cache:
3008 self._compiled_cache.clear()
3009
3010 def update_execution_options(self, **opt: Any) -> None:
3011 r"""Update the default execution_options dictionary
3012 of this :class:`_engine.Engine`.
3013
3014 The given keys/values in \**opt are added to the
3015 default execution options that will be used for
3016 all connections. The initial contents of this dictionary
3017 can be sent via the ``execution_options`` parameter
3018 to :func:`_sa.create_engine`.
3019
3020 .. seealso::
3021
3022 :meth:`_engine.Connection.execution_options`
3023
3024 :meth:`_engine.Engine.execution_options`
3025
3026 """
3027 self.dispatch.set_engine_execution_options(self, opt)
3028 self._execution_options = self._execution_options.union(opt)
3029 self.dialect.set_engine_execution_options(self, opt)
3030
3031 @overload
3032 def execution_options(
3033 self,
3034 *,
3035 compiled_cache: Optional[CompiledCacheType] = ...,
3036 logging_token: str = ...,
3037 isolation_level: IsolationLevel = ...,
3038 insertmanyvalues_page_size: int = ...,
3039 schema_translate_map: Optional[SchemaTranslateMapType] = ...,
3040 **opt: Any,
3041 ) -> OptionEngine: ...
3042
3043 @overload
3044 def execution_options(self, **opt: Any) -> OptionEngine: ...
3045
3046 def execution_options(self, **opt: Any) -> OptionEngine:
3047 """Return a new :class:`_engine.Engine` that will provide
3048 :class:`_engine.Connection` objects with the given execution options.
3049
3050 The returned :class:`_engine.Engine` remains related to the original
3051 :class:`_engine.Engine` in that it shares the same connection pool and
3052 other state:
3053
3054 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
3055 is the
3056 same instance. The :meth:`_engine.Engine.dispose`
3057 method will replace
3058 the connection pool instance for the parent engine as well
3059 as this one.
3060 * Event listeners are "cascaded" - meaning, the new
3061 :class:`_engine.Engine`
3062 inherits the events of the parent, and new events can be associated
3063 with the new :class:`_engine.Engine` individually.
3064 * The logging configuration and logging_name is copied from the parent
3065 :class:`_engine.Engine`.
3066
3067 The intent of the :meth:`_engine.Engine.execution_options` method is
3068 to implement schemes where multiple :class:`_engine.Engine`
3069 objects refer to the same connection pool, but are differentiated
3070 by options that affect some execution-level behavior for each
3071 engine. One such example is breaking into separate "reader" and
3072 "writer" :class:`_engine.Engine` instances, where one
3073 :class:`_engine.Engine`
3074 has a lower :term:`isolation level` setting configured or is even
3075 transaction-disabled using "autocommit". An example of this
3076 configuration is at :ref:`dbapi_autocommit_multiple`.
3077
3078 Another example is one that
3079 uses a custom option ``shard_id`` which is consumed by an event
3080 to change the current schema on a database connection::
3081
3082 from sqlalchemy import event
3083 from sqlalchemy.engine import Engine
3084
3085 primary_engine = create_engine("mysql+mysqldb://")
3086 shard1 = primary_engine.execution_options(shard_id="shard1")
3087 shard2 = primary_engine.execution_options(shard_id="shard2")
3088
3089 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
3090
3091
3092 @event.listens_for(Engine, "before_cursor_execute")
3093 def _switch_shard(conn, cursor, stmt, params, context, executemany):
3094 shard_id = conn.get_execution_options().get("shard_id", "default")
3095 current_shard = conn.info.get("current_shard", None)
3096
3097 if current_shard != shard_id:
3098 cursor.execute("use %s" % shards[shard_id])
3099 conn.info["current_shard"] = shard_id
3100
3101 The above recipe illustrates two :class:`_engine.Engine` objects that
3102 will each serve as factories for :class:`_engine.Connection` objects
3103 that have pre-established "shard_id" execution options present. A
3104 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
3105 then interprets this execution option to emit a MySQL ``use`` statement
3106 to switch databases before a statement execution, while at the same
3107 time keeping track of which database we've established using the
3108 :attr:`_engine.Connection.info` dictionary.
3109
3110 .. seealso::
3111
3112 :meth:`_engine.Connection.execution_options`
3113 - update execution options
3114 on a :class:`_engine.Connection` object.
3115
3116 :meth:`_engine.Engine.update_execution_options`
3117 - update the execution
3118 options for a given :class:`_engine.Engine` in place.
3119
3120 :meth:`_engine.Engine.get_execution_options`
3121
3122
3123 """ # noqa: E501
3124 return self._option_cls(self, opt)
3125
3126 def get_execution_options(self) -> _ExecuteOptions:
3127 """Get the non-SQL options which will take effect during execution.
3128
3129 .. versionadded: 1.3
3130
3131 .. seealso::
3132
3133 :meth:`_engine.Engine.execution_options`
3134 """
3135 return self._execution_options
3136
3137 @property
3138 def name(self) -> str:
3139 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3140 in use by this :class:`Engine`.
3141
3142 """
3143
3144 return self.dialect.name
3145
3146 @property
3147 def driver(self) -> str:
3148 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3149 in use by this :class:`Engine`.
3150
3151 """
3152
3153 return self.dialect.driver
3154
3155 echo = log.echo_property()
3156
3157 def __repr__(self) -> str:
3158 return "Engine(%r)" % (self.url,)
3159
3160 def dispose(self, close: bool = True) -> None:
3161 """Dispose of the connection pool used by this
3162 :class:`_engine.Engine`.
3163
3164 A new connection pool is created immediately after the old one has been
3165 disposed. The previous connection pool is disposed either actively, by
3166 closing out all currently checked-in connections in that pool, or
3167 passively, by losing references to it but otherwise not closing any
3168 connections. The latter strategy is more appropriate for an initializer
3169 in a forked Python process.
3170
3171 :param close: if left at its default of ``True``, has the
3172 effect of fully closing all **currently checked in**
3173 database connections. Connections that are still checked out
3174 will **not** be closed, however they will no longer be associated
3175 with this :class:`_engine.Engine`,
3176 so when they are closed individually, eventually the
3177 :class:`_pool.Pool` which they are associated with will
3178 be garbage collected and they will be closed out fully, if
3179 not already closed on checkin.
3180
3181 If set to ``False``, the previous connection pool is de-referenced,
3182 and otherwise not touched in any way.
3183
3184 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
3185 parameter to allow the replacement of a connection pool in a child
3186 process without interfering with the connections used by the parent
3187 process.
3188
3189
3190 .. seealso::
3191
3192 :ref:`engine_disposal`
3193
3194 :ref:`pooling_multiprocessing`
3195
3196 """
3197 if close:
3198 self.pool.dispose()
3199 self.pool = self.pool.recreate()
3200 self.dispatch.engine_disposed(self)
3201
3202 @contextlib.contextmanager
3203 def _optional_conn_ctx_manager(
3204 self, connection: Optional[Connection] = None
3205 ) -> Iterator[Connection]:
3206 if connection is None:
3207 with self.connect() as conn:
3208 yield conn
3209 else:
3210 yield connection
3211
3212 @contextlib.contextmanager
3213 def begin(self) -> Iterator[Connection]:
3214 """Return a context manager delivering a :class:`_engine.Connection`
3215 with a :class:`.Transaction` established.
3216
3217 E.g.::
3218
3219 with engine.begin() as conn:
3220 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
3221 conn.execute(text("my_special_procedure(5)"))
3222
3223 Upon successful operation, the :class:`.Transaction`
3224 is committed. If an error is raised, the :class:`.Transaction`
3225 is rolled back.
3226
3227 .. seealso::
3228
3229 :meth:`_engine.Engine.connect` - procure a
3230 :class:`_engine.Connection` from
3231 an :class:`_engine.Engine`.
3232
3233 :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
3234 for a particular :class:`_engine.Connection`.
3235
3236 """ # noqa: E501
3237 with self.connect() as conn:
3238 with conn.begin():
3239 yield conn
3240
3241 def _run_ddl_visitor(
3242 self,
3243 visitorcallable: Type[InvokeDDLBase],
3244 element: SchemaVisitable,
3245 **kwargs: Any,
3246 ) -> None:
3247 with self.begin() as conn:
3248 conn._run_ddl_visitor(visitorcallable, element, **kwargs)
3249
3250 def connect(self) -> Connection:
3251 """Return a new :class:`_engine.Connection` object.
3252
3253 The :class:`_engine.Connection` acts as a Python context manager, so
3254 the typical use of this method looks like::
3255
3256 with engine.connect() as connection:
3257 connection.execute(text("insert into table values ('foo')"))
3258 connection.commit()
3259
3260 Where above, after the block is completed, the connection is "closed"
3261 and its underlying DBAPI resources are returned to the connection pool.
3262 This also has the effect of rolling back any transaction that
3263 was explicitly begun or was begun via autobegin, and will
3264 emit the :meth:`_events.ConnectionEvents.rollback` event if one was
3265 started and is still in progress.
3266
3267 .. seealso::
3268
3269 :meth:`_engine.Engine.begin`
3270
3271 """
3272
3273 return self._connection_cls(self)
3274
3275 def raw_connection(self) -> PoolProxiedConnection:
3276 """Return a "raw" DBAPI connection from the connection pool.
3277
3278 The returned object is a proxied version of the DBAPI
3279 connection object used by the underlying driver in use.
3280 The object will have all the same behavior as the real DBAPI
3281 connection, except that its ``close()`` method will result in the
3282 connection being returned to the pool, rather than being closed
3283 for real.
3284
3285 This method provides direct DBAPI connection access for
3286 special situations when the API provided by
3287 :class:`_engine.Connection`
3288 is not needed. When a :class:`_engine.Connection` object is already
3289 present, the DBAPI connection is available using
3290 the :attr:`_engine.Connection.connection` accessor.
3291
3292 .. seealso::
3293
3294 :ref:`dbapi_connections`
3295
3296 """
3297 return self.pool.connect()
3298
3299
3300class OptionEngineMixin(log.Identified):
3301 _sa_propagate_class_events = False
3302
3303 dispatch: dispatcher[ConnectionEventsTarget]
3304 _compiled_cache: Optional[CompiledCacheType]
3305 dialect: Dialect
3306 pool: Pool
3307 url: URL
3308 hide_parameters: bool
3309 echo: log.echo_property
3310
3311 def __init__(
3312 self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
3313 ):
3314 self._proxied = proxied
3315 self.url = proxied.url
3316 self.dialect = proxied.dialect
3317 self.logging_name = proxied.logging_name
3318 self.echo = proxied.echo
3319 self._compiled_cache = proxied._compiled_cache
3320 self.hide_parameters = proxied.hide_parameters
3321 log.instance_logger(self, echoflag=self.echo)
3322
3323 # note: this will propagate events that are assigned to the parent
3324 # engine after this OptionEngine is created. Since we share
3325 # the events of the parent we also disallow class-level events
3326 # to apply to the OptionEngine class directly.
3327 #
3328 # the other way this can work would be to transfer existing
3329 # events only, using:
3330 # self.dispatch._update(proxied.dispatch)
3331 #
3332 # that might be more appropriate however it would be a behavioral
3333 # change for logic that assigns events to the parent engine and
3334 # would like it to take effect for the already-created sub-engine.
3335 self.dispatch = self.dispatch._join(proxied.dispatch)
3336
3337 self._execution_options = proxied._execution_options
3338 self.update_execution_options(**execution_options)
3339
3340 def update_execution_options(self, **opt: Any) -> None:
3341 raise NotImplementedError()
3342
3343 if not typing.TYPE_CHECKING:
3344 # https://github.com/python/typing/discussions/1095
3345
3346 @property
3347 def pool(self) -> Pool:
3348 return self._proxied.pool
3349
3350 @pool.setter
3351 def pool(self, pool: Pool) -> None:
3352 self._proxied.pool = pool
3353
3354 @property
3355 def _has_events(self) -> bool:
3356 return self._proxied._has_events or self.__dict__.get(
3357 "_has_events", False
3358 )
3359
3360 @_has_events.setter
3361 def _has_events(self, value: bool) -> None:
3362 self.__dict__["_has_events"] = value
3363
3364
3365class OptionEngine(OptionEngineMixin, Engine):
3366 def update_execution_options(self, **opt: Any) -> None:
3367 Engine.update_execution_options(self, **opt)
3368
3369
3370Engine._option_cls = OptionEngine