1# engine/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`."""
8from __future__ import annotations
9
10import contextlib
11import sys
12import typing
13from typing import Any
14from typing import Callable
15from typing import cast
16from typing import Iterable
17from typing import Iterator
18from typing import List
19from typing import Mapping
20from typing import NoReturn
21from typing import Optional
22from typing import overload
23from typing import Tuple
24from typing import Type
25from typing import TypeVar
26from typing import Union
27
28from .interfaces import BindTyping
29from .interfaces import ConnectionEventsTarget
30from .interfaces import DBAPICursor
31from .interfaces import ExceptionContext
32from .interfaces import ExecuteStyle
33from .interfaces import ExecutionContext
34from .interfaces import IsolationLevel
35from .util import _distill_params_20
36from .util import _distill_raw_params
37from .util import TransactionalContext
38from .. import exc
39from .. import inspection
40from .. import log
41from .. import util
42from ..sql import compiler
43from ..sql import util as sql_util
44from ..util.typing import TupleAny
45from ..util.typing import TypeVarTuple
46from ..util.typing import Unpack
47
48if typing.TYPE_CHECKING:
49 from . import CursorResult
50 from . import ScalarResult
51 from .interfaces import _AnyExecuteParams
52 from .interfaces import _AnyMultiExecuteParams
53 from .interfaces import _CoreAnyExecuteParams
54 from .interfaces import _CoreMultiExecuteParams
55 from .interfaces import _CoreSingleExecuteParams
56 from .interfaces import _DBAPIAnyExecuteParams
57 from .interfaces import _DBAPISingleExecuteParams
58 from .interfaces import _ExecuteOptions
59 from .interfaces import CompiledCacheType
60 from .interfaces import CoreExecuteOptionsParameter
61 from .interfaces import Dialect
62 from .interfaces import SchemaTranslateMapType
63 from .reflection import Inspector # noqa
64 from .url import URL
65 from ..event import dispatcher
66 from ..log import _EchoFlagType
67 from ..pool import _ConnectionFairy
68 from ..pool import Pool
69 from ..pool import PoolProxiedConnection
70 from ..sql import Executable
71 from ..sql._typing import _InfoType
72 from ..sql.compiler import Compiled
73 from ..sql.ddl import ExecutableDDLElement
74 from ..sql.ddl import InvokeDDLBase
75 from ..sql.functions import FunctionElement
76 from ..sql.schema import DefaultGenerator
77 from ..sql.schema import HasSchemaAttr
78 from ..sql.schema import SchemaVisitable
79 from ..sql.selectable import TypedReturnsRows
80
81
# Type variables for generic annotations in this module: ``_T`` is an
# ordinary type variable bounded by ``Any``; ``_Ts`` is a variadic one.
_T = TypeVar("_T", bound=Any)
_Ts = TypeVarTuple("_Ts")
# Both names below refer to the same ``util.EMPTY_DICT`` object; they
# differ only in their declared type.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT
86
87
88class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
89 """Provides high-level functionality for a wrapped DB-API connection.
90
91 The :class:`_engine.Connection` object is procured by calling the
92 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
93 object, and provides services for execution of SQL statements as well
94 as transaction control.
95
96 The Connection object is **not** thread-safe. While a Connection can be
97 shared among threads using properly synchronized access, it is still
98 possible that the underlying DBAPI connection may not support shared
99 access between threads. Check the DBAPI documentation for details.
100
101 The Connection object represents a single DBAPI connection checked out
    from the connection pool. In this state, the connection pool has no
    effect upon the connection, including its expiration or timeout state.
104 For the connection pool to properly manage connections, connections
105 should be returned to the connection pool (i.e. ``connection.close()``)
106 whenever the connection is not in use.
107
108 .. index::
109 single: thread safety; Connection
110
111 """
112
113 dialect: Dialect
114 dispatch: dispatcher[ConnectionEventsTarget]
115
116 _sqla_logger_namespace = "sqlalchemy.engine.Connection"
117
118 # used by sqlalchemy.engine.util.TransactionalContext
119 _trans_context_manager: Optional[TransactionalContext] = None
120
121 # legacy as of 2.0, should be eventually deprecated and
122 # removed. was used in the "pre_ping" recipe that's been in the docs
123 # a long time
124 should_close_with_result = False
125
126 _dbapi_connection: Optional[PoolProxiedConnection]
127
128 _execution_options: _ExecuteOptions
129
130 _transaction: Optional[RootTransaction]
131 _nested_transaction: Optional[NestedTransaction]
132
def __init__(
    self,
    engine: Engine,
    connection: Optional[PoolProxiedConnection] = None,
    _has_events: Optional[bool] = None,
    _allow_revalidate: bool = True,
    _allow_autobegin: bool = True,
):
    """Set up a :class:`_engine.Connection` bound to ``engine``.

    :param engine: the :class:`_engine.Engine` this connection belongs
      to; its dialect and execution options are adopted here.
    :param connection: an already checked-out pool-proxied connection to
      wrap.  When omitted, one is obtained via
      ``engine.raw_connection()``.
    :param _has_events: ``None`` joins the engine's event dispatch and
      inherits the engine's ``_has_events`` flag; an explicit value is
      used on its own and the engine's dispatch is not joined.
    :param _allow_revalidate: stored as the private "can reconnect" flag.
    :param _allow_autobegin: stored as ``_allow_autobegin``.

    """
    self.engine = engine
    dialect = engine.dialect
    self.dialect = dialect

    if connection is not None:
        self._dbapi_connection = connection
    else:
        try:
            self._dbapi_connection = engine.raw_connection()
        except dialect.loaded_dbapi.Error as dbapi_err:
            # let the no-connection handler process the failure; if it
            # returns, the original DBAPI error propagates unchanged
            Connection._handle_dbapi_exception_noconnection(
                dbapi_err, dialect, engine
            )
            raise

    # transactional state starts out empty
    self._transaction = None
    self._nested_transaction = None
    self.__savepoint_seq = 0
    self.__in_begin = False

    self.__can_reconnect = _allow_revalidate
    self._allow_autobegin = _allow_autobegin
    self._echo = engine._should_log_info()

    if _has_events is None:
        # if _has_events is sent explicitly as False, the engine's
        # dispatch is not joined; we don't want to handle any of the
        # engine's events in that case.
        self.dispatch = self.dispatch._join(engine.dispatch)
        self._has_events = engine._has_events
    else:
        self._has_events = _has_events or False

    self._execution_options = engine._execution_options

    if self._has_events or engine._has_events:
        self.dispatch.engine_connect(self)
177
178 # this can be assigned differently via
179 # characteristics.LoggingTokenCharacteristic
180 _message_formatter: Any = None
181
def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` through ``self.engine.logger.info()``.

    When a ``_message_formatter`` is set on this connection, the message
    is passed through it first.  If ``log.STACKLEVEL`` is enabled, a
    ``stacklevel`` keyword is set (replacing any given by the caller)
    before the call is forwarded along with ``*arg`` and ``**kw``.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw.update(stacklevel=log.STACKLEVEL_OFFSET + 1)

    self.engine.logger.info(text, *arg, **kw)
192
def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` through ``self.engine.logger.debug()``.

    When a ``_message_formatter`` is set on this connection, the message
    is passed through it first.  If ``log.STACKLEVEL`` is enabled, a
    ``stacklevel`` keyword is set (replacing any given by the caller)
    before the call is forwarded along with ``*arg`` and ``**kw``.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw.update(stacklevel=log.STACKLEVEL_OFFSET + 1)

    self.engine.logger.debug(text, *arg, **kw)
203
@property
def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
    """The ``schema_translate_map`` execution option of this connection,
    or ``None`` when that option is not present."""
    return self._execution_options.get("schema_translate_map")
211
def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the schema name to use for ``obj``, applying the current
    schema translate map where it is relevant.

    The object's own ``schema`` is returned unless a
    ``schema_translate_map`` execution option is present, contains that
    schema as a key, and the object has ``_use_schema_map`` set; in that
    case the mapped name is returned instead.

    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    # no map at all, or nothing registered for this particular schema
    if not translate_map or schema_name not in translate_map:
        return schema_name

    if obj._use_schema_map:
        return translate_map[schema_name]
    return schema_name
231
def __enter__(self) -> Connection:
    """Enter a ``with`` block; the connection itself is the bound value."""
    return self
234
def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Leave a ``with`` block by calling :meth:`close`.

    The exception details are accepted but not inspected; ``None`` is
    returned, so an exception raised inside the block is not suppressed.
    """
    self.close()
237
# Typing-only signature that spells out the option names documented on the
# implementation below; arbitrary additional keywords remain accepted
# through ``**opt``.
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    driver_column_names: bool = False,
    **opt: Any,
) -> Connection: ...

# Typing-only catch-all signature for arbitrary keyword options.
@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called.   Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor.  Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_orm.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`. Number of rows to format into an
      INSERT statement when the statement uses "insertmanyvalues" mode,
      which is a paged form of bulk insert that is used for many backends
      when using :term:`executemany` execution typically in conjunction
      with RETURNING. Defaults to 1000. May also be modified on a
      per-engine basis using the
      :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

      .. versionadded:: 2.0

      .. seealso::

        :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPI's rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements.  See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    :param driver_column_names: When True, the returned
      :class:`_engine.CursorResult` will use the column names as written in
      ``cursor.description`` to set up the keys for the result set,
      including the names of columns for the :class:`_engine.Row` object as
      well as the dictionary keys when using :attr:`_engine.Row._mapping`.
      On backends that use "name normalization" such as Oracle Database to
      correct for lower case names being converted to all uppercase, this
      behavior is turned off and the raw UPPERCASE names in
      cursor.description will be present.

      .. versionadded:: 2.1

    """  # noqa
    # The event hook runs first and receives the very same ``opt`` dict
    # that is merged below.
    if self._has_events or self.engine._has_events:
        self.dispatch.set_connection_execution_options(self, opt)
    # The result of union() replaces the options stored on this connection.
    self._execution_options = self._execution_options.union(opt)
    # The dialect hook is invoked after the merged options are in place.
    self.dialect.set_connection_execution_options(self, opt)
    # Same object is returned; no copy of the connection is made.
    return self
533
def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL execution options currently held by this
    connection, which take effect during execution.

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options
542
@property
def _still_open_and_dbapi_connection_is_valid(self) -> bool:
    """``False`` when no pool-proxied connection is held; otherwise the
    ``is_valid`` attribute of that proxied connection."""
    fairy = self._dbapi_connection
    if fairy is None:
        return False
    return fairy.is_valid
550
@property
def closed(self) -> bool:
    """Return True if this connection is closed.

    That is the case when no pool-proxied connection is held and the
    connection is not permitted to reconnect.
    """
    if self._dbapi_connection is not None:
        return False
    return not self.__can_reconnect
556
@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    This does not indicate whether or not the connection was
    invalidated at the pool level, however.

    """

    # Before 1.4, "invalid" was tracked as its own state next to
    # "closed": an invalidated connection could be closed, leaving
    # _dbapi_connection as None and closed=True while the "invalid" flag
    # stayed True.  That produced three states (open/valid,
    # closed/valid, closed/invalid) for no good reason, since a closed
    # connection has no need to be "invalid".  The state is now derived
    # from just two facts: whether a proxied connection is held, and
    # whether reconnecting is permitted.
    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect
577
@property
def connection(self) -> PoolProxiedConnection:
    """The pool-proxied DB-API connection this Connection operates on.

    The returned object is a SQLAlchemy connection-pool proxied
    connection; the actual driver connection is available from it via
    the :attr:`_pool._ConnectionFairy.dbapi_connection` attribute.

    When no proxied connection is currently held, the connection is
    revalidated first.  :class:`.PendingRollbackError` and
    :class:`.ResourceClosedError` raised during revalidation propagate
    unchanged; any other exception is routed through
    ``_handle_dbapi_exception()``.

    .. seealso::


        :ref:`dbapi_connections`

    """
    fairy = self._dbapi_connection
    if fairy is not None:
        return fairy

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        raise
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
603
def get_isolation_level(self) -> IsolationLevel:
    """Return the isolation level **actually** in effect on the database
    within the scope of this connection.

    A live SQL operation is performed against the database to procure
    the value, so the result reflects the real level on the underlying
    DBAPI connection regardless of how that state was set.  The value
    will be one of the four actual isolation modes ``READ UNCOMMITTED``,
    ``READ COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``; third
    party dialects may also feature additional isolation level settings.
    The ``AUTOCOMMIT`` isolation level setting is **not** included.

    .. note:: This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor, which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    raw_connection = self.connection.dbapi_connection
    assert raw_connection is not None
    try:
        level = self.dialect.get_isolation_level(raw_connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
    else:
        return level
647
@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The isolation level recorded at initial-connection time by the
    :class:`_engine.Dialect` in use.

    The value does not depend on the
    :paramref:`.Connection.execution_options.isolation_level` and
    :paramref:`.Engine.execution_options.isolation_level` execution
    options.  It is determined by the :class:`_engine.Dialect` when the
    first connection is created, by performing a SQL query against the
    database for the current isolation level before any additional
    commands have been emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level
676
def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`.PendingRollbackError` stating that the invalid
    transaction must be rolled back before a reconnect can proceed.

    The message says "savepoint transaction" when a nested transaction
    is present on this connection.
    """
    scope = "" if self._nested_transaction is None else "savepoint "
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (scope,),
        code="8s2b",
    )
684
def _revalidate_connection(self) -> PoolProxiedConnection:
    """Obtain a new pool-proxied connection for an invalidated Connection.

    :return: the connection returned by ``engine.raw_connection()``,
      which is also stored on this object.
    :raises ResourceClosedError: if reconnecting is not permitted or the
      connection is not in the invalidated state.

    If a transaction is still present, ``_invalid_transaction()`` is
    invoked before any new connection is requested.
    """
    if not (self.__can_reconnect and self.invalidated):
        raise exc.ResourceClosedError("This Connection is closed")

    if self._transaction is not None:
        self._invalid_transaction()

    self._dbapi_connection = self.engine.raw_connection()
    return self._dbapi_connection
692
@property
def info(self) -> _InfoType:
    """Info dictionary of the underlying DBAPI connection referred to by
    this :class:`_engine.Connection`, allowing user-defined data to be
    associated with the connection.

    The data here follows along with the DBAPI connection, including
    after it is returned to the connection pool and used again in
    subsequent instances of :class:`_engine.Connection`.

    """
    proxied = self.connection
    return proxied.info
706
def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt is made to close the underlying DBAPI connection
    immediately; if that operation fails, the error is logged but not
    raised.  The connection is then discarded whether or not
    close() succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar),
    this :class:`_engine.Connection` will attempt to
    procure a new DBAPI connection using the services of the
    :class:`_pool.Pool` as a source of connectivity (e.g.
    a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    the :meth:`_engine.Connection.invalidate` method is called, at the
    DBAPI level all state associated with this transaction is lost, as
    the DBAPI connection is closed.  The :class:`_engine.Connection`
    will not allow a reconnection to proceed until the
    :class:`.Transaction` object is ended, by calling the
    :meth:`.Transaction.rollback` method; until that point, any attempt
    at continuing to use the :class:`_engine.Connection` will raise an
    :class:`~sqlalchemy.exc.InvalidRequestError`.
    This is to prevent applications from accidentally
    continuing ongoing transactional operations despite the
    fact that the transaction has been lost due to an
    invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on a connection that is already invalidated
    does nothing; calling it on a closed connection raises
    :class:`.ResourceClosedError`.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """

    if self.invalidated:
        # nothing further to do
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        fairy = self._dbapi_connection
        assert fairy is not None
        fairy.invalidate(exception)

    # dropping the reference is what marks this Connection as invalidated
    self._dbapi_connection = None
765
def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance will remain usable.
    When closed
    (or exited from a context manager context as above),
    the DB-API connection will be literally closed and not
    returned to its originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    :raises ResourceClosedError: if this connection is closed.
    :raises InvalidRequestError: if this connection is invalidated, i.e.
      holds no proxied connection to detach.

    """

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    fairy = self._dbapi_connection
    if fairy is not None:
        fairy.detach()
        return

    raise exc.InvalidRequestError("Can't detach an invalidated Connection")
801
def _autobegin(self) -> None:
    """Call :meth:`begin` unless autobegin is disallowed for this
    connection or the private "in begin" flag is set."""
    if not self._allow_autobegin or self.__in_begin:
        return
    self.begin()
805
def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of :class:`_engine.RootTransaction`.
    This object represents the "scope" of the transaction,
    which completes when either the :meth:`_engine.Transaction.rollback`
    or :meth:`_engine.Transaction.commit` method is called; the object
    also works as a context manager as illustrated above.

    The :meth:`_engine.Connection.begin` method begins a
    transaction that normally will be begun in any case when the connection
    is first used to execute a statement.  The reason this method might be
    used would be to invoke the :meth:`_events.ConnectionEvents.begin`
    event at a specific time, or to organize code within the scope of a
    connection checkout in terms of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    The above code is not fundamentally any different in its behavior than
    the following code which does not use
    :meth:`_engine.Connection.begin`; the below style is known
    as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, the :meth:`_engine.Connection.begin`
    method does not emit any SQL or change the state of the underlying
    DBAPI connection in any way; the Python DBAPI does not have any
    concept of explicit transaction begin.

    :raises InvalidRequestError: if a transaction object is already
      present on this connection.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction
880
def begin_nested(self) -> NestedTransaction:
    """Begin a nested transaction (i.e. SAVEPOINT), returning a handle
    that governs the scope of that SAVEPOINT.

    E.g.::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    The object returned is a :class:`_engine.NestedTransaction`, whose
    :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback` methods correspond to the
    operations "RELEASE SAVEPOINT <name>" and
    "ROLLBACK TO SAVEPOINT <name>" respectively.  The savepoint name is
    generated automatically and is local to the
    :class:`_engine.NestedTransaction` object.  As with any other
    :class:`_engine.Transaction`, the object may be used as a context
    manager as shown above; the savepoint is then "released" when the
    block completes successfully, or "rolled back" when the block raises
    an exception.

    Nested transactions depend on SAVEPOINT support in the underlying
    database; without it, the behavior is undefined.  A common use for
    SAVEPOINT is to run operations that may fail inside of a transaction
    while still allowing the outer transaction to continue.  E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute(...)

    When :meth:`_engine.Connection.begin_nested` is called without
    :meth:`_engine.Connection.begin` or :meth:`_engine.Engine.begin`
    having been called first, the :class:`_engine.Connection` will
    "autobegin" the outer transaction first.  That outer transaction may
    then be committed in "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute(...)
            # savepoint is released

            connection.execute(...)

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    :return: a new :class:`_engine.NestedTransaction`.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    needs_outer_transaction = self._transaction is None
    if needs_outer_transaction:
        # no root transaction yet; let autobegin establish one first
        self._autobegin()

    return NestedTransaction(self)
958
def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Begin a two-phase or XA transaction and return a transaction
    handle.

    The returned object is an instance of :class:`.TwoPhaseTransaction`,
    which in addition to the methods provided by
    :class:`.Transaction`, also provides a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id.  If not supplied, an id
     is generated by calling the dialect's ``create_xid()`` method.

    :return: a new :class:`.TwoPhaseTransaction` for this connection.

    :raises sqlalchemy.exc.InvalidRequestError: if a transaction is
     already present on this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """

    # a two-phase transaction can only be the outermost transaction
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )
    if xid is None:
        xid = self.engine.dialect.create_xid()
    return TwoPhaseTransaction(self, xid)
987
def commit(self) -> None:
    """Commit the transaction currently in progress.

    If a transaction has been started, it is committed.  If none was
    started, this method has no effect, assuming the connection is in a
    non-invalidated state.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.commit` method only acts upon
      the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.


    """
    current = self._transaction
    if not current:
        return

    current.commit()
1012
def rollback(self) -> None:
    """Roll back the transaction currently in progress.

    If a transaction has been started, it is rolled back.  If none was
    started, this method has no effect.  If a transaction was started and
    the connection is in an invalidated state, the transaction is cleared
    using this method.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.rollback` method only acts
      upon the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.


    """
    current = self._transaction
    if not current:
        return

    current.rollback()
1038
def recover_twophase(self) -> List[Any]:
    """Return whatever the dialect's ``do_recover_twophase()`` reports
    for this connection.

    :return: the list produced by the dialect; presumably the ids of
     prepared two-phase transactions (confirm against the dialect).

    """
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)
1041
def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand the given two-phase transaction id to the dialect's
    ``do_rollback_twophase()``.

    :param xid: the two-phase transaction id to roll back.
    :param recover: passed through to the dialect as the ``recover``
     keyword argument.

    """
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)
1044
def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand the given two-phase transaction id to the dialect's
    ``do_commit_twophase()``.

    :param xid: the two-phase transaction id to commit.
    :param recover: passed through to the dialect as the ``recover``
     keyword argument.

    """
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)
1047
def in_transaction(self) -> bool:
    """Return True if a root transaction is present and active."""
    root = self._transaction
    return root is not None and root.is_active
1051
def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction (i.e. SAVEPOINT) is in
    progress.

    This reports on the nested transaction tracked by this connection,
    not on the root transaction; see
    :meth:`_engine.Connection.in_transaction` for the latter.

    """
    nested = self._nested_transaction
    return nested is not None and nested.is_active
1058
def _is_autocommit_isolation(self) -> bool:
    """Report whether the effective isolation level is "AUTOCOMMIT".

    The ``isolation_level`` execution option on this connection takes
    precedence; the dialect's ``_on_connect_isolation_level`` is only
    consulted when that option is absent (or ``None``).

    """
    level = self._execution_options.get("isolation_level", None)
    if level is None:
        level = self.engine.dialect._on_connect_isolation_level
    return bool(level == "AUTOCOMMIT")
1069
def _get_required_transaction(self) -> RootTransaction:
    """Return the current root transaction, insisting that one exists.

    :raises sqlalchemy.exc.InvalidRequestError: if no root transaction
     is present on this connection.

    """
    current = self._transaction
    if current is not None:
        return current

    raise exc.InvalidRequestError("connection is not in a transaction")
1075
def _get_required_nested_transaction(self) -> NestedTransaction:
    """Return the current nested transaction, insisting that one exists.

    :raises sqlalchemy.exc.InvalidRequestError: if no nested transaction
     is present on this connection.

    """
    current = self._nested_transaction
    if current is not None:
        return current

    raise exc.InvalidRequestError(
        "connection is not in a nested transaction"
    )
1083
def get_transaction(self) -> Optional[RootTransaction]:
    """Return the root transaction currently in progress, or ``None``
    when there is none.

    .. versionadded:: 1.4

    """
    root = self._transaction
    return root
1092
def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the nested transaction currently in progress, or ``None``
    when there is none.

    .. versionadded:: 1.4

    """
    nested = self._nested_transaction
    return nested
1100
def _begin_impl(self, transaction: RootTransaction) -> None:
    """Log, fire the ``begin`` event and invoke the dialect's
    ``do_begin()`` for a root transaction.

    The ``__in_begin`` flag is held True for the whole operation,
    including event dispatch, so that ``_autobegin()`` does not start
    another transaction if the connection is used while beginning.  The
    flag is always reset on the way out, including when an event
    listener raises; otherwise it would stay True and ``_autobegin()``
    would be disabled for the rest of the connection's life.

    :param transaction: the root transaction being begun; not used by
     this method itself.

    """
    if self._echo:
        if self._is_autocommit_isolation():
            self._log_info(
                "BEGIN (implicit; DBAPI should not BEGIN due to "
                "autocommit mode)"
            )
        else:
            self._log_info("BEGIN (implicit)")

    self.__in_begin = True
    try:
        # listener exceptions propagate unchanged; only the flag reset
        # in the outer "finally" applies to them
        if self._has_events or self.engine._has_events:
            self.dispatch.begin(self)

        try:
            self.engine.dialect.do_begin(self.connection)
        except BaseException as e:
            self._handle_dbapi_exception(e, None, None, None, None)
    finally:
        self.__in_begin = False
1122
def _rollback_impl(self) -> None:
    """Fire the ``rollback`` event and roll back through the dialect.

    The event is dispatched whenever listeners are present.  Logging and
    the dialect's ``do_rollback()`` call happen only while
    ``_still_open_and_dbapi_connection_is_valid`` is true.  Errors from
    the dialect call are passed to ``_handle_dbapi_exception()``.

    """
    if self._has_events or self.engine._has_events:
        self.dispatch.rollback(self)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    if self._echo:
        if not self._is_autocommit_isolation():
            message = "ROLLBACK"
        elif self.dialect.skip_autocommit_rollback:
            message = (
                "ROLLBACK will be skipped by "
                "skip_autocommit_rollback"
            )
        else:
            message = (
                "ROLLBACK using DBAPI connection.rollback(); "
                "set skip_autocommit_rollback to prevent fully"
            )
        self._log_info(message)

    # do_rollback() is invoked in every case; any skipping announced in
    # the log message above is not performed by this method
    try:
        self.engine.dialect.do_rollback(self.connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1146
def _commit_impl(self) -> None:
    """Fire the ``commit`` event, log, and commit through the dialect.

    Errors raised by the dialect's ``do_commit()`` call are passed to
    ``_handle_dbapi_exception()``.

    """
    if self._has_events or self.engine._has_events:
        self.dispatch.commit(self)

    if self._echo:
        if self._is_autocommit_isolation():
            message = (
                "COMMIT using DBAPI connection.commit(), "
                "has no effect due to autocommit mode"
            )
        else:
            message = "COMMIT"
        self._log_info(message)

    try:
        self.engine.dialect.do_commit(self.connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1163
def _savepoint_impl(self, name: Optional[str] = None) -> str:
    """Establish a savepoint through the dialect and return its name.

    :param name: explicit savepoint name.  When omitted, the
     connection's savepoint counter is incremented and a name of the
     form ``sa_savepoint_<n>`` is generated.

    :return: the savepoint name that was used.

    """
    # the event receives the name as passed in, i.e. None when the name
    # is about to be generated below
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.savepoint(self, name)

    if name is None:
        self.__savepoint_seq += 1
        sequence = self.__savepoint_seq
        name = "sa_savepoint_%s" % sequence

    self.engine.dialect.do_savepoint(self, name)
    return name
1173
def _rollback_to_savepoint_impl(self, name: str) -> None:
    """Fire the ``rollback_savepoint`` event, then roll back to the
    named savepoint through the dialect.

    The dialect call is skipped unless
    ``_still_open_and_dbapi_connection_is_valid`` is true.

    :param name: name of the savepoint to roll back to.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.rollback_savepoint(self, name, None)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    self.engine.dialect.do_rollback_to_savepoint(self, name)
1180
def _release_savepoint_impl(self, name: str) -> None:
    """Fire the ``release_savepoint`` event, then release the named
    savepoint through the dialect.

    :param name: name of the savepoint to release.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.release_savepoint(self, name, None)

    dialect = self.engine.dialect
    dialect.do_release_savepoint(self, name)
1186
def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
    """Log, fire the ``begin_twophase`` event and invoke the dialect's
    ``do_begin_twophase()`` with the transaction's xid.

    The ``__in_begin`` flag is True only for the duration of the dialect
    call.  Errors from that call are passed to
    ``_handle_dbapi_exception()``.

    :param transaction: the two-phase transaction being begun; its
     ``xid`` is passed to the event and to the dialect.

    """
    if self._echo:
        self._log_info("BEGIN TWOPHASE (implicit)")

    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.begin_twophase(self, transaction.xid)

    self.__in_begin = True
    try:
        self.engine.dialect.do_begin_twophase(self, transaction.xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
    finally:
        self.__in_begin = False
1200
def _prepare_twophase_impl(self, xid: Any) -> None:
    """Fire the ``prepare_twophase`` event and invoke the dialect's
    ``do_prepare_twophase()``.

    The current transaction is asserted to be a
    :class:`.TwoPhaseTransaction`.  Errors from the dialect call are
    passed to ``_handle_dbapi_exception()``.

    :param xid: the two-phase transaction id to prepare.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.prepare_twophase(self, xid)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1210
def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Fire the ``rollback_twophase`` event and roll back the two-phase
    transaction through the dialect.

    The dialect call is made only while
    ``_still_open_and_dbapi_connection_is_valid`` is true; errors it
    raises are passed to ``_handle_dbapi_exception()``.

    :param xid: the two-phase transaction id.
    :param is_prepared: passed to both the event and the dialect.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.rollback_twophase(self, xid, is_prepared)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1223
def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Fire the ``commit_twophase`` event and commit the two-phase
    transaction through the dialect.

    The current transaction is asserted to be a
    :class:`.TwoPhaseTransaction`.  Errors from the dialect call are
    passed to ``_handle_dbapi_exception()``.

    :param xid: the two-phase transaction id.
    :param is_prepared: passed to both the event and the dialect.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
1233
def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    The underlying database resources, that is, the DBAPI connection
    referenced internally, are released.  The DBAPI connection is
    typically handed back to the connection-holding :class:`_pool.Pool`
    referenced by the :class:`_engine.Engine` that produced this
    :class:`_engine.Connection`.  Any transactional state present on the
    DBAPI connection is also unconditionally released via the DBAPI
    connection's ``rollback()`` method, regardless of any
    :class:`.Transaction` object that may be outstanding with regards to
    this :class:`_engine.Connection`.

    This also has the effect of calling
    :meth:`_engine.Connection.rollback` if any transaction is in place.

    Once :meth:`_engine.Connection.close` has been called, the
    :class:`_engine.Connection` is permanently in a closed state, and
    will allow no further operations.

    """

    # closing an outstanding transaction here means the pooled
    # connection does not need an additional reset further below
    skip_reset = bool(self._transaction)
    if skip_reset:
        self._transaction.close()

    # read only after the transaction has been closed
    pooled = self._dbapi_connection
    if pooled is not None:
        if skip_reset:
            cast("_ConnectionFairy", pooled)._close_special(
                transaction_reset=True
            )
        else:
            pooled.close()

        # There is a slight chance that the close call above may have
        # triggered an invalidation, in which case _dbapi_connection
        # would already be None; usually, however, it is still non-None
        # at this point and in a "closed" state.
        self._dbapi_connection = None

    self.__can_reconnect = False
1281
@overload
def scalar(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...


@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...


def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Execute a SQL statement construct and return a scalar object.

    This is shorthand for calling :meth:`_engine.Connection.execute` and
    then :meth:`_engine.Result.scalar` on the result.  Parameters are
    equivalent.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if the given
     statement has no ``_execute_on_scalar`` attribute.

    """
    distilled = _distill_params_20(parameters)
    try:
        run_scalar = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_scalar(self, distilled, options)
1328
@overload
def scalars(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...


@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...


def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Execute and return a scalar result set, which yields scalar values
    from the first column of each row.

    Equivalent to calling :meth:`_engine.Connection.execute` to obtain a
    :class:`_result.Result` object, then calling
    :meth:`_result.Result.scalars` on it to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """

    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()
1371
@overload
def execute(
    self,
    statement: TypedReturnsRows[Unpack[_Ts]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[_Ts]]: ...


@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]: ...


def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Execute a SQL statement construct and return a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     This may be either a dictionary of parameter names to values,
     or a mutable sequence (e.g. a list) of dictionaries.  With a list
     of dictionaries, the underlying statement execution makes use of
     the DBAPI ``cursor.executemany()`` method; with a single
     dictionary, the DBAPI ``cursor.execute()`` method is used.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.Result` object.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if the given
     statement has no ``_execute_on_connection`` attribute.

    """
    distilled = _distill_params_20(parameters)
    try:
        run_on_connection = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_on_connection(self, distilled, options)
1439
def _execute_function(
    self,
    func: FunctionElement[Any],
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.FunctionElement object.

    The function is turned into a statement via its ``select()`` method,
    which is then run through ``_execute_clauseelement()``.

    """

    select_stmt = func.select()
    return self._execute_clauseelement(
        select_stmt, distilled_parameters, execution_options
    )
1451
def _execute_default(
    self,
    default: DefaultGenerator,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> Any:
    """Execute a schema.ColumnDefault object.

    The connection's execution options are merged with those given, the
    ``before_execute`` hooks run when listeners are present, an
    execution context is built via the dialect's ``_init_default``
    constructor, and the default is evaluated through that context's
    ``_exec_default()``.  ``after_execute`` is dispatched afterwards
    when listeners are present.

    :return: the value produced by the context's ``_exec_default()``.

    """

    exec_opts = self._execution_options.merge_with(execution_options)

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreAnyExecuteParams]

    # for event handlers, the "distilled parameters", which is always a
    # list of dicts, is split into separate "multiparams" and "params"
    # collections so that a handler can tell an executemany style set of
    # parameters from an execute style one.
    if not (self._has_events or self.engine._has_events):
        event_multiparams = event_params = None
    else:
        (
            default,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, exec_opts
        )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        dialect = self.dialect
        context = dialect.execution_ctx_cls._init_default(
            dialect, self, dbapi_conn, exec_opts
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate untouched
        raise
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

    result = context._exec_default(None, default, None)

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            exec_opts,
            result,
        )

    return result
1508
def _execute_ddl(
    self,
    ddl: ExecutableDDLElement,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a schema.DDL object.

    The DDL element's execution options are merged with the
    connection's and the given ones, the element is compiled for this
    connection's dialect (honoring any ``schema_translate_map`` option)
    and then run via ``_execute_context()`` using the dialect's
    ``_init_ddl`` constructor.  The ``before_execute`` and
    ``after_execute`` hooks run when listeners are present.

    """

    exec_opts = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreSingleExecuteParams]

    if not (self._has_events or self.engine._has_events):
        event_multiparams = event_params = None
    else:
        (
            ddl,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            ddl, distilled_parameters, exec_opts
        )

    translate_map = exec_opts.get("schema_translate_map", None)
    dialect = self.dialect

    compiled = ddl.compile(
        dialect=dialect, schema_translate_map=translate_map
    )
    # no bound parameters are handed to the context for DDL
    result = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_ddl,
        compiled,
        None,
        exec_opts,
        compiled,
    )

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            ddl,
            event_multiparams,
            event_params,
            exec_opts,
            result,
        )
    return result
1561
def _invoke_before_exec_event(
    self,
    elem: Any,
    distilled_params: _CoreMultiExecuteParams,
    execution_options: _ExecuteOptions,
) -> Tuple[
    Any,
    _CoreMultiExecuteParams,
    _CoreMultiExecuteParams,
    _CoreSingleExecuteParams,
]:
    """Run the ``before_execute`` listeners and rebuild the parameters.

    The distilled parameter list is presented to listeners as a
    ``(multiparams, params)`` pair: a list of exactly one entry becomes
    ``([], entry)``, anything else becomes ``(list, {})``.  Each listener
    returns a replacement ``(elem, multiparams, params)`` triple that is
    fed to the next one.

    :return: a 4-tuple of the final element, the distilled parameter
     list rebuilt from what the listeners returned, and the final
     ``multiparams`` and ``params`` values.

    :raises sqlalchemy.exc.InvalidRequestError: if the listeners leave
     both ``multiparams`` and ``params`` non-empty.

    """
    event_multiparams: _CoreMultiExecuteParams
    event_params: _CoreSingleExecuteParams

    if len(distilled_params) != 1:
        event_multiparams, event_params = distilled_params, {}
    else:
        event_multiparams, event_params = [], distilled_params[0]

    for handler in self.dispatch.before_execute:
        elem, event_multiparams, event_params = handler(
            self,
            elem,
            event_multiparams,
            event_params,
            execution_options,
        )

    if event_multiparams:
        rebuilt = list(event_multiparams)
        if event_params:
            raise exc.InvalidRequestError(
                "Event handler can't return non-empty multiparams "
                "and params at the same time"
            )
    elif event_params:
        rebuilt = [event_params]
    else:
        rebuilt = []

    return elem, rebuilt, event_multiparams, event_params
1603
def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.ClauseElement object.

    The element's execution options are merged with the connection's and
    the given ones, the ``before_execute`` hooks run when listeners are
    present, the element is compiled through ``_compile_w_cache()`` and
    the compiled form is run via ``_execute_context()`` using the
    dialect's ``_init_compiled`` constructor.  ``after_execute`` is
    dispatched afterwards when listeners were present at the start.

    """

    exec_opts = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, exec_opts
        )

    if not distilled_parameters:
        column_keys = []
        executemany = False
    else:
        # sorted() yields a new list, so that no link to the keys() view
        # object, which links to the values we don't want to cache, is
        # retained
        column_keys = sorted(distilled_parameters[0])
        executemany = len(distilled_parameters) > 1

    dialect = self.dialect

    translate_map = exec_opts.get("schema_translate_map", None)

    compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, param_dict, cache_hit = (
        elem._compile_w_cache(
            dialect=dialect,
            compiled_cache=compiled_cache,
            column_keys=column_keys,
            for_executemany=executemany,
            schema_translate_map=translate_map,
            linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
        )
    )
    result = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        exec_opts,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
        param_dict=param_dict,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            exec_opts,
            result,
        )
    return result
1677
def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Execute a string SQL statement on the DBAPI cursor directly,
    without any SQL compilation steps.

    Any string may be passed this way straight to the
    ``cursor.execute()`` method of the DBAPI in use.

    :param statement: The statement str to be executed.  Bound parameters
     must use the underlying DBAPI's paramstyle, such as "qmark",
     "pyformat", "format", etc.

    :param parameters: represent bound parameter values to be used in the
     execution.  The format is one of: a dictionary of named parameters,
     a tuple of positional parameters, or a list containing either
     dictionaries or tuples for multiple-execute support.

    :param execution_options: optional execution options; these are
     merged with the options already present on this connection.

    :return: a :class:`_engine.CursorResult`.

     E.g. multiple dictionaries::


         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
         )

     Single dictionary::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             dict(id=1, value="v1"),
         )

     Single tuple::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
         )

    .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
        not participate in the
        :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.  To
        intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
        :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute`.

    .. seealso::

        :pep:`249`

    """

    distilled = _distill_raw_params(parameters)
    exec_opts = self._execution_options.merge_with(execution_options)
    dialect = self.dialect

    # the statement string goes in both as the statement used for error
    # reporting and as the first argument for the context constructor
    return self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        exec_opts,
        statement,
        distilled,
    )
1752
def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Unpack[TupleAny]]:
    """Create an :class:`.ExecutionContext` and execute, returning
    a :class:`_engine.CursorResult`.

    :param dialect: dialect passed to the constructor and on to the
     execution step.
    :param constructor: callable that builds the execution context; it
     is called with ``(dialect, self, dbapi_connection,
     execution_options, *args, **kw)``.
    :param statement: the statement; within this method it is used only
     for error reporting, and is passed on to the single-context step.
    :param parameters: parameters, handled the same way as
     ``statement``.
    :param execution_options: execution options; a truthy ``yield_per``
     entry also sets ``stream_results`` and ``max_row_buffer``.

    """

    if execution_options:
        yield_per = execution_options.get("yield_per", None)
        if yield_per:
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yield_per}
            )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        context = constructor(
            dialect, self, dbapi_conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate untouched
        raise
    except BaseException as err:
        self._handle_dbapi_exception(
            err, str(statement), parameters, None, None
        )

    # a root or nested transaction that is present but no longer active
    # cannot be used for execution
    if (
        self._transaction
        and not self._transaction.is_active
        or (
            self._nested_transaction
            and not self._nested_transaction.is_active
        )
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)

    return self._exec_single_context(
        dialect, context, statement, parameters
    )
1811
1812 def _exec_single_context(
1813 self,
1814 dialect: Dialect,
1815 context: ExecutionContext,
1816 statement: Union[str, Compiled],
1817 parameters: Optional[_AnyMultiExecuteParams],
1818 ) -> CursorResult[Unpack[TupleAny]]:
1819 """continue the _execute_context() method for a single DBAPI
1820 cursor.execute() or cursor.executemany() call.
1821
1822 """
1823 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
1824 generic_setinputsizes = context._prepare_set_input_sizes()
1825
1826 if generic_setinputsizes:
1827 try:
1828 dialect.do_set_input_sizes(
1829 context.cursor, generic_setinputsizes, context
1830 )
1831 except BaseException as e:
1832 self._handle_dbapi_exception(
1833 e, str(statement), parameters, None, context
1834 )
1835
1836 cursor, str_statement, parameters = (
1837 context.cursor,
1838 context.statement,
1839 context.parameters,
1840 )
1841
1842 effective_parameters: Optional[_AnyExecuteParams]
1843
1844 if not context.executemany:
1845 effective_parameters = parameters[0]
1846 else:
1847 effective_parameters = parameters
1848
1849 if self._has_events or self.engine._has_events:
1850 for fn in self.dispatch.before_cursor_execute:
1851 str_statement, effective_parameters = fn(
1852 self,
1853 cursor,
1854 str_statement,
1855 effective_parameters,
1856 context,
1857 context.executemany,
1858 )
1859
1860 if self._echo:
1861 self._log_info(str_statement)
1862
1863 stats = context._get_cache_stats()
1864
1865 if not self.engine.hide_parameters:
1866 self._log_info(
1867 "[%s] %r",
1868 stats,
1869 sql_util._repr_params(
1870 effective_parameters,
1871 batches=10,
1872 ismulti=context.executemany,
1873 ),
1874 )
1875 else:
1876 self._log_info(
1877 "[%s] [SQL parameters hidden due to hide_parameters=True]",
1878 stats,
1879 )
1880
1881 evt_handled: bool = False
1882 try:
1883 if context.execute_style is ExecuteStyle.EXECUTEMANY:
1884 effective_parameters = cast(
1885 "_CoreMultiExecuteParams", effective_parameters
1886 )
1887 if self.dialect._has_events:
1888 for fn in self.dialect.dispatch.do_executemany:
1889 if fn(
1890 cursor,
1891 str_statement,
1892 effective_parameters,
1893 context,
1894 ):
1895 evt_handled = True
1896 break
1897 if not evt_handled:
1898 self.dialect.do_executemany(
1899 cursor,
1900 str_statement,
1901 effective_parameters,
1902 context,
1903 )
1904 elif not effective_parameters and context.no_parameters:
1905 if self.dialect._has_events:
1906 for fn in self.dialect.dispatch.do_execute_no_params:
1907 if fn(cursor, str_statement, context):
1908 evt_handled = True
1909 break
1910 if not evt_handled:
1911 self.dialect.do_execute_no_params(
1912 cursor, str_statement, context
1913 )
1914 else:
1915 effective_parameters = cast(
1916 "_CoreSingleExecuteParams", effective_parameters
1917 )
1918 if self.dialect._has_events:
1919 for fn in self.dialect.dispatch.do_execute:
1920 if fn(
1921 cursor,
1922 str_statement,
1923 effective_parameters,
1924 context,
1925 ):
1926 evt_handled = True
1927 break
1928 if not evt_handled:
1929 self.dialect.do_execute(
1930 cursor, str_statement, effective_parameters, context
1931 )
1932
1933 if self._has_events or self.engine._has_events:
1934 self.dispatch.after_cursor_execute(
1935 self,
1936 cursor,
1937 str_statement,
1938 effective_parameters,
1939 context,
1940 context.executemany,
1941 )
1942
1943 context.post_exec()
1944
1945 result = context._setup_result_proxy()
1946
1947 except BaseException as e:
1948 self._handle_dbapi_exception(
1949 e, str_statement, effective_parameters, cursor, context
1950 )
1951
1952 return result
1953
1954 def _exec_insertmany_context(
1955 self,
1956 dialect: Dialect,
1957 context: ExecutionContext,
1958 ) -> CursorResult[Unpack[TupleAny]]:
1959 """continue the _execute_context() method for an "insertmanyvalues"
1960 operation, which will invoke DBAPI
1961 cursor.execute() one or more times with individual log and
1962 event hook calls.
1963
1964 """
1965
1966 if dialect.bind_typing is BindTyping.SETINPUTSIZES:
1967 generic_setinputsizes = context._prepare_set_input_sizes()
1968 else:
1969 generic_setinputsizes = None
1970
1971 cursor, str_statement, parameters = (
1972 context.cursor,
1973 context.statement,
1974 context.parameters,
1975 )
1976
1977 effective_parameters = parameters
1978
1979 engine_events = self._has_events or self.engine._has_events
1980 if self.dialect._has_events:
1981 do_execute_dispatch: Iterable[Any] = (
1982 self.dialect.dispatch.do_execute
1983 )
1984 else:
1985 do_execute_dispatch = ()
1986
1987 if self._echo:
1988 stats = context._get_cache_stats() + " (insertmanyvalues)"
1989
1990 preserve_rowcount = context.execution_options.get(
1991 "preserve_rowcount", False
1992 )
1993 rowcount = 0
1994
1995 for imv_batch in dialect._deliver_insertmanyvalues_batches(
1996 self,
1997 cursor,
1998 str_statement,
1999 effective_parameters,
2000 generic_setinputsizes,
2001 context,
2002 ):
2003 if imv_batch.processed_setinputsizes:
2004 try:
2005 dialect.do_set_input_sizes(
2006 context.cursor,
2007 imv_batch.processed_setinputsizes,
2008 context,
2009 )
2010 except BaseException as e:
2011 self._handle_dbapi_exception(
2012 e,
2013 sql_util._long_statement(imv_batch.replaced_statement),
2014 imv_batch.replaced_parameters,
2015 None,
2016 context,
2017 is_sub_exec=True,
2018 )
2019
2020 sub_stmt = imv_batch.replaced_statement
2021 sub_params = imv_batch.replaced_parameters
2022
2023 if engine_events:
2024 for fn in self.dispatch.before_cursor_execute:
2025 sub_stmt, sub_params = fn(
2026 self,
2027 cursor,
2028 sub_stmt,
2029 sub_params,
2030 context,
2031 True,
2032 )
2033
2034 if self._echo:
2035 self._log_info(sql_util._long_statement(sub_stmt))
2036
2037 imv_stats = f""" {imv_batch.batchnum}/{
2038 imv_batch.total_batches
2039 } ({
2040 'ordered'
2041 if imv_batch.rows_sorted else 'unordered'
2042 }{
2043 '; batch not supported'
2044 if imv_batch.is_downgraded
2045 else ''
2046 })"""
2047
2048 if imv_batch.batchnum == 1:
2049 stats += imv_stats
2050 else:
2051 stats = f"insertmanyvalues{imv_stats}"
2052
2053 if not self.engine.hide_parameters:
2054 self._log_info(
2055 "[%s] %r",
2056 stats,
2057 sql_util._repr_params(
2058 sub_params,
2059 batches=10,
2060 ismulti=False,
2061 ),
2062 )
2063 else:
2064 self._log_info(
2065 "[%s] [SQL parameters hidden due to "
2066 "hide_parameters=True]",
2067 stats,
2068 )
2069
2070 try:
2071 for fn in do_execute_dispatch:
2072 if fn(
2073 cursor,
2074 sub_stmt,
2075 sub_params,
2076 context,
2077 ):
2078 break
2079 else:
2080 dialect.do_execute(
2081 cursor,
2082 sub_stmt,
2083 sub_params,
2084 context,
2085 )
2086
2087 except BaseException as e:
2088 self._handle_dbapi_exception(
2089 e,
2090 sql_util._long_statement(sub_stmt),
2091 sub_params,
2092 cursor,
2093 context,
2094 is_sub_exec=True,
2095 )
2096
2097 if engine_events:
2098 self.dispatch.after_cursor_execute(
2099 self,
2100 cursor,
2101 str_statement,
2102 effective_parameters,
2103 context,
2104 context.executemany,
2105 )
2106
2107 if preserve_rowcount:
2108 rowcount += imv_batch.current_batch_size
2109
2110 try:
2111 context.post_exec()
2112
2113 if preserve_rowcount:
2114 context._rowcount = rowcount # type: ignore[attr-defined]
2115
2116 result = context._setup_result_proxy()
2117
2118 except BaseException as e:
2119 self._handle_dbapi_exception(
2120 e, str_statement, effective_parameters, cursor, context
2121 )
2122
2123 return result
2124
2125 def _cursor_execute(
2126 self,
2127 cursor: DBAPICursor,
2128 statement: str,
2129 parameters: _DBAPISingleExecuteParams,
2130 context: Optional[ExecutionContext] = None,
2131 ) -> None:
2132 """Execute a statement + params on the given cursor.
2133
2134 Adds appropriate logging and exception handling.
2135
2136 This method is used by DefaultDialect for special-case
2137 executions, such as for sequences and column defaults.
2138 The path of statement execution in the majority of cases
2139 terminates at _execute_context().
2140
2141 """
2142 if self._has_events or self.engine._has_events:
2143 for fn in self.dispatch.before_cursor_execute:
2144 statement, parameters = fn(
2145 self, cursor, statement, parameters, context, False
2146 )
2147
2148 if self._echo:
2149 self._log_info(statement)
2150 self._log_info("[raw sql] %r", parameters)
2151 try:
2152 for fn in (
2153 ()
2154 if not self.dialect._has_events
2155 else self.dialect.dispatch.do_execute
2156 ):
2157 if fn(cursor, statement, parameters, context):
2158 break
2159 else:
2160 self.dialect.do_execute(cursor, statement, parameters, context)
2161 except BaseException as e:
2162 self._handle_dbapi_exception(
2163 e, statement, parameters, cursor, context
2164 )
2165
2166 if self._has_events or self.engine._has_events:
2167 self.dispatch.after_cursor_execute(
2168 self, cursor, statement, parameters, context, False
2169 )
2170
2171 def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
2172 """Close the given cursor, catching exceptions
2173 and turning into log warnings.
2174
2175 """
2176 try:
2177 cursor.close()
2178 except Exception:
2179 # log the error through the connection pool's logger.
2180 self.engine.pool.logger.error(
2181 "Error closing cursor", exc_info=True
2182 )
2183
    # Class-level defaults for two per-connection flags used by
    # _handle_dbapi_exception().  That method sets instance-level values
    # while it runs and removes them again with ``del``, which makes
    # attribute lookup fall back to these ``False`` defaults.
    _reentrant_error = False
    _is_disconnect = False
2186
    def _handle_dbapi_exception(
        self,
        e: BaseException,
        statement: Optional[str],
        parameters: Optional[_AnyExecuteParams],
        cursor: Optional[DBAPICursor],
        context: Optional[ExecutionContext],
        is_sub_exec: bool = False,
    ) -> NoReturn:
        """Process an exception raised during execution and re-raise it.

        This method never returns.  In order, it:

        * decides whether the error is a disconnect, unless
          ``_is_disconnect`` is already set;
        * raises a wrapped error right away if it is entered again while
          an earlier call is still in progress (``_reentrant_error``);
        * wraps the error with ``exc.DBAPIError.instance()`` when it is a
          DBAPI error, or when a string statement is present, there is no
          execution context and the error is not an "exit" exception;
        * runs the dialect's ``handle_error`` hooks, unless the
          ``skip_user_error_events`` execution option is set;
        * notifies the execution context, and when the error is not a
          disconnect closes the cursor and rolls back if no transaction
          is in progress;
        * raises the hook-supplied exception, the wrapped exception, or
          the original exception, in that order of preference;
        * finally resets the two flags and, for a disconnect on a
          connection not yet invalidated, invalidates it.

        :param e: the exception being handled.
        :param statement: string statement being executed, if any.
        :param parameters: parameters being executed, if any.
        :param cursor: DBAPI cursor in use, if any.
        :param context: execution context in use, if any.
        :param is_sub_exec: when True, ``ismulti`` is forced to False
         regardless of ``context.executemany``.

        """
        exc_info = sys.exc_info()

        is_exit_exception = util.is_exit_exception(e)

        if not self._is_disconnect:
            # a disconnect is either a DBAPI error that the dialect
            # recognizes as one, or an exit exception, in both cases only
            # while this connection is not closed
            self._is_disconnect = (
                isinstance(e, self.dialect.loaded_dbapi.Error)
                and not self.closed
                and self.dialect.is_disconnect(
                    e,
                    self._dbapi_connection if not self.invalidated else None,
                    cursor,
                )
            ) or (is_exit_exception and not self.closed)

        # initial value; handle_error hooks may change it below
        invalidate_pool_on_disconnect = not is_exit_exception

        ismulti: bool = (
            not is_sub_exec and context.executemany
            if context is not None
            else False
        )
        if self._reentrant_error:
            # already handling an error on this connection: wrap and
            # raise immediately, skipping hooks and cleanup
            raise exc.DBAPIError.instance(
                statement,
                parameters,
                e,
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                dialect=self.dialect,
                ismulti=ismulti,
            ).with_traceback(exc_info[2]) from e
        self._reentrant_error = True
        try:
            # non-DBAPI error - if we already got a context,
            # or there's no string statement, don't wrap it
            should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
                statement is not None
                and context is None
                and not is_exit_exception
            )

            if should_wrap:
                sqlalchemy_exception = exc.DBAPIError.instance(
                    statement,
                    parameters,
                    cast(Exception, e),
                    self.dialect.loaded_dbapi.Error,
                    hide_parameters=self.engine.hide_parameters,
                    connection_invalidated=self._is_disconnect,
                    dialect=self.dialect,
                    ismulti=ismulti,
                )
            else:
                sqlalchemy_exception = None

            # exception supplied by a handle_error hook, if any
            newraise = None

            if (self.dialect._has_events) and not self._execution_options.get(
                "skip_user_error_events", False
            ):
                ctx = ExceptionContextImpl(
                    e,
                    sqlalchemy_exception,
                    self.engine,
                    self.dialect,
                    self,
                    cursor,
                    statement,
                    parameters,
                    context,
                    self._is_disconnect,
                    invalidate_pool_on_disconnect,
                    False,
                )

                for fn in self.dialect.dispatch.handle_error:
                    try:
                        # handler returns an exception;
                        # call next handler in a chain
                        per_fn = fn(ctx)
                        if per_fn is not None:
                            ctx.chained_exception = newraise = per_fn
                    except Exception as _raised:
                        # handler raises an exception - stop processing
                        newraise = _raised
                        break

                # a hook may have overridden the disconnect decision
                if self._is_disconnect != ctx.is_disconnect:
                    self._is_disconnect = ctx.is_disconnect
                    if sqlalchemy_exception:
                        sqlalchemy_exception.connection_invalidated = (
                            ctx.is_disconnect
                        )

                # set up potentially user-defined value for
                # invalidate pool.
                invalidate_pool_on_disconnect = (
                    ctx.invalidate_pool_on_disconnect
                )

            if should_wrap and context:
                context.handle_dbapi_exception(e)

            if not self._is_disconnect:
                if cursor:
                    self._safe_close_cursor(cursor)
                # "autorollback" was mostly relevant in 1.x series.
                # It's very unlikely to reach here, as the connection
                # does autobegin so when we are here, we are usually
                # in an explicit / semi-explicit transaction.
                # however we have a test which manufactures this
                # scenario in any case using an event handler.
                # test/engine/test_execute.py-> test_actual_autorollback
                if not self.in_transaction():
                    self._rollback_impl()

            if newraise:
                raise newraise.with_traceback(exc_info[2]) from e
            elif should_wrap:
                assert sqlalchemy_exception is not None
                raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
            else:
                assert exc_info[1] is not None
                raise exc_info[1].with_traceback(exc_info[2])
        finally:
            # drop the instance-level flags so that lookups fall back to
            # the class-level ``False`` defaults
            del self._reentrant_error
            if self._is_disconnect:
                del self._is_disconnect
                if not self.invalidated:
                    dbapi_conn_wrapper = self._dbapi_connection
                    assert dbapi_conn_wrapper is not None
                    if invalidate_pool_on_disconnect:
                        self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                    self.invalidate(e)
2331
2332 @classmethod
2333 def _handle_dbapi_exception_noconnection(
2334 cls,
2335 e: BaseException,
2336 dialect: Dialect,
2337 engine: Optional[Engine] = None,
2338 is_disconnect: Optional[bool] = None,
2339 invalidate_pool_on_disconnect: bool = True,
2340 is_pre_ping: bool = False,
2341 ) -> NoReturn:
2342 exc_info = sys.exc_info()
2343
2344 if is_disconnect is None:
2345 is_disconnect = isinstance(
2346 e, dialect.loaded_dbapi.Error
2347 ) and dialect.is_disconnect(e, None, None)
2348
2349 should_wrap = isinstance(e, dialect.loaded_dbapi.Error)
2350
2351 if should_wrap:
2352 sqlalchemy_exception = exc.DBAPIError.instance(
2353 None,
2354 None,
2355 cast(Exception, e),
2356 dialect.loaded_dbapi.Error,
2357 hide_parameters=(
2358 engine.hide_parameters if engine is not None else False
2359 ),
2360 connection_invalidated=is_disconnect,
2361 dialect=dialect,
2362 )
2363 else:
2364 sqlalchemy_exception = None
2365
2366 newraise = None
2367
2368 if dialect._has_events:
2369 ctx = ExceptionContextImpl(
2370 e,
2371 sqlalchemy_exception,
2372 engine,
2373 dialect,
2374 None,
2375 None,
2376 None,
2377 None,
2378 None,
2379 is_disconnect,
2380 invalidate_pool_on_disconnect,
2381 is_pre_ping,
2382 )
2383 for fn in dialect.dispatch.handle_error:
2384 try:
2385 # handler returns an exception;
2386 # call next handler in a chain
2387 per_fn = fn(ctx)
2388 if per_fn is not None:
2389 ctx.chained_exception = newraise = per_fn
2390 except Exception as _raised:
2391 # handler raises an exception - stop processing
2392 newraise = _raised
2393 break
2394
2395 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
2396 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect
2397
2398 if newraise:
2399 raise newraise.with_traceback(exc_info[2]) from e
2400 elif should_wrap:
2401 assert sqlalchemy_exception is not None
2402 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
2403 else:
2404 assert exc_info[1] is not None
2405 raise exc_info[1].with_traceback(exc_info[2])
2406
2407 def _run_ddl_visitor(
2408 self,
2409 visitorcallable: Type[InvokeDDLBase],
2410 element: SchemaVisitable,
2411 **kwargs: Any,
2412 ) -> None:
2413 """run a DDL visitor.
2414
2415 This method is only here so that the MockConnection can change the
2416 options given to the visitor so that "checkfirst" is skipped.
2417
2418 """
2419 visitorcallable(
2420 dialect=self.dialect, connection=self, **kwargs
2421 ).traverse_single(element)
2422
2423
class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    A plain value holder: every constructor argument is stored on the
    slot of the matching name (``exception`` is stored as
    ``original_exception`` and ``context`` as ``execution_context``).
    ``chained_exception`` starts out as ``None`` and is assigned by the
    code that runs the ``handle_error`` hooks.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously dropped: with __slots__ an unassigned slot raises
        # AttributeError on read
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # no handler has run yet; must be readable by the first handler
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping
2469
2470
class Transaction(TransactionalContext):
    """Represent a database transaction in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback` and
    :meth:`.commit` methods.  The object is also a context manager, so
    the Python ``with`` statement can be combined with
    :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # subclasses provide the real constructor
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True once this transaction is fully detached from the
        connection and so can no longer affect the connection's state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        raise NotImplementedError()

    def _do_commit(self) -> None:
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        """True while this transaction is active and its connection has
        not been invalidated."""
        return not (not self.is_active or self.connection.invalidated)

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this is the base transaction of a begin/commit nesting the
        transaction is rolled back; otherwise the method simply returns.

        Use this to cancel a Transaction without affecting the scope of
        an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # holds whether or not the close itself succeeded
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What is emitted depends on the kind of transaction in use:

        * For a simple database transaction (e.g.
          :class:`.RootTransaction`), a ROLLBACK.

        * For a :class:`.NestedTransaction`, a "ROLLBACK TO SAVEPOINT"
          operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for
          two phase transactions may be used.

        """
        try:
            self._do_rollback()
        finally:
            # holds whether or not the rollback itself succeeded
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What is emitted depends on the kind of transaction in use:

        * For a simple database transaction (e.g.
          :class:`.RootTransaction`), a COMMIT.

        * For a :class:`.NestedTransaction`, a "RELEASE SAVEPOINT"
          operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for
          two phase transactions may be used.

        """
        try:
            self._do_commit()
        finally:
            # holds whether or not the commit itself succeeded
            assert not self.is_active

    def _get_subject(self) -> Connection:
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        still_attached = not self._deactivated_from_connection
        return still_attached

    def _rollback_can_be_called(self) -> bool:
        # RootTransaction / NestedTransaction tolerate rollback() on a
        # deactive transaction without emitting warnings.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True
2613
2614
class RootTransaction(Transaction):
    """Represent the "root" transaction on a :class:`_engine.Connection`.

    This is the "BEGIN/COMMIT/ROLLBACK" currently in progress for the
    :class:`_engine.Connection`.  A :class:`_engine.RootTransaction` is
    created by the :meth:`_engine.Connection.begin` method and stays
    associated with the :class:`_engine.Connection` for as long as it is
    active.  The one currently in use is available from the
    :meth:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also
    applies "autobegin" behavior, creating a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI
    connection.  In 2.0 style use the scope of the
    :class:`_engine.RootTransaction` is controlled with the
    :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.

    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # begin first; the connection points at this object only after
        # the begin has gone through
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        if not self.is_active:
            if self.connection._transaction is not self:
                util.warn("transaction already deassociated from connection")
            return

        assert self.connection._transaction is self
        self.is_active = False

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        conn = self.connection
        try:
            if self.is_active:
                self._connection_rollback_impl()

            nested = conn._nested_transaction
            if nested:
                nested._cancel()
        finally:
            # detach from the connection even if the rollback failed
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if conn._transaction is self:
                conn._transaction = None

        assert not self.is_active
        assert conn._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        if not self.is_active:
            if self.connection._transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")
        else:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # commit success or not, nested transactions are
                # cancelled and this transaction becomes "inactive"
                nested = self.connection._nested_transaction
                if nested:
                    nested._cancel()

                self._deactivate_from_connection()

            # reached only when the commit succeeded; on failure this
            # object stays as the connection's transaction so that a
            # rollback has to occur
            self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self
2721
2722
class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    A :class:`.NestedTransaction` is created by the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    With :class:`.NestedTransaction`, "begin" / "commit" / "rollback"
    mean the following:

    * "begin" corresponds to the "BEGIN SAVEPOINT" command; the
      savepoint is given an explicit name that is kept as part of this
      object's state.

    * :meth:`.NestedTransaction.commit` corresponds to a
      "RELEASE SAVEPOINT" operation using this object's savepoint
      identifier.

    * :meth:`.NestedTransaction.rollback` corresponds to a
      "ROLLBACK TO SAVEPOINT" operation using this object's savepoint
      identifier.

    Savepoints mimic the semantics of an outer transaction so that code
    can treat a "savepoint" transaction and an "outer" transaction in an
    agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    _savepoint: str

    def __init__(self, connection: Connection):
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = connection._savepoint_impl()
        self.is_active = True
        # remember the enclosing savepoint so it can be restored later
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        conn = self.connection
        if conn._nested_transaction is self:
            conn._nested_transaction = self._previous_nested
            return
        if warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._nested_transaction is not self

    def _cancel(self) -> None:
        # invoked by RootTransaction when the outer transaction is
        # committed, rolled back, or closed; cancels every savepoint in
        # the chain without taking any action for them
        self.is_active = False
        self._deactivate_from_connection()
        enclosing = self._previous_nested
        if enclosing:
            enclosing._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        conn = self.connection
        try:
            if (
                self.is_active
                and conn._transaction
                and conn._transaction.is_active
            ):
                conn._rollback_to_savepoint_impl(self._savepoint)
        finally:
            # inactive from here on even if the rollback failed
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert conn._nested_transaction is not self

    def _do_close(self) -> None:
        self._close_impl(True, False)

    def _do_rollback(self) -> None:
        self._close_impl(True, True)

    def _do_commit(self) -> None:
        if not self.is_active:
            if self.connection._nested_transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
            return

        try:
            self.connection._release_savepoint_impl(self._savepoint)
        finally:
            # a failed release still makes the nested trans inactive,
            # which prevents it from trying to emit SQL when it rolls
            # back
            self.is_active = False

        # reached only when the release succeeded
        self._deactivate_from_connection()
2835
2836
class TwoPhaseTransaction(RootTransaction):
    """Represent a two-phase transaction.

    A new :class:`.TwoPhaseTransaction` object is obtained from the
    :meth:`_engine.Connection.begin_twophase` method.

    It offers the same interface as :class:`.Transaction`, plus the
    :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both must be set before the base constructor runs, because it
        # calls _connection_begin_impl()
        self._is_prepared = False
        self.xid = xid
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        After a PREPARE, the transaction can be committed.

        :raises InvalidRequestError: if the transaction is inactive.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

    def _connection_begin_impl(self) -> None:
        self.connection._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        conn = self.connection
        conn._rollback_twophase_impl(self.xid, self._is_prepared)

    def _connection_commit_impl(self) -> None:
        conn = self.connection
        conn._commit_twophase_impl(self.xid, self._is_prepared)
2876
2877
class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """Source of database connectivity and behavior, formed by joining a
    :class:`~sqlalchemy.pool.Pool` with a
    :class:`~sqlalchemy.engine.interfaces.Dialect`.

    Public code obtains an :class:`_engine.Engine` by calling the
    :func:`~sqlalchemy.create_engine` function rather than by invoking
    this constructor.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    # set per instance in __init__; None when no cache is in use
    _compiled_cache: Optional[CompiledCacheType]

    # class-level defaults
    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    # class instantiated by execution_options()
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Construct a new :class:`_engine.Engine`.

        :param pool: stored as the ``pool`` attribute.
        :param dialect: stored as the ``dialect`` attribute.
        :param url: stored as the ``url`` attribute; it is also what
         ``repr()`` of the engine displays.
        :param logging_name: when given and non-empty, assigned to the
         ``logging_name`` attribute of this instance.
        :param echo: assigned to the ``echo`` attribute and passed as the
         ``echoflag`` to ``log.instance_logger()``.
        :param query_cache_size: size given to the built-in compiled
         cache.  A value of ``0`` means no cache is created and
         ``_compiled_cache`` is ``None``.
        :param execution_options: optional mapping of default execution
         options; when non-empty it is applied using
         :meth:`_engine.Engine.update_execution_options`.
        :param hide_parameters: stored as the ``hide_parameters``
         attribute.

        """
        self.pool = pool
        self.dialect = dialect
        self.url = url
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        self._compiled_cache = (
            util.LRUCache(query_cache_size, size_alert=self._lru_size_alert)
            if query_cache_size != 0
            else None
        )
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        """Size-alert callback given to the built-in compiled cache.

        Emits an INFO level log message reporting the pruning, and does
        nothing when INFO logging is not enabled for this engine.

        """
        if not self._should_log_info():
            return
        self.logger.info(
            "Compiled cache size pruning from %d items to %d. "
            "Increase cache size to reduce the frequency of pruning.",
            len(cache),
            cache.capacity,
        )

    @property
    def engine(self) -> Engine:
        """Return this same :class:`.Engine`.

        Present for legacy schemes in which a single variable may hold
        either a :class:`.Connection` or an :class:`.Engine`.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Empty the built-in compiled cache held by this engine.

        **Only** the cache that is set up through the
        :paramref:`_engine.create_engine.query_cache_size` parameter is
        affected.  Dictionary caches that were handed in through the
        :paramref:`.Connection.execution_options.compiled_cache` parameter
        are not touched.

        .. versionadded:: 1.4

        """
        cache = self._compiled_cache
        # a falsy cache (e.g. None when no cache was created) is left alone
        if cache:
            cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        """Add to the default execution_options dictionary of this
        :class:`_engine.Engine`, in place.

        The keys/values passed as ``**opt`` are merged into the default
        execution options which are used for all connections.  The
        starting contents of that dictionary may be supplied using the
        ``execution_options`` parameter of :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        # event listeners see the new options first, then the merged
        # options are stored, then the dialect receives the same options.
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Produce a new :class:`_engine.Engine` whose
        :class:`_engine.Connection` objects carry the given execution
        options.

        The new :class:`_engine.Engine` stays linked to the
        :class:`_engine.Engine` it was created from, sharing its connection
        pool and other state:

        * Both engines use the very same :class:`_pool.Pool` instance.
          Calling :meth:`_engine.Engine.dispose` replaces the connection
          pool instance for the parent engine as well as for this one.
        * Event listeners are "cascaded": the new :class:`_engine.Engine`
          inherits the events of its parent, while further events may be
          associated with the new :class:`_engine.Engine` on its own.
        * Logging configuration and the logging_name are copied from the
          parent :class:`_engine.Engine`.

        :meth:`_engine.Engine.execution_options` exists to support schemes
        in which several :class:`_engine.Engine` objects refer to one
        connection pool yet differ in options that influence some
        execution-level behavior of each engine.  One example is a split
        into separate "reader" and "writer" :class:`_engine.Engine`
        instances, where one of them is configured with a lower
        :term:`isolation level`, or is even transaction-disabled by means
        of "autocommit".  That configuration is shown at
        :ref:`dbapi_autocommit_multiple`.

        A second example makes use of a custom option ``shard_id``, which
        an event consumes in order to change the current schema on a
        database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        In the recipe above, two :class:`_engine.Engine` objects each act
        as a factory for :class:`_engine.Connection` objects that already
        have a "shard_id" execution option in place.  The
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        reads that option and emits a MySQL ``use`` statement to switch
        databases ahead of a statement execution, recording the database
        that is currently established in the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Return the non-SQL options that will take effect during
        execution.

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the
        :class:`~sqlalchemy.engine.interfaces.Dialect` that this
        :class:`Engine` uses.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the
        :class:`~sqlalchemy.engine.interfaces.Dialect` that this
        :class:`Engine` uses.

        """

        return self.dialect.driver

    echo = log.echo_property()

    def __repr__(self) -> str:
        return f"Engine({self.url!r})"

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool in use by this
        :class:`_engine.Engine`.

        A replacement connection pool is created as soon as the old one
        has been disposed.  The old pool is disposed either actively, which
        closes out every connection currently checked in to it, or
        passively, in which case references to it are dropped while its
        connections are otherwise not closed.  The passive strategy is the
        better fit for an initializer in a forked Python process.

        :param close: when left at the default of ``True``, all
          **currently checked in** database connections are fully closed.
          Connections that remain checked out are **not** closed; they
          are however no longer associated with this
          :class:`_engine.Engine`, so that once they are closed
          individually, the :class:`_pool.Pool` they belong to is
          eventually garbage collected and they are closed out fully, if
          they were not already closed on checkin.

          When set to ``False``, the previous connection pool is
          de-referenced and is otherwise not touched in any way.

        .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

        """
        if close:
            self.pool.dispose()
        # the pool is replaced in both the active and the passive case
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Yield the given connection, or a new one when none is given.

        A connection passed in by the caller is yielded as-is and is not
        closed here; a connection procured via :meth:`.connect` is used as
        a context manager for the duration of the block.

        """
        if connection is not None:
            yield connection
        else:
            with self.connect() as new_conn:
                yield new_conn

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager which delivers a
        :class:`_engine.Connection` that has a :class:`.Transaction`
        established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        When the block completes successfully the :class:`.Transaction`
        is committed; when an error is raised the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        # the transaction context is entered inside of, and exited before,
        # the connection context.
        with self.connect() as conn, conn.begin():
            yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        """Run the given DDL visitor on a connection obtained from
        :meth:`.begin`, delegating to the connection's own
        ``_run_ddl_visitor()``.

        """
        with self.begin() as ddl_conn:
            ddl_conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        As the :class:`_engine.Connection` is itself a Python context
        manager, this method is typically used as follows::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Once the block above completes, the connection is "closed" and its
        underlying DBAPI resources go back to the connection pool.  Any
        transaction that was explicitly begun, or that was begun via
        autobegin, is rolled back as part of this, and the
        :meth:`_events.ConnectionEvents.rollback` event is emitted if a
        transaction was started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        What is returned is a proxied version of the DBAPI connection
        object used by the underlying driver.  It behaves in all respects
        like the real DBAPI connection, with the exception that calling
        its ``close()`` method returns the connection to the pool instead
        of closing it for real.

        Use this method for direct DBAPI connection access in special
        situations where the API of :class:`_engine.Connection` is not
        needed.  If a :class:`_engine.Connection` object is already at
        hand, the DBAPI connection is available from the
        :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()
3262
3263
class OptionEngineMixin(log.Identified):
    """Mixin providing the state of an engine that proxies a parent
    :class:`_engine.Engine`.

    The constructor copies the URL, dialect, compiled cache, parameter
    hiding flag and logging settings from the parent, joins the parent's
    event dispatch, and starts from the parent's execution options before
    applying its own.  The connection pool is not copied; it is read from
    and written to the parent engine.

    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Set up this engine as a proxy for ``proxied``.

        :param proxied: the parent :class:`_engine.Engine` whose state is
         shared or copied.
        :param execution_options: options applied on top of the parent's
         execution options by way of ``update_execution_options()``.

        """
        self._proxied = proxied

        # state copied from the parent engine
        self.url = proxied.url
        self.dialect = proxied.dialect
        self._compiled_cache = proxied._compiled_cache
        self.hide_parameters = proxied.hide_parameters

        # logging settings mirror the parent's
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        log.instance_logger(self, echoflag=self.echo)

        # note: joining the dispatch means that events assigned to the
        # parent engine *after* this OptionEngine has been created are
        # propagated here as well.  As the parent's events are shared,
        # class-level events are also not allowed to apply directly to
        # the OptionEngine class.
        #
        # the alternative would be to transfer only the events that
        # exist right now, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that may be the more appropriate approach, however it would be
        # a behavioral change for code which assigns events to the parent
        # engine and expects them to take effect for a sub-engine that
        # was already created.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # begin with the parent's options, then layer ours over them
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Placeholder; the concrete class supplies the implementation."""
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The parent engine's connection pool."""
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assignment is forwarded to the parent engine
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            """True if either the parent engine or this engine has the
            flag set."""
            parent_flag = self._proxied._has_events
            return parent_flag or self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # kept in the instance __dict__, which is where the getter
            # above looks for this engine's own flag
            self.__dict__["_has_events"] = value
3327
3328
class OptionEngine(OptionEngineMixin, Engine):
    """Concrete :class:`_engine.Engine` subclass combining
    :class:`.OptionEngineMixin` with :class:`_engine.Engine`.

    This is the class assigned to ``Engine._option_cls``, which
    :meth:`_engine.Engine.execution_options` instantiates.

    """

    def update_execution_options(self, **opt: Any) -> None:
        """Apply ``**opt`` using the :class:`_engine.Engine`
        implementation of this method."""
        # Engine is named explicitly: OptionEngineMixin precedes Engine in
        # the bases and its update_execution_options() only raises
        # NotImplementedError.
        Engine.update_execution_options(self, **opt)
3332
3333
# OptionEngine subclasses Engine and so is defined after it; the class that
# Engine.execution_options() instantiates is therefore assigned here.
Engine._option_cls = OptionEngine