1# engine/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7from __future__ import with_statement
8
9import contextlib
10import sys
11
12from .interfaces import Connectable
13from .interfaces import ExceptionContext
14from .util import _distill_params
15from .util import _distill_params_20
16from .util import TransactionalContext
17from .. import exc
18from .. import inspection
19from .. import log
20from .. import util
21from ..sql import compiler
22from ..sql import util as sql_util
23
24
25"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.
26
27"""
28
29_EMPTY_EXECUTION_OPTS = util.immutabledict()
30
31
32class Connection(Connectable):
33 """Provides high-level functionality for a wrapped DB-API connection.
34
35 **This is the SQLAlchemy 1.x.x version** of the :class:`_engine.Connection`
36 class. For the :term:`2.0 style` version, which features some API
37 differences, see :class:`_future.Connection`.
38
39 The :class:`_engine.Connection` object is procured by calling
40 the :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine`
41 object, and provides services for execution of SQL statements as well
42 as transaction control.
43
44 The Connection object is **not** thread-safe. While a Connection can be
45 shared among threads using properly synchronized access, it is still
46 possible that the underlying DBAPI connection may not support shared
47 access between threads. Check the DBAPI documentation for details.
48
49 The Connection object represents a single DBAPI connection checked out
50 from the connection pool. In this state, the connection pool has no affect
51 upon the connection, including its expiration or timeout state. For the
52 connection pool to properly manage connections, connections should be
53 returned to the connection pool (i.e. ``connection.close()``) whenever the
54 connection is not in use.
55
56 .. index::
57 single: thread safety; Connection
58
59 """
60
61 _is_future = False
62 _sqla_logger_namespace = "sqlalchemy.engine.Connection"
63
64 # used by sqlalchemy.engine.util.TransactionalContext
65 _trans_context_manager = None
66
67 def __init__(
68 self,
69 engine,
70 connection=None,
71 close_with_result=False,
72 _branch_from=None,
73 _execution_options=None,
74 _dispatch=None,
75 _has_events=None,
76 _allow_revalidate=True,
77 ):
78 """Construct a new Connection."""
79 self.engine = engine
80 self.dialect = engine.dialect
81 self.__branch_from = _branch_from
82
83 if _branch_from:
84 # branching is always "from" the root connection
85 assert _branch_from.__branch_from is None
86 self._dbapi_connection = connection
87 self._execution_options = _execution_options
88 self._echo = _branch_from._echo
89 self.should_close_with_result = False
90 self.dispatch = _dispatch
91 self._has_events = _branch_from._has_events
92 else:
93 self._dbapi_connection = (
94 connection
95 if connection is not None
96 else engine.raw_connection()
97 )
98
99 self._transaction = self._nested_transaction = None
100 self.__savepoint_seq = 0
101 self.__in_begin = False
102 self.should_close_with_result = close_with_result
103
104 self.__can_reconnect = _allow_revalidate
105 self._echo = self.engine._should_log_info()
106
107 if _has_events is None:
108 # if _has_events is sent explicitly as False,
109 # then don't join the dispatch of the engine; we don't
110 # want to handle any of the engine's events in that case.
111 self.dispatch = self.dispatch._join(engine.dispatch)
112 self._has_events = _has_events or (
113 _has_events is None and engine._has_events
114 )
115
116 assert not _execution_options
117 self._execution_options = engine._execution_options
118
119 if self._has_events or self.engine._has_events:
120 self.dispatch.engine_connect(self, _branch_from is not None)
121
122 @util.memoized_property
123 def _message_formatter(self):
124 if "logging_token" in self._execution_options:
125 token = self._execution_options["logging_token"]
126 return lambda msg: "[%s] %s" % (token, msg)
127 else:
128 return None
129
130 def _log_info(self, message, *arg, **kw):
131 fmt = self._message_formatter
132
133 if fmt:
134 message = fmt(message)
135
136 if log.STACKLEVEL:
137 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
138
139 self.engine.logger.info(message, *arg, **kw)
140
141 def _log_debug(self, message, *arg, **kw):
142 fmt = self._message_formatter
143
144 if fmt:
145 message = fmt(message)
146
147 if log.STACKLEVEL:
148 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET
149
150 self.engine.logger.debug(message, *arg, **kw)
151
152 @property
153 def _schema_translate_map(self):
154 return self._execution_options.get("schema_translate_map", None)
155
156 def schema_for_object(self, obj):
157 """Return the schema name for the given schema item taking into
158 account current schema translate map.
159
160 """
161
162 name = obj.schema
163 schema_translate_map = self._execution_options.get(
164 "schema_translate_map", None
165 )
166
167 if (
168 schema_translate_map
169 and name in schema_translate_map
170 and obj._use_schema_map
171 ):
172 return schema_translate_map[name]
173 else:
174 return name
175
176 def _branch(self):
177 """Return a new Connection which references this Connection's
178 engine and connection; but does not have close_with_result enabled,
179 and also whose close() method does nothing.
180
181 .. deprecated:: 1.4 the "branching" concept will be removed in
182 SQLAlchemy 2.0 as well as the "Connection.connect()" method which
183 is the only consumer for this.
184
185 The Core uses this very sparingly, only in the case of
186 custom SQL default functions that are to be INSERTed as the
187 primary key of a row where we need to get the value back, so we have
188 to invoke it distinctly - this is a very uncommon case.
189
190 Userland code accesses _branch() when the connect()
191 method is called. The branched connection
192 acts as much as possible like the parent, except that it stays
193 connected when a close() event occurs.
194
195 """
196 return self.engine._connection_cls(
197 self.engine,
198 self._dbapi_connection,
199 _branch_from=self.__branch_from if self.__branch_from else self,
200 _execution_options=self._execution_options,
201 _has_events=self._has_events,
202 _dispatch=self.dispatch,
203 )
204
205 def _generate_for_options(self):
206 """define connection method chaining behavior for execution_options"""
207
208 if self._is_future:
209 return self
210 else:
211 c = self.__class__.__new__(self.__class__)
212 c.__dict__ = self.__dict__.copy()
213 return c
214
215 def __enter__(self):
216 return self
217
218 def __exit__(self, type_, value, traceback):
219 self.close()
220
221 def execution_options(self, **opt):
222 r""" Set non-SQL options for the connection which take effect
223 during execution.
224
225 For a "future" style connection, this method returns this same
226 :class:`_future.Connection` object with the new options added.
227
228 For a legacy connection, this method returns a copy of this
229 :class:`_engine.Connection` which references the same underlying DBAPI
230 connection, but also defines the given execution options which will
231 take effect for a call to
232 :meth:`execute`. As the new :class:`_engine.Connection` references the
233 same underlying resource, it's usually a good idea to ensure that
234 the copies will be discarded immediately, which is implicit if used
235 as in::
236
237 result = connection.execution_options(stream_results=True).\
238 execute(stmt)
239
240 Note that any key/value can be passed to
241 :meth:`_engine.Connection.execution_options`,
242 and it will be stored in the
243 ``_execution_options`` dictionary of the :class:`_engine.Connection`.
244 It
245 is suitable for usage by end-user schemes to communicate with
246 event listeners, for example.
247
248 The keywords that are currently recognized by SQLAlchemy itself
249 include all those listed under :meth:`.Executable.execution_options`,
250 as well as others that are specific to :class:`_engine.Connection`.
251
252 :param autocommit: Available on: Connection, statement.
253 When True, a COMMIT will be invoked after execution
254 when executed in 'autocommit' mode, i.e. when an explicit
255 transaction is not begun on the connection. Note that this
256 is **library level, not DBAPI level autocommit**. The DBAPI
257 connection will remain in a real transaction unless the
258 "AUTOCOMMIT" isolation level is used.
259
260 .. deprecated:: 1.4 The "autocommit" execution option is deprecated
261 and will be removed in SQLAlchemy 2.0. See
262 :ref:`migration_20_autocommit` for discussion.
263
264 :param compiled_cache: Available on: Connection.
265 A dictionary where :class:`.Compiled` objects
266 will be cached when the :class:`_engine.Connection`
267 compiles a clause
268 expression into a :class:`.Compiled` object. This dictionary will
269 supersede the statement cache that may be configured on the
270 :class:`_engine.Engine` itself. If set to None, caching
271 is disabled, even if the engine has a configured cache size.
272
273 Note that the ORM makes use of its own "compiled" caches for
274 some operations, including flush operations. The caching
275 used by the ORM internally supersedes a cache dictionary
276 specified here.
277
278 :param logging_token: Available on: :class:`_engine.Connection`,
279 :class:`_engine.Engine`.
280
281 Adds the specified string token surrounded by brackets in log
282 messages logged by the connection, i.e. the logging that's enabled
283 either via the :paramref:`_sa.create_engine.echo` flag or via the
284 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
285 per-connection or per-sub-engine token to be available which is
286 useful for debugging concurrent connection scenarios.
287
288 .. versionadded:: 1.4.0b2
289
290 .. seealso::
291
292 :ref:`dbengine_logging_tokens` - usage example
293
294 :paramref:`_sa.create_engine.logging_name` - adds a name to the
295 name used by the Python logger object itself.
296
297 :param isolation_level: Available on: :class:`_engine.Connection`.
298
299 Set the transaction isolation level for the lifespan of this
300 :class:`_engine.Connection` object.
301 Valid values include those string
302 values accepted by the :paramref:`_sa.create_engine.isolation_level`
303 parameter passed to :func:`_sa.create_engine`. These levels are
304 semi-database specific; see individual dialect documentation for
305 valid levels.
306
307 The isolation level option applies the isolation level by emitting
308 statements on the DBAPI connection, and **necessarily affects the
309 original Connection object overall**, not just the copy that is
310 returned by the call to :meth:`_engine.Connection.execution_options`
311 method. The isolation level will remain at the given setting until
312 the DBAPI connection itself is returned to the connection pool, i.e.
313 the :meth:`_engine.Connection.close` method on the original
314 :class:`_engine.Connection` is called,
315 where an event handler will emit
316 additional statements on the DBAPI connection in order to revert the
317 isolation level change.
318
319 .. warning:: The ``isolation_level`` execution option should
320 **not** be used when a transaction is already established, that
321 is, the :meth:`_engine.Connection.begin`
322 method or similar has been
323 called. A database cannot change the isolation level on a
324 transaction in progress, and different DBAPIs and/or
325 SQLAlchemy dialects may implicitly roll back or commit
326 the transaction, or not affect the connection at all.
327
328 .. note:: The ``isolation_level`` execution option is implicitly
329 reset if the :class:`_engine.Connection` is invalidated, e.g. via
330 the :meth:`_engine.Connection.invalidate` method, or if a
331 disconnection error occurs. The new connection produced after
332 the invalidation will not have the isolation level re-applied
333 to it automatically.
334
335 .. seealso::
336
337 :paramref:`_sa.create_engine.isolation_level`
338 - set per :class:`_engine.Engine` isolation level
339
340 :meth:`_engine.Connection.get_isolation_level`
341 - view current actual level
342
343 :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`
344
345 :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`
346
347 :ref:`MySQL Transaction Isolation <mysql_isolation_level>`
348
349 :ref:`SQL Server Transaction Isolation <mssql_isolation_level>`
350
351 :ref:`session_transaction_isolation` - for the ORM
352
353 :param no_parameters: When ``True``, if the final parameter
354 list or dictionary is totally empty, will invoke the
355 statement on the cursor as ``cursor.execute(statement)``,
356 not passing the parameter collection at all.
357 Some DBAPIs such as psycopg2 and mysql-python consider
358 percent signs as significant only when parameters are
359 present; this option allows code to generate SQL
360 containing percent signs (and possibly other characters)
361 that is neutral regarding whether it's executed by the DBAPI
362 or piped into a script that's later invoked by
363 command line tools.
364
365 :param stream_results: Available on: Connection, statement.
366 Indicate to the dialect that results should be
367 "streamed" and not pre-buffered, if possible. For backends
368 such as PostgreSQL, MySQL and MariaDB, this indicates the use of
369 a "server side cursor" as opposed to a client side cursor.
370 Other backends such as that of Oracle may already use server
371 side cursors by default.
372
373 The usage of
374 :paramref:`_engine.Connection.execution_options.stream_results` is
375 usually combined with setting a fixed number of rows to to be fetched
376 in batches, to allow for efficient iteration of database rows while
377 at the same time not loading all result rows into memory at once;
378 this can be configured on a :class:`_engine.Result` object using the
379 :meth:`_engine.Result.yield_per` method, after execution has
380 returned a new :class:`_engine.Result`. If
381 :meth:`_engine.Result.yield_per` is not used,
382 the :paramref:`_engine.Connection.execution_options.stream_results`
383 mode of operation will instead use a dynamically sized buffer
384 which buffers sets of rows at a time, growing on each batch
385 based on a fixed growth size up until a limit which may
386 be configured using the
387 :paramref:`_engine.Connection.execution_options.max_row_buffer`
388 parameter.
389
390 When using the ORM to fetch ORM mapped objects from a result,
391 :meth:`_engine.Result.yield_per` should always be used with
392 :paramref:`_engine.Connection.execution_options.stream_results`,
393 so that the ORM does not fetch all rows into new ORM objects at once.
394
395 For typical use, the
396 :paramref:`_engine.Connection.execution_options.yield_per` execution
397 option should be preferred, which sets up both
398 :paramref:`_engine.Connection.execution_options.stream_results` and
399 :meth:`_engine.Result.yield_per` at once. This option is supported
400 both at a core level by :class:`_engine.Connection` as well as by the
401 ORM :class:`_engine.Session`; the latter is described at
402 :ref:`orm_queryguide_yield_per`.
403
404 .. seealso::
405
406 :ref:`engine_stream_results` - background on
407 :paramref:`_engine.Connection.execution_options.stream_results`
408
409 :paramref:`_engine.Connection.execution_options.max_row_buffer`
410
411 :paramref:`_engine.Connection.execution_options.yield_per`
412
413 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
414 describing the ORM version of ``yield_per``
415
416 :param max_row_buffer: Available on: :class:`_engine.Connection`,
417 :class:`_sql.Executable`. Sets a maximum
418 buffer size to use when the
419 :paramref:`_engine.Connection.execution_options.stream_results`
420 execution option is used on a backend that supports server side
421 cursors. The default value if not specified is 1000.
422
423 .. seealso::
424
425 :paramref:`_engine.Connection.execution_options.stream_results`
426
427 :ref:`engine_stream_results`
428
429
430 :param yield_per: Available on: :class:`_engine.Connection`,
431 :class:`_sql.Executable`. Integer value applied which will
432 set the :paramref:`_engine.Connection.execution_options.stream_results`
433 execution option and invoke :meth:`_engine.Result.yield_per`
434 automatically at once. Allows equivalent functionality as
435 is present when using this parameter with the ORM.
436
437 .. versionadded:: 1.4.40
438
439 .. seealso::
440
441 :ref:`engine_stream_results` - background and examples
442 on using server side cursors with Core.
443
444 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
445 describing the ORM version of ``yield_per``
446
447 :param schema_translate_map: Available on: :class:`_engine.Connection`,
448 :class:`_engine.Engine`, :class:`_sql.Executable`.
449
450 :param schema_translate_map: Available on: Connection, Engine.
451 A dictionary mapping schema names to schema names, that will be
452 applied to the :paramref:`_schema.Table.schema` element of each
453 :class:`_schema.Table`
454 encountered when SQL or DDL expression elements
455 are compiled into strings; the resulting schema name will be
456 converted based on presence in the map of the original name.
457
458 .. versionadded:: 1.1
459
460 .. seealso::
461
462 :ref:`schema_translating`
463
464 .. seealso::
465
466 :meth:`_engine.Engine.execution_options`
467
468 :meth:`.Executable.execution_options`
469
470 :meth:`_engine.Connection.get_execution_options`
471
472
473 """ # noqa
474 c = self._generate_for_options()
475 c._execution_options = c._execution_options.union(opt)
476 if self._has_events or self.engine._has_events:
477 self.dispatch.set_connection_execution_options(c, opt)
478 self.dialect.set_connection_execution_options(c, opt)
479 return c
480
481 def get_execution_options(self):
482 """Get the non-SQL options which will take effect during execution.
483
484 .. versionadded:: 1.3
485
486 .. seealso::
487
488 :meth:`_engine.Connection.execution_options`
489 """
490 return self._execution_options
491
492 @property
493 def closed(self):
494 """Return True if this connection is closed."""
495
496 # note this is independent for a "branched" connection vs.
497 # the base
498
499 return self._dbapi_connection is None and not self.__can_reconnect
500
501 @property
502 def invalidated(self):
503 """Return True if this connection was invalidated."""
504
505 # prior to 1.4, "invalid" was stored as a state independent of
506 # "closed", meaning an invalidated connection could be "closed",
507 # the _dbapi_connection would be None and closed=True, yet the
508 # "invalid" flag would stay True. This meant that there were
509 # three separate states (open/valid, closed/valid, closed/invalid)
510 # when there is really no reason for that; a connection that's
511 # "closed" does not need to be "invalid". So the state is now
512 # represented by the two facts alone.
513
514 if self.__branch_from:
515 return self.__branch_from.invalidated
516
517 return self._dbapi_connection is None and not self.closed
518
519 @property
520 def connection(self):
521 """The underlying DB-API connection managed by this Connection.
522
523 This is a SQLAlchemy connection-pool proxied connection
524 which then has the attribute
525 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
526 actual driver connection.
527
528 .. seealso::
529
530
531 :ref:`dbapi_connections`
532
533 """
534
535 if self._dbapi_connection is None:
536 try:
537 return self._revalidate_connection()
538 except (exc.PendingRollbackError, exc.ResourceClosedError):
539 raise
540 except BaseException as e:
541 self._handle_dbapi_exception(e, None, None, None, None)
542 else:
543 return self._dbapi_connection
544
545 def get_isolation_level(self):
546 """Return the current **actual** isolation level that's present on
547 the database within the scope of this connection.
548
549 This attribute will perform a live SQL operation against the database
550 in order to procure the current isolation level, so the value returned
551 is the actual level on the underlying DBAPI connection regardless of
552 how this state was set. This will be one of the four actual isolation
553 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
554 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation
555 level setting. Third party dialects may also feature additional
556 isolation level settings.
557
558 .. note:: This method **will not report** on the ``AUTOCOMMIT``
559 isolation level, which is a separate :term:`dbapi` setting that's
560 independent of **actual** isolation level. When ``AUTOCOMMIT`` is
561 in use, the database connection still has a "traditional" isolation
562 mode in effect, that is typically one of the four values
563 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
564 ``SERIALIZABLE``.
565
566 Compare to the :attr:`_engine.Connection.default_isolation_level`
567 accessor which returns the isolation level that is present on the
568 database at initial connection time.
569
570 .. versionadded:: 0.9.9
571
572 .. seealso::
573
574 :attr:`_engine.Connection.default_isolation_level`
575 - view default level
576
577 :paramref:`_sa.create_engine.isolation_level`
578 - set per :class:`_engine.Engine` isolation level
579
580 :paramref:`.Connection.execution_options.isolation_level`
581 - set per :class:`_engine.Connection` isolation level
582
583 """
584 try:
585 return self.dialect.get_isolation_level(self.connection)
586 except BaseException as e:
587 self._handle_dbapi_exception(e, None, None, None, None)
588
589 @property
590 def default_isolation_level(self):
591 """The initial-connection time isolation level associated with the
592 :class:`_engine.Dialect` in use.
593
594 This value is independent of the
595 :paramref:`.Connection.execution_options.isolation_level` and
596 :paramref:`.Engine.execution_options.isolation_level` execution
597 options, and is determined by the :class:`_engine.Dialect` when the
598 first connection is created, by performing a SQL query against the
599 database for the current isolation level before any additional commands
600 have been emitted.
601
602 Calling this accessor does not invoke any new SQL queries.
603
604 .. versionadded:: 0.9.9
605
606 .. seealso::
607
608 :meth:`_engine.Connection.get_isolation_level`
609 - view current actual isolation level
610
611 :paramref:`_sa.create_engine.isolation_level`
612 - set per :class:`_engine.Engine` isolation level
613
614 :paramref:`.Connection.execution_options.isolation_level`
615 - set per :class:`_engine.Connection` isolation level
616
617 """
618 return self.dialect.default_isolation_level
619
620 def _invalid_transaction(self):
621 if self.invalidated:
622 raise exc.PendingRollbackError(
623 "Can't reconnect until invalid %stransaction is rolled "
624 "back."
625 % (
626 "savepoint "
627 if self._nested_transaction is not None
628 else ""
629 ),
630 code="8s2b",
631 )
632 else:
633 assert not self._is_future
634 raise exc.PendingRollbackError(
635 "This connection is on an inactive %stransaction. "
636 "Please rollback() fully before proceeding."
637 % (
638 "savepoint "
639 if self._nested_transaction is not None
640 else ""
641 ),
642 code="8s2a",
643 )
644
645 def _revalidate_connection(self):
646 if self.__branch_from:
647 return self.__branch_from._revalidate_connection()
648 if self.__can_reconnect and self.invalidated:
649 if self._transaction is not None:
650 self._invalid_transaction()
651 self._dbapi_connection = self.engine.raw_connection(
652 _connection=self
653 )
654 return self._dbapi_connection
655 raise exc.ResourceClosedError("This Connection is closed")
656
657 @property
658 def _still_open_and_dbapi_connection_is_valid(self):
659 return self._dbapi_connection is not None and getattr(
660 self._dbapi_connection, "is_valid", False
661 )
662
663 @property
664 def info(self):
665 """Info dictionary associated with the underlying DBAPI connection
666 referred to by this :class:`_engine.Connection`, allowing user-defined
667 data to be associated with the connection.
668
669 The data here will follow along with the DBAPI connection including
670 after it is returned to the connection pool and used again
671 in subsequent instances of :class:`_engine.Connection`.
672
673 """
674
675 return self.connection.info
676
677 @util.deprecated_20(":meth:`.Connection.connect`")
678 def connect(self, close_with_result=False):
679 """Returns a branched version of this :class:`_engine.Connection`.
680
681 The :meth:`_engine.Connection.close` method on the returned
682 :class:`_engine.Connection` can be called and this
683 :class:`_engine.Connection` will remain open.
684
685 This method provides usage symmetry with
686 :meth:`_engine.Engine.connect`, including for usage
687 with context managers.
688
689 """
690
691 return self._branch()
692
693 def invalidate(self, exception=None):
694 """Invalidate the underlying DBAPI connection associated with
695 this :class:`_engine.Connection`.
696
697 An attempt will be made to close the underlying DBAPI connection
698 immediately; however if this operation fails, the error is logged
699 but not raised. The connection is then discarded whether or not
700 close() succeeded.
701
702 Upon the next use (where "use" typically means using the
703 :meth:`_engine.Connection.execute` method or similar),
704 this :class:`_engine.Connection` will attempt to
705 procure a new DBAPI connection using the services of the
706 :class:`_pool.Pool` as a source of connectivity (e.g.
707 a "reconnection").
708
709 If a transaction was in progress (e.g. the
710 :meth:`_engine.Connection.begin` method has been called) when
711 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
712 level all state associated with this transaction is lost, as
713 the DBAPI connection is closed. The :class:`_engine.Connection`
714 will not allow a reconnection to proceed until the
715 :class:`.Transaction` object is ended, by calling the
716 :meth:`.Transaction.rollback` method; until that point, any attempt at
717 continuing to use the :class:`_engine.Connection` will raise an
718 :class:`~sqlalchemy.exc.InvalidRequestError`.
719 This is to prevent applications from accidentally
720 continuing an ongoing transactional operations despite the
721 fact that the transaction has been lost due to an
722 invalidation.
723
724 The :meth:`_engine.Connection.invalidate` method,
725 just like auto-invalidation,
726 will at the connection pool level invoke the
727 :meth:`_events.PoolEvents.invalidate` event.
728
729 :param exception: an optional ``Exception`` instance that's the
730 reason for the invalidation. is passed along to event handlers
731 and logging functions.
732
733 .. seealso::
734
735 :ref:`pool_connection_invalidation`
736
737 """
738
739 if self.__branch_from:
740 return self.__branch_from.invalidate(exception=exception)
741
742 if self.invalidated:
743 return
744
745 if self.closed:
746 raise exc.ResourceClosedError("This Connection is closed")
747
748 if self._still_open_and_dbapi_connection_is_valid:
749 self._dbapi_connection.invalidate(exception)
750 self._dbapi_connection = None
751
752 def detach(self):
753 """Detach the underlying DB-API connection from its connection pool.
754
755 E.g.::
756
757 with engine.connect() as conn:
758 conn.detach()
759 conn.execute(text("SET search_path TO schema1, schema2"))
760
761 # work with connection
762
763 # connection is fully closed (since we used "with:", can
764 # also call .close())
765
766 This :class:`_engine.Connection` instance will remain usable.
767 When closed
768 (or exited from a context manager context as above),
769 the DB-API connection will be literally closed and not
770 returned to its originating pool.
771
772 This method can be used to insulate the rest of an application
773 from a modified state on a connection (such as a transaction
774 isolation level or similar).
775
776 """
777
778 self._dbapi_connection.detach()
779
780 def _autobegin(self):
781 self.begin()
782
783 def begin(self):
784 """Begin a transaction and return a transaction handle.
785
786 The returned object is an instance of :class:`.Transaction`.
787 This object represents the "scope" of the transaction,
788 which completes when either the :meth:`.Transaction.rollback`
789 or :meth:`.Transaction.commit` method is called.
790
791 .. tip::
792
793 The :meth:`_engine.Connection.begin` method is invoked when using
794 the :meth:`_engine.Engine.begin` context manager method as well.
795 All documentation that refers to behaviors specific to the
796 :meth:`_engine.Connection.begin` method also apply to use of the
797 :meth:`_engine.Engine.begin` method.
798
799 Legacy use: nested calls to :meth:`.begin` on the same
800 :class:`_engine.Connection` will return new :class:`.Transaction`
801 objects that represent an emulated transaction within the scope of the
802 enclosing transaction, that is::
803
804 trans = conn.begin() # outermost transaction
805 trans2 = conn.begin() # "nested"
806 trans2.commit() # does nothing
807 trans.commit() # actually commits
808
809 Calls to :meth:`.Transaction.commit` only have an effect
810 when invoked via the outermost :class:`.Transaction` object, though the
811 :meth:`.Transaction.rollback` method of any of the
812 :class:`.Transaction` objects will roll back the
813 transaction.
814
815 .. tip::
816
817 The above "nesting" behavior is a legacy behavior specific to
818 :term:`1.x style` use and will be removed in SQLAlchemy 2.0. For
819 notes on :term:`2.0 style` use, see
820 :meth:`_future.Connection.begin`.
821
822
823 .. seealso::
824
825 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT
826
827 :meth:`_engine.Connection.begin_twophase` -
828 use a two phase /XID transaction
829
830 :meth:`_engine.Engine.begin` - context manager available from
831 :class:`_engine.Engine`
832
833 """
834 if self._is_future:
835 assert not self.__branch_from
836 elif self.__branch_from:
837 return self.__branch_from.begin()
838
839 if self.__in_begin:
840 # for dialects that emit SQL within the process of
841 # dialect.do_begin() or dialect.do_begin_twophase(), this
842 # flag prevents "autobegin" from being emitted within that
843 # process, while allowing self._transaction to remain at None
844 # until it's complete.
845 return
846 elif self._transaction is None:
847 self._transaction = RootTransaction(self)
848 return self._transaction
849 else:
850 if self._is_future:
851 raise exc.InvalidRequestError(
852 "This connection has already initialized a SQLAlchemy "
853 "Transaction() object via begin() or autobegin; can't "
854 "call begin() here unless rollback() or commit() "
855 "is called first."
856 )
857 else:
858 return MarkerTransaction(self)
859
860 def begin_nested(self):
861 """Begin a nested transaction (i.e. SAVEPOINT) and return a
862 transaction handle, assuming an outer transaction is already
863 established.
864
865 Nested transactions require SAVEPOINT support in the
866 underlying database. Any transaction in the hierarchy may
867 ``commit`` and ``rollback``, however the outermost transaction
868 still controls the overall ``commit`` or ``rollback`` of the
869 transaction of a whole.
870
871 The legacy form of :meth:`_engine.Connection.begin_nested` method has
872 alternate behaviors based on whether or not the
873 :meth:`_engine.Connection.begin` method was called previously. If
874 :meth:`_engine.Connection.begin` was not called, then this method will
875 behave the same as the :meth:`_engine.Connection.begin` method and
876 return a :class:`.RootTransaction` object that begins and commits a
877 real transaction - **no savepoint is invoked**. If
878 :meth:`_engine.Connection.begin` **has** been called, and a
879 :class:`.RootTransaction` is already established, then this method
880 returns an instance of :class:`.NestedTransaction` which will invoke
881 and manage the scope of a SAVEPOINT.
882
883 .. tip::
884
885 The above mentioned behavior of
886 :meth:`_engine.Connection.begin_nested` is a legacy behavior
887 specific to :term:`1.x style` use. In :term:`2.0 style` use, the
888 :meth:`_future.Connection.begin_nested` method instead autobegins
889 the outer transaction that can be committed using
890 "commit-as-you-go" style; see
891 :meth:`_future.Connection.begin_nested` for migration details.
892
893 .. versionchanged:: 1.4.13 The behavior of
894 :meth:`_engine.Connection.begin_nested`
895 as returning a :class:`.RootTransaction` if
896 :meth:`_engine.Connection.begin` were not called has been restored
897 as was the case in 1.3.x versions; in previous 1.4.x versions, an
898 outer transaction would be "autobegun" but would not be committed.
899
900
901 .. seealso::
902
903 :meth:`_engine.Connection.begin`
904
905 :ref:`session_begin_nested` - ORM support for SAVEPOINT
906
907 """
908 if self._is_future:
909 assert not self.__branch_from
910 elif self.__branch_from:
911 return self.__branch_from.begin_nested()
912
913 if self._transaction is None:
914 if not self._is_future:
915 util.warn_deprecated_20(
916 "Calling Connection.begin_nested() in 2.0 style use will "
917 "return a NestedTransaction (SAVEPOINT) in all cases, "
918 "that will not commit the outer transaction. For code "
919 "that is cross-compatible between 1.x and 2.0 style use, "
920 "ensure Connection.begin() is called before calling "
921 "Connection.begin_nested()."
922 )
923 return self.begin()
924 else:
925 self._autobegin()
926
927 return NestedTransaction(self)
928
929 def begin_twophase(self, xid=None):
930 """Begin a two-phase or XA transaction and return a transaction
931 handle.
932
933 The returned object is an instance of :class:`.TwoPhaseTransaction`,
934 which in addition to the methods provided by
935 :class:`.Transaction`, also provides a
936 :meth:`~.TwoPhaseTransaction.prepare` method.
937
938 :param xid: the two phase transaction id. If not supplied, a
939 random id will be generated.
940
941 .. seealso::
942
943 :meth:`_engine.Connection.begin`
944
945 :meth:`_engine.Connection.begin_twophase`
946
947 """
948
949 if self.__branch_from:
950 return self.__branch_from.begin_twophase(xid=xid)
951
952 if self._transaction is not None:
953 raise exc.InvalidRequestError(
954 "Cannot start a two phase transaction when a transaction "
955 "is already in progress."
956 )
957 if xid is None:
958 xid = self.engine.dialect.create_xid()
959 return TwoPhaseTransaction(self, xid)
960
961 def recover_twophase(self):
962 return self.engine.dialect.do_recover_twophase(self)
963
964 def rollback_prepared(self, xid, recover=False):
965 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover)
966
967 def commit_prepared(self, xid, recover=False):
968 self.engine.dialect.do_commit_twophase(self, xid, recover=recover)
969
970 def in_transaction(self):
971 """Return True if a transaction is in progress."""
972 if self.__branch_from is not None:
973 return self.__branch_from.in_transaction()
974
975 return self._transaction is not None and self._transaction.is_active
976
977 def in_nested_transaction(self):
978 """Return True if a transaction is in progress."""
979 if self.__branch_from is not None:
980 return self.__branch_from.in_nested_transaction()
981
982 return (
983 self._nested_transaction is not None
984 and self._nested_transaction.is_active
985 )
986
987 def _is_autocommit_isolation(self):
988 opt_iso = self._execution_options.get("isolation_level", None)
989 return bool(
990 opt_iso == "AUTOCOMMIT"
991 or (
992 opt_iso is None
993 and getattr(self.engine.dialect, "isolation_level", None)
994 == "AUTOCOMMIT"
995 )
996 )
997
998 def get_transaction(self):
999 """Return the current root transaction in progress, if any.
1000
1001 .. versionadded:: 1.4
1002
1003 """
1004
1005 if self.__branch_from is not None:
1006 return self.__branch_from.get_transaction()
1007
1008 return self._transaction
1009
1010 def get_nested_transaction(self):
1011 """Return the current nested transaction in progress, if any.
1012
1013 .. versionadded:: 1.4
1014
1015 """
1016 if self.__branch_from is not None:
1017
1018 return self.__branch_from.get_nested_transaction()
1019
1020 return self._nested_transaction
1021
1022 def _begin_impl(self, transaction):
1023 assert not self.__branch_from
1024
1025 if self._echo:
1026 if self._is_autocommit_isolation():
1027 self._log_info(
1028 "BEGIN (implicit; DBAPI should not BEGIN due to "
1029 "autocommit mode)"
1030 )
1031 else:
1032 self._log_info("BEGIN (implicit)")
1033
1034 self.__in_begin = True
1035
1036 if self._has_events or self.engine._has_events:
1037 self.dispatch.begin(self)
1038
1039 try:
1040 self.engine.dialect.do_begin(self.connection)
1041 except BaseException as e:
1042 self._handle_dbapi_exception(e, None, None, None, None)
1043 finally:
1044 self.__in_begin = False
1045
1046 def _rollback_impl(self):
1047 assert not self.__branch_from
1048
1049 if self._has_events or self.engine._has_events:
1050 self.dispatch.rollback(self)
1051
1052 if self._still_open_and_dbapi_connection_is_valid:
1053 if self._echo:
1054 if self._is_autocommit_isolation():
1055 self._log_info(
1056 "ROLLBACK using DBAPI connection.rollback(), "
1057 "DBAPI should ignore due to autocommit mode"
1058 )
1059 else:
1060 self._log_info("ROLLBACK")
1061 try:
1062 self.engine.dialect.do_rollback(self.connection)
1063 except BaseException as e:
1064 self._handle_dbapi_exception(e, None, None, None, None)
1065
1066 def _commit_impl(self, autocommit=False):
1067 assert not self.__branch_from
1068
1069 # AUTOCOMMIT isolation-level is a dialect-specific concept, however
1070 # if a connection has this set as the isolation level, we can skip
1071 # the "autocommit" warning as the operation will do "autocommit"
1072 # in any case
1073 if autocommit and not self._is_autocommit_isolation():
1074 util.warn_deprecated_20(
1075 "The current statement is being autocommitted using "
1076 "implicit autocommit, which will be removed in "
1077 "SQLAlchemy 2.0. "
1078 "Use the .begin() method of Engine or Connection in order to "
1079 "use an explicit transaction for DML and DDL statements."
1080 )
1081
1082 if self._has_events or self.engine._has_events:
1083 self.dispatch.commit(self)
1084
1085 if self._echo:
1086 if self._is_autocommit_isolation():
1087 self._log_info(
1088 "COMMIT using DBAPI connection.commit(), "
1089 "DBAPI should ignore due to autocommit mode"
1090 )
1091 else:
1092 self._log_info("COMMIT")
1093 try:
1094 self.engine.dialect.do_commit(self.connection)
1095 except BaseException as e:
1096 self._handle_dbapi_exception(e, None, None, None, None)
1097
1098 def _savepoint_impl(self, name=None):
1099 assert not self.__branch_from
1100
1101 if self._has_events or self.engine._has_events:
1102 self.dispatch.savepoint(self, name)
1103
1104 if name is None:
1105 self.__savepoint_seq += 1
1106 name = "sa_savepoint_%s" % self.__savepoint_seq
1107 if self._still_open_and_dbapi_connection_is_valid:
1108 self.engine.dialect.do_savepoint(self, name)
1109 return name
1110
1111 def _rollback_to_savepoint_impl(self, name):
1112 assert not self.__branch_from
1113
1114 if self._has_events or self.engine._has_events:
1115 self.dispatch.rollback_savepoint(self, name, None)
1116
1117 if self._still_open_and_dbapi_connection_is_valid:
1118 self.engine.dialect.do_rollback_to_savepoint(self, name)
1119
1120 def _release_savepoint_impl(self, name):
1121 assert not self.__branch_from
1122
1123 if self._has_events or self.engine._has_events:
1124 self.dispatch.release_savepoint(self, name, None)
1125
1126 if self._still_open_and_dbapi_connection_is_valid:
1127 self.engine.dialect.do_release_savepoint(self, name)
1128
1129 def _begin_twophase_impl(self, transaction):
1130 assert not self.__branch_from
1131
1132 if self._echo:
1133 self._log_info("BEGIN TWOPHASE (implicit)")
1134 if self._has_events or self.engine._has_events:
1135 self.dispatch.begin_twophase(self, transaction.xid)
1136
1137 if self._still_open_and_dbapi_connection_is_valid:
1138 self.__in_begin = True
1139 try:
1140 self.engine.dialect.do_begin_twophase(self, transaction.xid)
1141 except BaseException as e:
1142 self._handle_dbapi_exception(e, None, None, None, None)
1143 finally:
1144 self.__in_begin = False
1145
1146 def _prepare_twophase_impl(self, xid):
1147 assert not self.__branch_from
1148
1149 if self._has_events or self.engine._has_events:
1150 self.dispatch.prepare_twophase(self, xid)
1151
1152 if self._still_open_and_dbapi_connection_is_valid:
1153 assert isinstance(self._transaction, TwoPhaseTransaction)
1154 try:
1155 self.engine.dialect.do_prepare_twophase(self, xid)
1156 except BaseException as e:
1157 self._handle_dbapi_exception(e, None, None, None, None)
1158
1159 def _rollback_twophase_impl(self, xid, is_prepared):
1160 assert not self.__branch_from
1161
1162 if self._has_events or self.engine._has_events:
1163 self.dispatch.rollback_twophase(self, xid, is_prepared)
1164
1165 if self._still_open_and_dbapi_connection_is_valid:
1166 assert isinstance(self._transaction, TwoPhaseTransaction)
1167 try:
1168 self.engine.dialect.do_rollback_twophase(
1169 self, xid, is_prepared
1170 )
1171 except BaseException as e:
1172 self._handle_dbapi_exception(e, None, None, None, None)
1173
1174 def _commit_twophase_impl(self, xid, is_prepared):
1175 assert not self.__branch_from
1176
1177 if self._has_events or self.engine._has_events:
1178 self.dispatch.commit_twophase(self, xid, is_prepared)
1179
1180 if self._still_open_and_dbapi_connection_is_valid:
1181 assert isinstance(self._transaction, TwoPhaseTransaction)
1182 try:
1183 self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
1184 except BaseException as e:
1185 self._handle_dbapi_exception(e, None, None, None, None)
1186
1187 def _autorollback(self):
1188 if self.__branch_from:
1189 self.__branch_from._autorollback()
1190
1191 if not self.in_transaction():
1192 self._rollback_impl()
1193
1194 def _warn_for_legacy_exec_format(self):
1195 util.warn_deprecated_20(
1196 "The connection.execute() method in "
1197 "SQLAlchemy 2.0 will accept parameters as a single "
1198 "dictionary or a "
1199 "single sequence of dictionaries only. "
1200 "Parameters passed as keyword arguments, tuples or positionally "
1201 "oriented dictionaries and/or tuples "
1202 "will no longer be accepted."
1203 )
1204
1205 def close(self):
1206 """Close this :class:`_engine.Connection`.
1207
1208 This results in a release of the underlying database
1209 resources, that is, the DBAPI connection referenced
1210 internally. The DBAPI connection is typically restored
1211 back to the connection-holding :class:`_pool.Pool` referenced
1212 by the :class:`_engine.Engine` that produced this
1213 :class:`_engine.Connection`. Any transactional state present on
1214 the DBAPI connection is also unconditionally released via
1215 the DBAPI connection's ``rollback()`` method, regardless
1216 of any :class:`.Transaction` object that may be
1217 outstanding with regards to this :class:`_engine.Connection`.
1218
1219 After :meth:`_engine.Connection.close` is called, the
1220 :class:`_engine.Connection` is permanently in a closed state,
1221 and will allow no further operations.
1222
1223 """
1224
1225 if self.__branch_from:
1226 assert not self._is_future
1227 util.warn_deprecated_20(
1228 "The .close() method on a so-called 'branched' connection is "
1229 "deprecated as of 1.4, as are 'branched' connections overall, "
1230 "and will be removed in a future release. If this is a "
1231 "default-handling function, don't close the connection."
1232 )
1233 self._dbapi_connection = None
1234 self.__can_reconnect = False
1235 return
1236
1237 if self._transaction:
1238 self._transaction.close()
1239 skip_reset = True
1240 else:
1241 skip_reset = False
1242
1243 if self._dbapi_connection is not None:
1244 conn = self._dbapi_connection
1245
1246 # as we just closed the transaction, close the connection
1247 # pool connection without doing an additional reset
1248 if skip_reset:
1249 conn._close_special(transaction_reset=True)
1250 else:
1251 conn.close()
1252
1253 # There is a slight chance that conn.close() may have
1254 # triggered an invalidation here in which case
1255 # _dbapi_connection would already be None, however usually
1256 # it will be non-None here and in a "closed" state.
1257 self._dbapi_connection = None
1258 self.__can_reconnect = False
1259
1260 def scalar(self, object_, *multiparams, **params):
1261 """Executes and returns the first column of the first row.
1262
1263 The underlying result/cursor is closed after execution.
1264
1265 """
1266
1267 return self.execute(object_, *multiparams, **params).scalar()
1268
1269 def scalars(self, object_, *multiparams, **params):
1270 """Executes and returns a scalar result set, which yields scalar values
1271 from the first column of each row.
1272
1273 This method is equivalent to calling :meth:`_engine.Connection.execute`
1274 to receive a :class:`_result.Result` object, then invoking the
1275 :meth:`_result.Result.scalars` method to produce a
1276 :class:`_result.ScalarResult` instance.
1277
1278 :return: a :class:`_result.ScalarResult`
1279
1280 .. versionadded:: 1.4.24
1281
1282 """
1283
1284 return self.execute(object_, *multiparams, **params).scalars()
1285
1286 def execute(self, statement, *multiparams, **params):
1287 r"""Executes a SQL statement construct and returns a
1288 :class:`_engine.CursorResult`.
1289
1290 :param statement: The statement to be executed. May be
1291 one of:
1292
1293 * a plain string (deprecated)
1294 * any :class:`_expression.ClauseElement` construct that is also
1295 a subclass of :class:`.Executable`, such as a
1296 :func:`_expression.select` construct
1297 * a :class:`.FunctionElement`, such as that generated
1298 by :data:`.func`, will be automatically wrapped in
1299 a SELECT statement, which is then executed.
1300 * a :class:`.DDLElement` object
1301 * a :class:`.DefaultGenerator` object
1302 * a :class:`.Compiled` object
1303
1304 .. deprecated:: 2.0 passing a string to
1305 :meth:`_engine.Connection.execute` is
1306 deprecated and will be removed in version 2.0. Use the
1307 :func:`_expression.text` construct with
1308 :meth:`_engine.Connection.execute`, or the
1309 :meth:`_engine.Connection.exec_driver_sql`
1310 method to invoke a driver-level
1311 SQL string.
1312
1313 :param \*multiparams/\**params: represent bound parameter
1314 values to be used in the execution. Typically,
1315 the format is either a collection of one or more
1316 dictionaries passed to \*multiparams::
1317
1318 conn.execute(
1319 table.insert(),
1320 {"id":1, "value":"v1"},
1321 {"id":2, "value":"v2"}
1322 )
1323
1324 ...or individual key/values interpreted by \**params::
1325
1326 conn.execute(
1327 table.insert(), id=1, value="v1"
1328 )
1329
1330 In the case that a plain SQL string is passed, and the underlying
1331 DBAPI accepts positional bind parameters, a collection of tuples
1332 or individual values in \*multiparams may be passed::
1333
1334 conn.execute(
1335 "INSERT INTO table (id, value) VALUES (?, ?)",
1336 (1, "v1"), (2, "v2")
1337 )
1338
1339 conn.execute(
1340 "INSERT INTO table (id, value) VALUES (?, ?)",
1341 1, "v1"
1342 )
1343
1344 Note above, the usage of a question mark "?" or other
1345 symbol is contingent upon the "paramstyle" accepted by the DBAPI
1346 in use, which may be any of "qmark", "named", "pyformat", "format",
1347 "numeric". See `pep-249
1348 <https://www.python.org/dev/peps/pep-0249/>`_ for details on
1349 paramstyle.
1350
1351 To execute a textual SQL statement which uses bound parameters in a
1352 DBAPI-agnostic way, use the :func:`_expression.text` construct.
1353
1354 .. deprecated:: 2.0 use of tuple or scalar positional parameters
1355 is deprecated. All params should be dicts or sequences of dicts.
1356 Use :meth:`.exec_driver_sql` to execute a plain string with
1357 tuple or scalar positional parameters.
1358
1359 """
1360
1361 if isinstance(statement, util.string_types):
1362 util.warn_deprecated_20(
1363 "Passing a string to Connection.execute() is "
1364 "deprecated and will be removed in version 2.0. Use the "
1365 "text() construct, "
1366 "or the Connection.exec_driver_sql() method to invoke a "
1367 "driver-level SQL string."
1368 )
1369
1370 return self._exec_driver_sql(
1371 statement,
1372 multiparams,
1373 params,
1374 _EMPTY_EXECUTION_OPTS,
1375 future=False,
1376 )
1377
1378 try:
1379 meth = statement._execute_on_connection
1380 except AttributeError as err:
1381 util.raise_(
1382 exc.ObjectNotExecutableError(statement), replace_context=err
1383 )
1384 else:
1385 return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
1386
1387 def _execute_function(self, func, multiparams, params, execution_options):
1388 """Execute a sql.FunctionElement object."""
1389
1390 return self._execute_clauseelement(
1391 func.select(), multiparams, params, execution_options
1392 )
1393
1394 def _execute_default(
1395 self,
1396 default,
1397 multiparams,
1398 params,
1399 # migrate is calling this directly :(
1400 execution_options=_EMPTY_EXECUTION_OPTS,
1401 ):
1402 """Execute a schema.ColumnDefault object."""
1403
1404 execution_options = self._execution_options.merge_with(
1405 execution_options
1406 )
1407
1408 distilled_parameters = _distill_params(self, multiparams, params)
1409
1410 if self._has_events or self.engine._has_events:
1411 (
1412 default,
1413 distilled_params,
1414 event_multiparams,
1415 event_params,
1416 ) = self._invoke_before_exec_event(
1417 default, distilled_parameters, execution_options
1418 )
1419
1420 try:
1421 conn = self._dbapi_connection
1422 if conn is None:
1423 conn = self._revalidate_connection()
1424
1425 dialect = self.dialect
1426 ctx = dialect.execution_ctx_cls._init_default(
1427 dialect, self, conn, execution_options
1428 )
1429 except (exc.PendingRollbackError, exc.ResourceClosedError):
1430 raise
1431 except BaseException as e:
1432 self._handle_dbapi_exception(e, None, None, None, None)
1433
1434 ret = ctx._exec_default(None, default, None)
1435 if self.should_close_with_result:
1436 self.close()
1437
1438 if self._has_events or self.engine._has_events:
1439 self.dispatch.after_execute(
1440 self,
1441 default,
1442 event_multiparams,
1443 event_params,
1444 execution_options,
1445 ret,
1446 )
1447
1448 return ret
1449
1450 def _execute_ddl(self, ddl, multiparams, params, execution_options):
1451 """Execute a schema.DDL object."""
1452
1453 execution_options = ddl._execution_options.merge_with(
1454 self._execution_options, execution_options
1455 )
1456
1457 distilled_parameters = _distill_params(self, multiparams, params)
1458
1459 if self._has_events or self.engine._has_events:
1460 (
1461 ddl,
1462 distilled_params,
1463 event_multiparams,
1464 event_params,
1465 ) = self._invoke_before_exec_event(
1466 ddl, distilled_parameters, execution_options
1467 )
1468
1469 exec_opts = self._execution_options.merge_with(execution_options)
1470 schema_translate_map = exec_opts.get("schema_translate_map", None)
1471
1472 dialect = self.dialect
1473
1474 compiled = ddl.compile(
1475 dialect=dialect, schema_translate_map=schema_translate_map
1476 )
1477 ret = self._execute_context(
1478 dialect,
1479 dialect.execution_ctx_cls._init_ddl,
1480 compiled,
1481 None,
1482 execution_options,
1483 compiled,
1484 )
1485 if self._has_events or self.engine._has_events:
1486 self.dispatch.after_execute(
1487 self,
1488 ddl,
1489 event_multiparams,
1490 event_params,
1491 execution_options,
1492 ret,
1493 )
1494 return ret
1495
1496 def _invoke_before_exec_event(
1497 self, elem, distilled_params, execution_options
1498 ):
1499
1500 if len(distilled_params) == 1:
1501 event_multiparams, event_params = [], distilled_params[0]
1502 else:
1503 event_multiparams, event_params = distilled_params, {}
1504
1505 for fn in self.dispatch.before_execute:
1506 elem, event_multiparams, event_params = fn(
1507 self,
1508 elem,
1509 event_multiparams,
1510 event_params,
1511 execution_options,
1512 )
1513
1514 if event_multiparams:
1515 distilled_params = list(event_multiparams)
1516 if event_params:
1517 raise exc.InvalidRequestError(
1518 "Event handler can't return non-empty multiparams "
1519 "and params at the same time"
1520 )
1521 elif event_params:
1522 distilled_params = [event_params]
1523 else:
1524 distilled_params = []
1525
1526 return elem, distilled_params, event_multiparams, event_params
1527
1528 def _execute_clauseelement(
1529 self, elem, multiparams, params, execution_options
1530 ):
1531 """Execute a sql.ClauseElement object."""
1532
1533 execution_options = elem._execution_options.merge_with(
1534 self._execution_options, execution_options
1535 )
1536
1537 distilled_params = _distill_params(self, multiparams, params)
1538
1539 has_events = self._has_events or self.engine._has_events
1540 if has_events:
1541 (
1542 elem,
1543 distilled_params,
1544 event_multiparams,
1545 event_params,
1546 ) = self._invoke_before_exec_event(
1547 elem, distilled_params, execution_options
1548 )
1549
1550 if distilled_params:
1551 # ensure we don't retain a link to the view object for keys()
1552 # which links to the values, which we don't want to cache
1553 keys = sorted(distilled_params[0])
1554 for_executemany = len(distilled_params) > 1
1555 else:
1556 keys = []
1557 for_executemany = False
1558
1559 dialect = self.dialect
1560
1561 schema_translate_map = execution_options.get(
1562 "schema_translate_map", None
1563 )
1564
1565 compiled_cache = execution_options.get(
1566 "compiled_cache", self.engine._compiled_cache
1567 )
1568
1569 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
1570 dialect=dialect,
1571 compiled_cache=compiled_cache,
1572 column_keys=keys,
1573 for_executemany=for_executemany,
1574 schema_translate_map=schema_translate_map,
1575 linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
1576 )
1577 ret = self._execute_context(
1578 dialect,
1579 dialect.execution_ctx_cls._init_compiled,
1580 compiled_sql,
1581 distilled_params,
1582 execution_options,
1583 compiled_sql,
1584 distilled_params,
1585 elem,
1586 extracted_params,
1587 cache_hit=cache_hit,
1588 )
1589 if has_events:
1590 self.dispatch.after_execute(
1591 self,
1592 elem,
1593 event_multiparams,
1594 event_params,
1595 execution_options,
1596 ret,
1597 )
1598 return ret
1599
1600 def _execute_compiled(
1601 self,
1602 compiled,
1603 multiparams,
1604 params,
1605 execution_options=_EMPTY_EXECUTION_OPTS,
1606 ):
1607 """Execute a sql.Compiled object.
1608
1609 TODO: why do we have this? likely deprecate or remove
1610
1611 """
1612
1613 execution_options = compiled.execution_options.merge_with(
1614 self._execution_options, execution_options
1615 )
1616 distilled_parameters = _distill_params(self, multiparams, params)
1617
1618 if self._has_events or self.engine._has_events:
1619 (
1620 compiled,
1621 distilled_params,
1622 event_multiparams,
1623 event_params,
1624 ) = self._invoke_before_exec_event(
1625 compiled, distilled_parameters, execution_options
1626 )
1627
1628 dialect = self.dialect
1629
1630 ret = self._execute_context(
1631 dialect,
1632 dialect.execution_ctx_cls._init_compiled,
1633 compiled,
1634 distilled_parameters,
1635 execution_options,
1636 compiled,
1637 distilled_parameters,
1638 None,
1639 None,
1640 )
1641 if self._has_events or self.engine._has_events:
1642 self.dispatch.after_execute(
1643 self,
1644 compiled,
1645 event_multiparams,
1646 event_params,
1647 execution_options,
1648 ret,
1649 )
1650 return ret
1651
1652 def _exec_driver_sql(
1653 self, statement, multiparams, params, execution_options, future
1654 ):
1655
1656 execution_options = self._execution_options.merge_with(
1657 execution_options
1658 )
1659
1660 distilled_parameters = _distill_params(self, multiparams, params)
1661
1662 if not future:
1663 if self._has_events or self.engine._has_events:
1664 (
1665 statement,
1666 distilled_params,
1667 event_multiparams,
1668 event_params,
1669 ) = self._invoke_before_exec_event(
1670 statement, distilled_parameters, execution_options
1671 )
1672
1673 dialect = self.dialect
1674 ret = self._execute_context(
1675 dialect,
1676 dialect.execution_ctx_cls._init_statement,
1677 statement,
1678 distilled_parameters,
1679 execution_options,
1680 statement,
1681 distilled_parameters,
1682 )
1683
1684 if not future:
1685 if self._has_events or self.engine._has_events:
1686 self.dispatch.after_execute(
1687 self,
1688 statement,
1689 event_multiparams,
1690 event_params,
1691 execution_options,
1692 ret,
1693 )
1694 return ret
1695
1696 def _execute_20(
1697 self,
1698 statement,
1699 parameters=None,
1700 execution_options=_EMPTY_EXECUTION_OPTS,
1701 ):
1702 args_10style, kwargs_10style = _distill_params_20(parameters)
1703 try:
1704 meth = statement._execute_on_connection
1705 except AttributeError as err:
1706 util.raise_(
1707 exc.ObjectNotExecutableError(statement), replace_context=err
1708 )
1709 else:
1710 return meth(self, args_10style, kwargs_10style, execution_options)
1711
1712 def exec_driver_sql(
1713 self, statement, parameters=None, execution_options=None
1714 ):
1715 r"""Executes a string SQL statement on the DBAPI cursor directly,
1716 without any SQL compilation steps.
1717
1718 This can be used to pass any string directly to the
1719 ``cursor.execute()`` method of the DBAPI in use.
1720
1721 :param statement: The statement str to be executed. Bound parameters
1722 must use the underlying DBAPI's paramstyle, such as "qmark",
1723 "pyformat", "format", etc.
1724
1725 :param parameters: represent bound parameter values to be used in the
1726 execution. The format is one of: a dictionary of named parameters,
1727 a tuple of positional parameters, or a list containing either
1728 dictionaries or tuples for multiple-execute support.
1729
1730 :return: a :class:`_engine.CursorResult`.
1731
1732 E.g. multiple dictionaries::
1733
1734
1735 conn.exec_driver_sql(
1736 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1737 [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}]
1738 )
1739
1740 Single dictionary::
1741
1742 conn.exec_driver_sql(
1743 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
1744 dict(id=1, value="v1")
1745 )
1746
1747 Single tuple::
1748
1749 conn.exec_driver_sql(
1750 "INSERT INTO table (id, value) VALUES (?, ?)",
1751 (1, 'v1')
1752 )
1753
1754 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
1755 not participate in the
1756 :meth:`_events.ConnectionEvents.before_execute` and
1757 :meth:`_events.ConnectionEvents.after_execute` events. To
1758 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
1759 :meth:`_events.ConnectionEvents.before_cursor_execute` and
1760 :meth:`_events.ConnectionEvents.after_cursor_execute`.
1761
1762 .. seealso::
1763
1764 :pep:`249`
1765
1766 """
1767
1768 args_10style, kwargs_10style = _distill_params_20(parameters)
1769
1770 return self._exec_driver_sql(
1771 statement,
1772 args_10style,
1773 kwargs_10style,
1774 execution_options,
1775 future=True,
1776 )
1777
1778 def _execute_context(
1779 self,
1780 dialect,
1781 constructor,
1782 statement,
1783 parameters,
1784 execution_options,
1785 *args,
1786 **kw
1787 ):
1788 """Create an :class:`.ExecutionContext` and execute, returning
1789 a :class:`_engine.CursorResult`."""
1790
1791 branched = self
1792 if self.__branch_from:
1793 # if this is a "branched" connection, do everything in terms
1794 # of the "root" connection, *except* for .close(), which is
1795 # the only feature that branching provides
1796 self = self.__branch_from
1797
1798 if execution_options:
1799 yp = execution_options.get("yield_per", None)
1800 if yp:
1801 execution_options = execution_options.union(
1802 {"stream_results": True, "max_row_buffer": yp}
1803 )
1804
1805 try:
1806 conn = self._dbapi_connection
1807 if conn is None:
1808 conn = self._revalidate_connection()
1809
1810 context = constructor(
1811 dialect, self, conn, execution_options, *args, **kw
1812 )
1813 except (exc.PendingRollbackError, exc.ResourceClosedError):
1814 raise
1815 except BaseException as e:
1816 self._handle_dbapi_exception(
1817 e, util.text_type(statement), parameters, None, None
1818 )
1819
1820 if (
1821 self._transaction
1822 and not self._transaction.is_active
1823 or (
1824 self._nested_transaction
1825 and not self._nested_transaction.is_active
1826 )
1827 ):
1828 self._invalid_transaction()
1829
1830 elif self._trans_context_manager:
1831 TransactionalContext._trans_ctx_check(self)
1832
1833 if self._is_future and self._transaction is None:
1834 self._autobegin()
1835
1836 context.pre_exec()
1837
1838 if dialect.use_setinputsizes:
1839 context._set_input_sizes()
1840
1841 cursor, statement, parameters = (
1842 context.cursor,
1843 context.statement,
1844 context.parameters,
1845 )
1846
1847 if not context.executemany:
1848 parameters = parameters[0]
1849
1850 if self._has_events or self.engine._has_events:
1851 for fn in self.dispatch.before_cursor_execute:
1852 statement, parameters = fn(
1853 self,
1854 cursor,
1855 statement,
1856 parameters,
1857 context,
1858 context.executemany,
1859 )
1860
1861 if self._echo:
1862
1863 self._log_info(statement)
1864
1865 stats = context._get_cache_stats()
1866
1867 if not self.engine.hide_parameters:
1868 self._log_info(
1869 "[%s] %r",
1870 stats,
1871 sql_util._repr_params(
1872 parameters, batches=10, ismulti=context.executemany
1873 ),
1874 )
1875 else:
1876 self._log_info(
1877 "[%s] [SQL parameters hidden due to hide_parameters=True]"
1878 % (stats,)
1879 )
1880
1881 evt_handled = False
1882 try:
1883 if context.executemany:
1884 if self.dialect._has_events:
1885 for fn in self.dialect.dispatch.do_executemany:
1886 if fn(cursor, statement, parameters, context):
1887 evt_handled = True
1888 break
1889 if not evt_handled:
1890 self.dialect.do_executemany(
1891 cursor, statement, parameters, context
1892 )
1893 elif not parameters and context.no_parameters:
1894 if self.dialect._has_events:
1895 for fn in self.dialect.dispatch.do_execute_no_params:
1896 if fn(cursor, statement, context):
1897 evt_handled = True
1898 break
1899 if not evt_handled:
1900 self.dialect.do_execute_no_params(
1901 cursor, statement, context
1902 )
1903 else:
1904 if self.dialect._has_events:
1905 for fn in self.dialect.dispatch.do_execute:
1906 if fn(cursor, statement, parameters, context):
1907 evt_handled = True
1908 break
1909 if not evt_handled:
1910 self.dialect.do_execute(
1911 cursor, statement, parameters, context
1912 )
1913
1914 if self._has_events or self.engine._has_events:
1915 self.dispatch.after_cursor_execute(
1916 self,
1917 cursor,
1918 statement,
1919 parameters,
1920 context,
1921 context.executemany,
1922 )
1923
1924 context.post_exec()
1925
1926 result = context._setup_result_proxy()
1927
1928 if not self._is_future:
1929 should_close_with_result = branched.should_close_with_result
1930
1931 if not result._soft_closed and should_close_with_result:
1932 result._autoclose_connection = True
1933
1934 if (
1935 # usually we're in a transaction so avoid relatively
1936 # expensive / legacy should_autocommit call
1937 self._transaction is None
1938 and context.should_autocommit
1939 ):
1940 self._commit_impl(autocommit=True)
1941
1942 # for "connectionless" execution, we have to close this
1943 # Connection after the statement is complete.
1944 # legacy stuff.
1945 if should_close_with_result and context._soft_closed:
1946 assert not self._is_future
1947
1948 # CursorResult already exhausted rows / has no rows.
1949 # close us now
1950 branched.close()
1951
1952 except BaseException as e:
1953 self._handle_dbapi_exception(
1954 e, statement, parameters, cursor, context
1955 )
1956
1957 return result
1958
1959 def _cursor_execute(self, cursor, statement, parameters, context=None):
1960 """Execute a statement + params on the given cursor.
1961
1962 Adds appropriate logging and exception handling.
1963
1964 This method is used by DefaultDialect for special-case
1965 executions, such as for sequences and column defaults.
1966 The path of statement execution in the majority of cases
1967 terminates at _execute_context().
1968
1969 """
1970 if self._has_events or self.engine._has_events:
1971 for fn in self.dispatch.before_cursor_execute:
1972 statement, parameters = fn(
1973 self, cursor, statement, parameters, context, False
1974 )
1975
1976 if self._echo:
1977 self._log_info(statement)
1978 self._log_info("[raw sql] %r", parameters)
1979 try:
1980 for fn in (
1981 ()
1982 if not self.dialect._has_events
1983 else self.dialect.dispatch.do_execute
1984 ):
1985 if fn(cursor, statement, parameters, context):
1986 break
1987 else:
1988 self.dialect.do_execute(cursor, statement, parameters, context)
1989 except BaseException as e:
1990 self._handle_dbapi_exception(
1991 e, statement, parameters, cursor, context
1992 )
1993
1994 if self._has_events or self.engine._has_events:
1995 self.dispatch.after_cursor_execute(
1996 self, cursor, statement, parameters, context, False
1997 )
1998
1999 def _safe_close_cursor(self, cursor):
2000 """Close the given cursor, catching exceptions
2001 and turning into log warnings.
2002
2003 """
2004 try:
2005 cursor.close()
2006 except Exception:
2007 # log the error through the connection pool's logger.
2008 self.engine.pool.logger.error(
2009 "Error closing cursor", exc_info=True
2010 )
2011
2012 _reentrant_error = False
2013 _is_disconnect = False
2014
2015 def _handle_dbapi_exception(
2016 self, e, statement, parameters, cursor, context
2017 ):
2018 exc_info = sys.exc_info()
2019
2020 is_exit_exception = util.is_exit_exception(e)
2021
2022 if not self._is_disconnect:
2023 self._is_disconnect = (
2024 isinstance(e, self.dialect.dbapi.Error)
2025 and not self.closed
2026 and self.dialect.is_disconnect(
2027 e,
2028 self._dbapi_connection if not self.invalidated else None,
2029 cursor,
2030 )
2031 ) or (is_exit_exception and not self.closed)
2032
2033 invalidate_pool_on_disconnect = not is_exit_exception
2034
2035 if self._reentrant_error:
2036 util.raise_(
2037 exc.DBAPIError.instance(
2038 statement,
2039 parameters,
2040 e,
2041 self.dialect.dbapi.Error,
2042 hide_parameters=self.engine.hide_parameters,
2043 dialect=self.dialect,
2044 ismulti=context.executemany
2045 if context is not None
2046 else None,
2047 ),
2048 with_traceback=exc_info[2],
2049 from_=e,
2050 )
2051 self._reentrant_error = True
2052 try:
2053 # non-DBAPI error - if we already got a context,
2054 # or there's no string statement, don't wrap it
2055 should_wrap = isinstance(e, self.dialect.dbapi.Error) or (
2056 statement is not None
2057 and context is None
2058 and not is_exit_exception
2059 )
2060
2061 if should_wrap:
2062 sqlalchemy_exception = exc.DBAPIError.instance(
2063 statement,
2064 parameters,
2065 e,
2066 self.dialect.dbapi.Error,
2067 hide_parameters=self.engine.hide_parameters,
2068 connection_invalidated=self._is_disconnect,
2069 dialect=self.dialect,
2070 ismulti=context.executemany
2071 if context is not None
2072 else None,
2073 )
2074 else:
2075 sqlalchemy_exception = None
2076
2077 newraise = None
2078
2079 if (
2080 self._has_events or self.engine._has_events
2081 ) and not self._execution_options.get(
2082 "skip_user_error_events", False
2083 ):
2084 ctx = ExceptionContextImpl(
2085 e,
2086 sqlalchemy_exception,
2087 self.engine,
2088 self,
2089 cursor,
2090 statement,
2091 parameters,
2092 context,
2093 self._is_disconnect,
2094 invalidate_pool_on_disconnect,
2095 )
2096
2097 for fn in self.dispatch.handle_error:
2098 try:
2099 # handler returns an exception;
2100 # call next handler in a chain
2101 per_fn = fn(ctx)
2102 if per_fn is not None:
2103 ctx.chained_exception = newraise = per_fn
2104 except Exception as _raised:
2105 # handler raises an exception - stop processing
2106 newraise = _raised
2107 break
2108
2109 if self._is_disconnect != ctx.is_disconnect:
2110 self._is_disconnect = ctx.is_disconnect
2111 if sqlalchemy_exception:
2112 sqlalchemy_exception.connection_invalidated = (
2113 ctx.is_disconnect
2114 )
2115
2116 # set up potentially user-defined value for
2117 # invalidate pool.
2118 invalidate_pool_on_disconnect = (
2119 ctx.invalidate_pool_on_disconnect
2120 )
2121
2122 if should_wrap and context:
2123 context.handle_dbapi_exception(e)
2124
2125 if not self._is_disconnect:
2126 if cursor:
2127 self._safe_close_cursor(cursor)
2128 with util.safe_reraise(warn_only=True):
2129 self._autorollback()
2130
2131 if newraise:
2132 util.raise_(newraise, with_traceback=exc_info[2], from_=e)
2133 elif should_wrap:
2134 util.raise_(
2135 sqlalchemy_exception, with_traceback=exc_info[2], from_=e
2136 )
2137 else:
2138 util.raise_(exc_info[1], with_traceback=exc_info[2])
2139
2140 finally:
2141 del self._reentrant_error
2142 if self._is_disconnect:
2143 del self._is_disconnect
2144 if not self.invalidated:
2145 dbapi_conn_wrapper = self._dbapi_connection
2146 if invalidate_pool_on_disconnect:
2147 self.engine.pool._invalidate(dbapi_conn_wrapper, e)
2148 self.invalidate(e)
2149 if self.should_close_with_result:
2150 assert not self._is_future
2151 self.close()
2152
2153 @classmethod
2154 def _handle_dbapi_exception_noconnection(cls, e, dialect, engine):
2155 exc_info = sys.exc_info()
2156
2157 is_disconnect = dialect.is_disconnect(e, None, None)
2158
2159 should_wrap = isinstance(e, dialect.dbapi.Error)
2160
2161 if should_wrap:
2162 sqlalchemy_exception = exc.DBAPIError.instance(
2163 None,
2164 None,
2165 e,
2166 dialect.dbapi.Error,
2167 hide_parameters=engine.hide_parameters,
2168 connection_invalidated=is_disconnect,
2169 )
2170 else:
2171 sqlalchemy_exception = None
2172
2173 newraise = None
2174
2175 if engine._has_events:
2176 ctx = ExceptionContextImpl(
2177 e,
2178 sqlalchemy_exception,
2179 engine,
2180 None,
2181 None,
2182 None,
2183 None,
2184 None,
2185 is_disconnect,
2186 True,
2187 )
2188 for fn in engine.dispatch.handle_error:
2189 try:
2190 # handler returns an exception;
2191 # call next handler in a chain
2192 per_fn = fn(ctx)
2193 if per_fn is not None:
2194 ctx.chained_exception = newraise = per_fn
2195 except Exception as _raised:
2196 # handler raises an exception - stop processing
2197 newraise = _raised
2198 break
2199
2200 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
2201 sqlalchemy_exception.connection_invalidated = (
2202 is_disconnect
2203 ) = ctx.is_disconnect
2204
2205 if newraise:
2206 util.raise_(newraise, with_traceback=exc_info[2], from_=e)
2207 elif should_wrap:
2208 util.raise_(
2209 sqlalchemy_exception, with_traceback=exc_info[2], from_=e
2210 )
2211 else:
2212 util.raise_(exc_info[1], with_traceback=exc_info[2])
2213
2214 def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
2215 """run a DDL visitor.
2216
2217 This method is only here so that the MockConnection can change the
2218 options given to the visitor so that "checkfirst" is skipped.
2219
2220 """
2221 visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
2222
2223 @util.deprecated(
2224 "1.4",
2225 "The :meth:`_engine.Connection.transaction` "
2226 "method is deprecated and will be "
2227 "removed in a future release. Use the :meth:`_engine.Engine.begin` "
2228 "context manager instead.",
2229 )
2230 def transaction(self, callable_, *args, **kwargs):
2231 r"""Execute the given function within a transaction boundary.
2232
2233 The function is passed this :class:`_engine.Connection`
2234 as the first argument, followed by the given \*args and \**kwargs,
2235 e.g.::
2236
2237 def do_something(conn, x, y):
2238 conn.execute(text("some statement"), {'x':x, 'y':y})
2239
2240 conn.transaction(do_something, 5, 10)
2241
2242 The operations inside the function are all invoked within the
2243 context of a single :class:`.Transaction`.
2244 Upon success, the transaction is committed. If an
2245 exception is raised, the transaction is rolled back
2246 before propagating the exception.
2247
2248 .. note::
2249
2250 The :meth:`.transaction` method is superseded by
2251 the usage of the Python ``with:`` statement, which can
2252 be used with :meth:`_engine.Connection.begin`::
2253
2254 with conn.begin():
2255 conn.execute(text("some statement"), {'x':5, 'y':10})
2256
2257 As well as with :meth:`_engine.Engine.begin`::
2258
2259 with engine.begin() as conn:
2260 conn.execute(text("some statement"), {'x':5, 'y':10})
2261
2262 .. seealso::
2263
2264 :meth:`_engine.Engine.begin` - engine-level transactional
2265 context
2266
2267 :meth:`_engine.Engine.transaction` - engine-level version of
2268 :meth:`_engine.Connection.transaction`
2269
2270 """
2271
2272 kwargs["_sa_skip_warning"] = True
2273 trans = self.begin()
2274 try:
2275 ret = self.run_callable(callable_, *args, **kwargs)
2276 trans.commit()
2277 return ret
2278 except:
2279 with util.safe_reraise():
2280 trans.rollback()
2281
2282 @util.deprecated(
2283 "1.4",
2284 "The :meth:`_engine.Connection.run_callable` "
2285 "method is deprecated and will "
2286 "be removed in a future release. Invoke the callable function "
2287 "directly, passing the Connection.",
2288 )
2289 def run_callable(self, callable_, *args, **kwargs):
2290 r"""Given a callable object or function, execute it, passing
2291 a :class:`_engine.Connection` as the first argument.
2292
2293 The given \*args and \**kwargs are passed subsequent
2294 to the :class:`_engine.Connection` argument.
2295
2296 This function, along with :meth:`_engine.Engine.run_callable`,
2297 allows a function to be run with a :class:`_engine.Connection`
2298 or :class:`_engine.Engine` object without the need to know
2299 which one is being dealt with.
2300
2301 """
2302 return callable_(self, *args, **kwargs)
2303
2304
2305class ExceptionContextImpl(ExceptionContext):
2306 """Implement the :class:`.ExceptionContext` interface."""
2307
2308 def __init__(
2309 self,
2310 exception,
2311 sqlalchemy_exception,
2312 engine,
2313 connection,
2314 cursor,
2315 statement,
2316 parameters,
2317 context,
2318 is_disconnect,
2319 invalidate_pool_on_disconnect,
2320 ):
2321 self.engine = engine
2322 self.connection = connection
2323 self.sqlalchemy_exception = sqlalchemy_exception
2324 self.original_exception = exception
2325 self.execution_context = context
2326 self.statement = statement
2327 self.parameters = parameters
2328 self.is_disconnect = is_disconnect
2329 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
2330
2331
2332class Transaction(TransactionalContext):
2333 """Represent a database transaction in progress.
2334
2335 The :class:`.Transaction` object is procured by
2336 calling the :meth:`_engine.Connection.begin` method of
2337 :class:`_engine.Connection`::
2338
2339 from sqlalchemy import create_engine
2340 engine = create_engine("postgresql://scott:tiger@localhost/test")
2341 connection = engine.connect()
2342 trans = connection.begin()
2343 connection.execute(text("insert into x (a, b) values (1, 2)"))
2344 trans.commit()
2345
2346 The object provides :meth:`.rollback` and :meth:`.commit`
2347 methods in order to control transaction boundaries. It
2348 also implements a context manager interface so that
2349 the Python ``with`` statement can be used with the
2350 :meth:`_engine.Connection.begin` method::
2351
2352 with connection.begin():
2353 connection.execute(text("insert into x (a, b) values (1, 2)"))
2354
2355 The Transaction object is **not** threadsafe.
2356
2357 .. seealso::
2358
2359 :meth:`_engine.Connection.begin`
2360
2361 :meth:`_engine.Connection.begin_twophase`
2362
2363 :meth:`_engine.Connection.begin_nested`
2364
2365 .. index::
2366 single: thread safety; Transaction
2367 """
2368
2369 __slots__ = ()
2370
2371 _is_root = False
2372
2373 def __init__(self, connection):
2374 raise NotImplementedError()
2375
2376 def _do_deactivate(self):
2377 """do whatever steps are necessary to set this transaction as
2378 "deactive", however leave this transaction object in place as far
2379 as the connection's state.
2380
2381 for a "real" transaction this should roll back the transaction
2382 and ensure this transaction is no longer a reset agent.
2383
2384 this is used for nesting of marker transactions where the marker
2385 can set the "real" transaction as rolled back, however it stays
2386 in place.
2387
2388 for 2.0 we hope to remove this nesting feature.
2389
2390 """
2391 raise NotImplementedError()
2392
2393 @property
2394 def _deactivated_from_connection(self):
2395 """True if this transaction is totally deactivated from the connection
2396 and therefore can no longer affect its state.
2397
2398 """
2399 raise NotImplementedError()
2400
2401 def _do_close(self):
2402 raise NotImplementedError()
2403
2404 def _do_rollback(self):
2405 raise NotImplementedError()
2406
2407 def _do_commit(self):
2408 raise NotImplementedError()
2409
2410 @property
2411 def is_valid(self):
2412 return self.is_active and not self.connection.invalidated
2413
2414 def close(self):
2415 """Close this :class:`.Transaction`.
2416
2417 If this transaction is the base transaction in a begin/commit
2418 nesting, the transaction will rollback(). Otherwise, the
2419 method returns.
2420
2421 This is used to cancel a Transaction without affecting the scope of
2422 an enclosing transaction.
2423
2424 """
2425 try:
2426 self._do_close()
2427 finally:
2428 assert not self.is_active
2429
2430 def rollback(self):
2431 """Roll back this :class:`.Transaction`.
2432
2433 The implementation of this may vary based on the type of transaction in
2434 use:
2435
2436 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2437 it corresponds to a ROLLBACK.
2438
2439 * For a :class:`.NestedTransaction`, it corresponds to a
2440 "ROLLBACK TO SAVEPOINT" operation.
2441
2442 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2443 phase transactions may be used.
2444
2445
2446 """
2447 try:
2448 self._do_rollback()
2449 finally:
2450 assert not self.is_active
2451
2452 def commit(self):
2453 """Commit this :class:`.Transaction`.
2454
2455 The implementation of this may vary based on the type of transaction in
2456 use:
2457
2458 * For a simple database transaction (e.g. :class:`.RootTransaction`),
2459 it corresponds to a COMMIT.
2460
2461 * For a :class:`.NestedTransaction`, it corresponds to a
2462 "RELEASE SAVEPOINT" operation.
2463
2464 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
2465 phase transactions may be used.
2466
2467 """
2468 try:
2469 self._do_commit()
2470 finally:
2471 assert not self.is_active
2472
2473 def _get_subject(self):
2474 return self.connection
2475
2476 def _transaction_is_active(self):
2477 return self.is_active
2478
2479 def _transaction_is_closed(self):
2480 return not self._deactivated_from_connection
2481
2482 def _rollback_can_be_called(self):
2483 # for RootTransaction / NestedTransaction, it's safe to call
2484 # rollback() even if the transaction is deactive and no warnings
2485 # will be emitted. tested in
2486 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
2487 return True
2488
2489
2490class MarkerTransaction(Transaction):
2491 """A 'marker' transaction that is used for nested begin() calls.
2492
2493 .. deprecated:: 1.4 future connection for 2.0 won't support this pattern.
2494
2495 """
2496
2497 __slots__ = ("connection", "_is_active", "_transaction")
2498
2499 def __init__(self, connection):
2500 assert connection._transaction is not None
2501 if not connection._transaction.is_active:
2502 raise exc.InvalidRequestError(
2503 "the current transaction on this connection is inactive. "
2504 "Please issue a rollback first."
2505 )
2506
2507 assert not connection._is_future
2508 util.warn_deprecated_20(
2509 "Calling .begin() when a transaction is already begun, creating "
2510 "a 'sub' transaction, is deprecated "
2511 "and will be removed in 2.0. See the documentation section "
2512 "'Migrating from the nesting pattern' for background on how "
2513 "to migrate from this pattern."
2514 )
2515
2516 self.connection = connection
2517
2518 if connection._trans_context_manager:
2519 TransactionalContext._trans_ctx_check(connection)
2520
2521 if connection._nested_transaction is not None:
2522 self._transaction = connection._nested_transaction
2523 else:
2524 self._transaction = connection._transaction
2525 self._is_active = True
2526
2527 @property
2528 def _deactivated_from_connection(self):
2529 return not self.is_active
2530
2531 @property
2532 def is_active(self):
2533 return self._is_active and self._transaction.is_active
2534
2535 def _deactivate(self):
2536 self._is_active = False
2537
2538 def _do_close(self):
2539 # does not actually roll back the root
2540 self._deactivate()
2541
2542 def _do_rollback(self):
2543 # does roll back the root
2544 if self._is_active:
2545 try:
2546 self._transaction._do_deactivate()
2547 finally:
2548 self._deactivate()
2549
2550 def _do_commit(self):
2551 self._deactivate()
2552
2553
2554class RootTransaction(Transaction):
2555 """Represent the "root" transaction on a :class:`_engine.Connection`.
2556
2557 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
2558 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
2559 is created by calling upon the :meth:`_engine.Connection.begin` method, and
2560 remains associated with the :class:`_engine.Connection` throughout its
2561 active span. The current :class:`_engine.RootTransaction` in use is
2562 accessible via the :attr:`_engine.Connection.get_transaction` method of
2563 :class:`_engine.Connection`.
2564
2565 In :term:`2.0 style` use, the :class:`_future.Connection` also employs
2566 "autobegin" behavior that will create a new
2567 :class:`_engine.RootTransaction` whenever a connection in a
2568 non-transactional state is used to emit commands on the DBAPI connection.
2569 The scope of the :class:`_engine.RootTransaction` in 2.0 style
2570 use can be controlled using the :meth:`_future.Connection.commit` and
2571 :meth:`_future.Connection.rollback` methods.
2572
2573
2574 """
2575
2576 _is_root = True
2577
2578 __slots__ = ("connection", "is_active")
2579
2580 def __init__(self, connection):
2581 assert connection._transaction is None
2582 if connection._trans_context_manager:
2583 TransactionalContext._trans_ctx_check(connection)
2584 self.connection = connection
2585 self._connection_begin_impl()
2586 connection._transaction = self
2587
2588 self.is_active = True
2589
2590 def _deactivate_from_connection(self):
2591 if self.is_active:
2592 assert self.connection._transaction is self
2593 self.is_active = False
2594
2595 elif self.connection._transaction is not self:
2596 util.warn("transaction already deassociated from connection")
2597
2598 @property
2599 def _deactivated_from_connection(self):
2600 return self.connection._transaction is not self
2601
2602 def _do_deactivate(self):
2603 # called from a MarkerTransaction to cancel this root transaction.
2604 # the transaction stays in place as connection._transaction, but
2605 # is no longer active and is no longer the reset agent for the
2606 # pooled connection. the connection won't support a new begin()
2607 # until this transaction is explicitly closed, rolled back,
2608 # or committed.
2609
2610 assert self.connection._transaction is self
2611
2612 if self.is_active:
2613 self._connection_rollback_impl()
2614
2615 # handle case where a savepoint was created inside of a marker
2616 # transaction that refers to a root. nested has to be cancelled
2617 # also.
2618 if self.connection._nested_transaction:
2619 self.connection._nested_transaction._cancel()
2620
2621 self._deactivate_from_connection()
2622
2623 def _connection_begin_impl(self):
2624 self.connection._begin_impl(self)
2625
2626 def _connection_rollback_impl(self):
2627 self.connection._rollback_impl()
2628
2629 def _connection_commit_impl(self):
2630 self.connection._commit_impl()
2631
2632 def _close_impl(self, try_deactivate=False):
2633 try:
2634 if self.is_active:
2635 self._connection_rollback_impl()
2636
2637 if self.connection._nested_transaction:
2638 self.connection._nested_transaction._cancel()
2639 finally:
2640 if self.is_active or try_deactivate:
2641 self._deactivate_from_connection()
2642 if self.connection._transaction is self:
2643 self.connection._transaction = None
2644
2645 assert not self.is_active
2646 assert self.connection._transaction is not self
2647
2648 def _do_close(self):
2649 self._close_impl()
2650
2651 def _do_rollback(self):
2652 self._close_impl(try_deactivate=True)
2653
2654 def _do_commit(self):
2655 if self.is_active:
2656 assert self.connection._transaction is self
2657
2658 try:
2659 self._connection_commit_impl()
2660 finally:
2661 # whether or not commit succeeds, cancel any
2662 # nested transactions, make this transaction "inactive"
2663 # and remove it as a reset agent
2664 if self.connection._nested_transaction:
2665 self.connection._nested_transaction._cancel()
2666
2667 self._deactivate_from_connection()
2668
2669 # ...however only remove as the connection's current transaction
2670 # if commit succeeded. otherwise it stays on so that a rollback
2671 # needs to occur.
2672 self.connection._transaction = None
2673 else:
2674 if self.connection._transaction is self:
2675 self.connection._invalid_transaction()
2676 else:
2677 raise exc.InvalidRequestError("This transaction is inactive")
2678
2679 assert not self.is_active
2680 assert self.connection._transaction is not self
2681
2682
2683class NestedTransaction(Transaction):
2684 """Represent a 'nested', or SAVEPOINT transaction.
2685
2686 The :class:`.NestedTransaction` object is created by calling the
2687 :meth:`_engine.Connection.begin_nested` method of
2688 :class:`_engine.Connection`.
2689
2690 When using :class:`.NestedTransaction`, the semantics of "begin" /
2691 "commit" / "rollback" are as follows:
2692
2693 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
2694 the savepoint is given an explicit name that is part of the state
2695 of this object.
2696
2697 * The :meth:`.NestedTransaction.commit` method corresponds to a
2698 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
2699 with this :class:`.NestedTransaction`.
2700
2701 * The :meth:`.NestedTransaction.rollback` method corresponds to a
2702 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
2703 associated with this :class:`.NestedTransaction`.
2704
2705 The rationale for mimicking the semantics of an outer transaction in
2706 terms of savepoints so that code may deal with a "savepoint" transaction
2707 and an "outer" transaction in an agnostic way.
2708
2709 .. seealso::
2710
2711 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.
2712
2713 """
2714
2715 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
2716
2717 def __init__(self, connection):
2718 assert connection._transaction is not None
2719 if connection._trans_context_manager:
2720 TransactionalContext._trans_ctx_check(connection)
2721 self.connection = connection
2722 self._savepoint = self.connection._savepoint_impl()
2723 self.is_active = True
2724 self._previous_nested = connection._nested_transaction
2725 connection._nested_transaction = self
2726
2727 def _deactivate_from_connection(self, warn=True):
2728 if self.connection._nested_transaction is self:
2729 self.connection._nested_transaction = self._previous_nested
2730 elif warn:
2731 util.warn(
2732 "nested transaction already deassociated from connection"
2733 )
2734
2735 @property
2736 def _deactivated_from_connection(self):
2737 return self.connection._nested_transaction is not self
2738
2739 def _cancel(self):
2740 # called by RootTransaction when the outer transaction is
2741 # committed, rolled back, or closed to cancel all savepoints
2742 # without any action being taken
2743 self.is_active = False
2744 self._deactivate_from_connection()
2745 if self._previous_nested:
2746 self._previous_nested._cancel()
2747
2748 def _close_impl(self, deactivate_from_connection, warn_already_deactive):
2749 try:
2750 if self.is_active and self.connection._transaction.is_active:
2751 self.connection._rollback_to_savepoint_impl(self._savepoint)
2752 finally:
2753 self.is_active = False
2754
2755 if deactivate_from_connection:
2756 self._deactivate_from_connection(warn=warn_already_deactive)
2757
2758 assert not self.is_active
2759 if deactivate_from_connection:
2760 assert self.connection._nested_transaction is not self
2761
2762 def _do_deactivate(self):
2763 self._close_impl(False, False)
2764
2765 def _do_close(self):
2766 self._close_impl(True, False)
2767
2768 def _do_rollback(self):
2769 self._close_impl(True, True)
2770
2771 def _do_commit(self):
2772 if self.is_active:
2773 try:
2774 self.connection._release_savepoint_impl(self._savepoint)
2775 finally:
2776 # nested trans becomes inactive on failed release
2777 # unconditionally. this prevents it from trying to
2778 # emit SQL when it rolls back.
2779 self.is_active = False
2780
2781 # but only de-associate from connection if it succeeded
2782 self._deactivate_from_connection()
2783 else:
2784 if self.connection._nested_transaction is self:
2785 self.connection._invalid_transaction()
2786 else:
2787 raise exc.InvalidRequestError(
2788 "This nested transaction is inactive"
2789 )
2790
2791
2792class TwoPhaseTransaction(RootTransaction):
2793 """Represent a two-phase transaction.
2794
2795 A new :class:`.TwoPhaseTransaction` object may be procured
2796 using the :meth:`_engine.Connection.begin_twophase` method.
2797
2798 The interface is the same as that of :class:`.Transaction`
2799 with the addition of the :meth:`prepare` method.
2800
2801 """
2802
2803 __slots__ = ("connection", "is_active", "xid", "_is_prepared")
2804
2805 def __init__(self, connection, xid):
2806 self._is_prepared = False
2807 self.xid = xid
2808 super(TwoPhaseTransaction, self).__init__(connection)
2809
2810 def prepare(self):
2811 """Prepare this :class:`.TwoPhaseTransaction`.
2812
2813 After a PREPARE, the transaction can be committed.
2814
2815 """
2816 if not self.is_active:
2817 raise exc.InvalidRequestError("This transaction is inactive")
2818 self.connection._prepare_twophase_impl(self.xid)
2819 self._is_prepared = True
2820
2821 def _connection_begin_impl(self):
2822 self.connection._begin_twophase_impl(self)
2823
2824 def _connection_rollback_impl(self):
2825 self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
2826
2827 def _connection_commit_impl(self):
2828 self.connection._commit_twophase_impl(self.xid, self._is_prepared)
2829
2830
2831class Engine(Connectable, log.Identified):
2832 """
2833 Connects a :class:`~sqlalchemy.pool.Pool` and
2834 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
2835 source of database connectivity and behavior.
2836
2837 This is the **SQLAlchemy 1.x version** of :class:`_engine.Engine`. For
2838 the :term:`2.0 style` version, which includes some API differences,
2839 see :class:`_future.Engine`.
2840
2841 An :class:`_engine.Engine` object is instantiated publicly using the
2842 :func:`~sqlalchemy.create_engine` function.
2843
2844 .. seealso::
2845
2846 :doc:`/core/engines`
2847
2848 :ref:`connections_toplevel`
2849
2850 """
2851
2852 _execution_options = _EMPTY_EXECUTION_OPTS
2853 _has_events = False
2854 _connection_cls = Connection
2855 _sqla_logger_namespace = "sqlalchemy.engine.Engine"
2856 _is_future = False
2857
2858 _schema_translate_map = None
2859
2860 def __init__(
2861 self,
2862 pool,
2863 dialect,
2864 url,
2865 logging_name=None,
2866 echo=None,
2867 query_cache_size=500,
2868 execution_options=None,
2869 hide_parameters=False,
2870 ):
2871 self.pool = pool
2872 self.url = url
2873 self.dialect = dialect
2874 if logging_name:
2875 self.logging_name = logging_name
2876 self.echo = echo
2877 self.hide_parameters = hide_parameters
2878 if query_cache_size != 0:
2879 self._compiled_cache = util.LRUCache(
2880 query_cache_size, size_alert=self._lru_size_alert
2881 )
2882 else:
2883 self._compiled_cache = None
2884 log.instance_logger(self, echoflag=echo)
2885 if execution_options:
2886 self.update_execution_options(**execution_options)
2887
2888 def _lru_size_alert(self, cache):
2889 if self._should_log_info:
2890 self.logger.info(
2891 "Compiled cache size pruning from %d items to %d. "
2892 "Increase cache size to reduce the frequency of pruning.",
2893 len(cache),
2894 cache.capacity,
2895 )
2896
2897 @property
2898 def engine(self):
2899 return self
2900
2901 def clear_compiled_cache(self):
2902 """Clear the compiled cache associated with the dialect.
2903
2904 This applies **only** to the built-in cache that is established
2905 via the :paramref:`_engine.create_engine.query_cache_size` parameter.
2906 It will not impact any dictionary caches that were passed via the
2907 :paramref:`.Connection.execution_options.query_cache` parameter.
2908
2909 .. versionadded:: 1.4
2910
2911 """
2912 if self._compiled_cache:
2913 self._compiled_cache.clear()
2914
2915 def update_execution_options(self, **opt):
2916 r"""Update the default execution_options dictionary
2917 of this :class:`_engine.Engine`.
2918
2919 The given keys/values in \**opt are added to the
2920 default execution options that will be used for
2921 all connections. The initial contents of this dictionary
2922 can be sent via the ``execution_options`` parameter
2923 to :func:`_sa.create_engine`.
2924
2925 .. seealso::
2926
2927 :meth:`_engine.Connection.execution_options`
2928
2929 :meth:`_engine.Engine.execution_options`
2930
2931 """
2932 self._execution_options = self._execution_options.union(opt)
2933 self.dispatch.set_engine_execution_options(self, opt)
2934 self.dialect.set_engine_execution_options(self, opt)
2935
2936 def execution_options(self, **opt):
2937 """Return a new :class:`_engine.Engine` that will provide
2938 :class:`_engine.Connection` objects with the given execution options.
2939
2940 The returned :class:`_engine.Engine` remains related to the original
2941 :class:`_engine.Engine` in that it shares the same connection pool and
2942 other state:
2943
2944 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
2945 is the
2946 same instance. The :meth:`_engine.Engine.dispose`
2947 method will replace
2948 the connection pool instance for the parent engine as well
2949 as this one.
2950 * Event listeners are "cascaded" - meaning, the new
2951 :class:`_engine.Engine`
2952 inherits the events of the parent, and new events can be associated
2953 with the new :class:`_engine.Engine` individually.
2954 * The logging configuration and logging_name is copied from the parent
2955 :class:`_engine.Engine`.
2956
2957 The intent of the :meth:`_engine.Engine.execution_options` method is
2958 to implement "sharding" schemes where multiple :class:`_engine.Engine`
2959 objects refer to the same connection pool, but are differentiated
2960 by options that would be consumed by a custom event::
2961
2962 primary_engine = create_engine("mysql://")
2963 shard1 = primary_engine.execution_options(shard_id="shard1")
2964 shard2 = primary_engine.execution_options(shard_id="shard2")
2965
2966 Above, the ``shard1`` engine serves as a factory for
2967 :class:`_engine.Connection`
2968 objects that will contain the execution option
2969 ``shard_id=shard1``, and ``shard2`` will produce
2970 :class:`_engine.Connection`
2971 objects that contain the execution option ``shard_id=shard2``.
2972
2973 An event handler can consume the above execution option to perform
2974 a schema switch or other operation, given a connection. Below
2975 we emit a MySQL ``use`` statement to switch databases, at the same
2976 time keeping track of which database we've established using the
2977 :attr:`_engine.Connection.info` dictionary,
2978 which gives us a persistent
2979 storage space that follows the DBAPI connection::
2980
2981 from sqlalchemy import event
2982 from sqlalchemy.engine import Engine
2983
2984 shards = {"default": "base", shard_1: "db1", "shard_2": "db2"}
2985
2986 @event.listens_for(Engine, "before_cursor_execute")
2987 def _switch_shard(conn, cursor, stmt,
2988 params, context, executemany):
2989 shard_id = conn._execution_options.get('shard_id', "default")
2990 current_shard = conn.info.get("current_shard", None)
2991
2992 if current_shard != shard_id:
2993 cursor.execute("use %s" % shards[shard_id])
2994 conn.info["current_shard"] = shard_id
2995
2996 .. seealso::
2997
2998 :meth:`_engine.Connection.execution_options`
2999 - update execution options
3000 on a :class:`_engine.Connection` object.
3001
3002 :meth:`_engine.Engine.update_execution_options`
3003 - update the execution
3004 options for a given :class:`_engine.Engine` in place.
3005
3006 :meth:`_engine.Engine.get_execution_options`
3007
3008
3009 """
3010 return self._option_cls(self, opt)
3011
3012 def get_execution_options(self):
3013 """Get the non-SQL options which will take effect during execution.
3014
3015 .. versionadded: 1.3
3016
3017 .. seealso::
3018
3019 :meth:`_engine.Engine.execution_options`
3020 """
3021 return self._execution_options
3022
3023 @property
3024 def name(self):
3025 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3026 in use by this :class:`Engine`."""
3027
3028 return self.dialect.name
3029
3030 @property
3031 def driver(self):
3032 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
3033 in use by this :class:`Engine`."""
3034
3035 return self.dialect.driver
3036
3037 echo = log.echo_property()
3038
3039 def __repr__(self):
3040 return "Engine(%r)" % (self.url,)
3041
3042 def dispose(self, close=True):
3043 """Dispose of the connection pool used by this
3044 :class:`_engine.Engine`.
3045
3046 A new connection pool is created immediately after the old one has been
3047 disposed. The previous connection pool is disposed either actively, by
3048 closing out all currently checked-in connections in that pool, or
3049 passively, by losing references to it but otherwise not closing any
3050 connections. The latter strategy is more appropriate for an initializer
3051 in a forked Python process.
3052
3053 :param close: if left at its default of ``True``, has the
3054 effect of fully closing all **currently checked in**
3055 database connections. Connections that are still checked out
3056 will **not** be closed, however they will no longer be associated
3057 with this :class:`_engine.Engine`,
3058 so when they are closed individually, eventually the
3059 :class:`_pool.Pool` which they are associated with will
3060 be garbage collected and they will be closed out fully, if
3061 not already closed on checkin.
3062
3063 If set to ``False``, the previous connection pool is de-referenced,
3064 and otherwise not touched in any way.
3065
3066 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
3067 parameter to allow the replacement of a connection pool in a child
3068 process without interfering with the connections used by the parent
3069 process.
3070
3071
3072 .. seealso::
3073
3074 :ref:`engine_disposal`
3075
3076 :ref:`pooling_multiprocessing`
3077
3078 """
3079 if close:
3080 self.pool.dispose()
3081 self.pool = self.pool.recreate()
3082 self.dispatch.engine_disposed(self)
3083
3084 def _execute_default(
3085 self, default, multiparams=(), params=util.EMPTY_DICT
3086 ):
3087 with self.connect() as conn:
3088 return conn._execute_default(default, multiparams, params)
3089
3090 @contextlib.contextmanager
3091 def _optional_conn_ctx_manager(self, connection=None):
3092 if connection is None:
3093 with self.connect() as conn:
3094 yield conn
3095 else:
3096 yield connection
3097
3098 class _trans_ctx(object):
3099 def __init__(self, conn, transaction, close_with_result):
3100 self.conn = conn
3101 self.transaction = transaction
3102 self.close_with_result = close_with_result
3103
3104 def __enter__(self):
3105 self.transaction.__enter__()
3106 return self.conn
3107
3108 def __exit__(self, type_, value, traceback):
3109 try:
3110 self.transaction.__exit__(type_, value, traceback)
3111 finally:
3112 if not self.close_with_result:
3113 self.conn.close()
3114
3115 def begin(self, close_with_result=False):
3116 """Return a context manager delivering a :class:`_engine.Connection`
3117 with a :class:`.Transaction` established.
3118
3119 E.g.::
3120
3121 with engine.begin() as conn:
3122 conn.execute(
3123 text("insert into table (x, y, z) values (1, 2, 3)")
3124 )
3125 conn.execute(text("my_special_procedure(5)"))
3126
3127 Upon successful operation, the :class:`.Transaction`
3128 is committed. If an error is raised, the :class:`.Transaction`
3129 is rolled back.
3130
3131 Legacy use only: the ``close_with_result`` flag is normally ``False``,
3132 and indicates that the :class:`_engine.Connection` will be closed when
3133 the operation is complete. When set to ``True``, it indicates the
3134 :class:`_engine.Connection` is in "single use" mode, where the
3135 :class:`_engine.CursorResult` returned by the first call to
3136 :meth:`_engine.Connection.execute` will close the
3137 :class:`_engine.Connection` when that :class:`_engine.CursorResult` has
3138 exhausted all result rows.
3139
3140 .. seealso::
3141
3142 :meth:`_engine.Engine.connect` - procure a
3143 :class:`_engine.Connection` from
3144 an :class:`_engine.Engine`.
3145
3146 :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
3147 for a particular :class:`_engine.Connection`.
3148
3149 """
3150 if self._connection_cls._is_future:
3151 conn = self.connect()
3152 else:
3153 conn = self.connect(close_with_result=close_with_result)
3154 try:
3155 trans = conn.begin()
3156 except:
3157 with util.safe_reraise():
3158 conn.close()
3159 return Engine._trans_ctx(conn, trans, close_with_result)
3160
3161 @util.deprecated(
3162 "1.4",
3163 "The :meth:`_engine.Engine.transaction` "
3164 "method is deprecated and will be "
3165 "removed in a future release. Use the :meth:`_engine.Engine.begin` "
3166 "context "
3167 "manager instead.",
3168 )
3169 def transaction(self, callable_, *args, **kwargs):
3170 r"""Execute the given function within a transaction boundary.
3171
3172 The function is passed a :class:`_engine.Connection` newly procured
3173 from :meth:`_engine.Engine.connect` as the first argument,
3174 followed by the given \*args and \**kwargs.
3175
3176 e.g.::
3177
3178 def do_something(conn, x, y):
3179 conn.execute(text("some statement"), {'x':x, 'y':y})
3180
3181 engine.transaction(do_something, 5, 10)
3182
3183 The operations inside the function are all invoked within the
3184 context of a single :class:`.Transaction`.
3185 Upon success, the transaction is committed. If an
3186 exception is raised, the transaction is rolled back
3187 before propagating the exception.
3188
3189 .. note::
3190
3191 The :meth:`.transaction` method is superseded by
3192 the usage of the Python ``with:`` statement, which can
3193 be used with :meth:`_engine.Engine.begin`::
3194
3195 with engine.begin() as conn:
3196 conn.execute(text("some statement"), {'x':5, 'y':10})
3197
3198 .. seealso::
3199
3200 :meth:`_engine.Engine.begin` - engine-level transactional
3201 context
3202
3203 :meth:`_engine.Connection.transaction`
3204 - connection-level version of
3205 :meth:`_engine.Engine.transaction`
3206
3207 """
3208 kwargs["_sa_skip_warning"] = True
3209 with self.connect() as conn:
3210 return conn.transaction(callable_, *args, **kwargs)
3211
3212 @util.deprecated(
3213 "1.4",
3214 "The :meth:`_engine.Engine.run_callable` "
3215 "method is deprecated and will be "
3216 "removed in a future release. Use the :meth:`_engine.Engine.begin` "
3217 "context manager instead.",
3218 )
3219 def run_callable(self, callable_, *args, **kwargs):
3220 r"""Given a callable object or function, execute it, passing
3221 a :class:`_engine.Connection` as the first argument.
3222
3223 The given \*args and \**kwargs are passed subsequent
3224 to the :class:`_engine.Connection` argument.
3225
3226 This function, along with :meth:`_engine.Connection.run_callable`,
3227 allows a function to be run with a :class:`_engine.Connection`
3228 or :class:`_engine.Engine` object without the need to know
3229 which one is being dealt with.
3230
3231 """
3232 kwargs["_sa_skip_warning"] = True
3233 with self.connect() as conn:
3234 return conn.run_callable(callable_, *args, **kwargs)
3235
3236 def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
3237 with self.begin() as conn:
3238 conn._run_ddl_visitor(visitorcallable, element, **kwargs)
3239
3240 @util.deprecated_20(
3241 ":meth:`_engine.Engine.execute`",
3242 alternative="All statement execution in SQLAlchemy 2.0 is performed "
3243 "by the :meth:`_engine.Connection.execute` method of "
3244 ":class:`_engine.Connection`, "
3245 "or in the ORM by the :meth:`.Session.execute` method of "
3246 ":class:`.Session`.",
3247 )
3248 def execute(self, statement, *multiparams, **params):
3249 """Executes the given construct and returns a
3250 :class:`_engine.CursorResult`.
3251
3252 The arguments are the same as those used by
3253 :meth:`_engine.Connection.execute`.
3254
3255 Here, a :class:`_engine.Connection` is acquired using the
3256 :meth:`_engine.Engine.connect` method, and the statement executed
3257 with that connection. The returned :class:`_engine.CursorResult`
3258 is flagged
3259 such that when the :class:`_engine.CursorResult` is exhausted and its
3260 underlying cursor is closed, the :class:`_engine.Connection`
3261 created here
3262 will also be closed, which allows its associated DBAPI connection
3263 resource to be returned to the connection pool.
3264
3265 """
3266 connection = self.connect(close_with_result=True)
3267 return connection.execute(statement, *multiparams, **params)
3268
3269 @util.deprecated_20(
3270 ":meth:`_engine.Engine.scalar`",
3271 alternative="All statement execution in SQLAlchemy 2.0 is performed "
3272 "by the :meth:`_engine.Connection.execute` method of "
3273 ":class:`_engine.Connection`, "
3274 "or in the ORM by the :meth:`.Session.execute` method of "
3275 ":class:`.Session`; the :meth:`_future.Result.scalar` "
3276 "method can then be "
3277 "used to return a scalar result.",
3278 )
3279 def scalar(self, statement, *multiparams, **params):
3280 """Executes and returns the first column of the first row.
3281
3282 The underlying result/cursor is closed after execution.
3283 """
3284 return self.execute(statement, *multiparams, **params).scalar()
3285
3286 def _execute_clauseelement(
3287 self,
3288 elem,
3289 multiparams=None,
3290 params=None,
3291 execution_options=_EMPTY_EXECUTION_OPTS,
3292 ):
3293 connection = self.connect(close_with_result=True)
3294 return connection._execute_clauseelement(
3295 elem, multiparams, params, execution_options
3296 )
3297
3298 def _execute_compiled(
3299 self,
3300 compiled,
3301 multiparams,
3302 params,
3303 execution_options=_EMPTY_EXECUTION_OPTS,
3304 ):
3305 connection = self.connect(close_with_result=True)
3306 return connection._execute_compiled(
3307 compiled, multiparams, params, execution_options
3308 )
3309
3310 def connect(self, close_with_result=False):
3311 """Return a new :class:`_engine.Connection` object.
3312
3313 The :class:`_engine.Connection` object is a facade that uses a DBAPI
3314 connection internally in order to communicate with the database. This
3315 connection is procured from the connection-holding :class:`_pool.Pool`
3316 referenced by this :class:`_engine.Engine`. When the
3317 :meth:`_engine.Connection.close` method of the
3318 :class:`_engine.Connection` object
3319 is called, the underlying DBAPI connection is then returned to the
3320 connection pool, where it may be used again in a subsequent call to
3321 :meth:`_engine.Engine.connect`.
3322
3323 """
3324
3325 return self._connection_cls(self, close_with_result=close_with_result)
3326
3327 @util.deprecated(
3328 "1.4",
3329 "The :meth:`_engine.Engine.table_names` "
3330 "method is deprecated and will be "
3331 "removed in a future release. Please refer to "
3332 ":meth:`_reflection.Inspector.get_table_names`.",
3333 )
3334 def table_names(self, schema=None, connection=None):
3335 """Return a list of all table names available in the database.
3336
3337 :param schema: Optional, retrieve names from a non-default schema.
3338
3339 :param connection: Optional, use a specified connection.
3340 """
3341 with self._optional_conn_ctx_manager(connection) as conn:
3342 insp = inspection.inspect(conn)
3343 return insp.get_table_names(schema)
3344
3345 @util.deprecated(
3346 "1.4",
3347 "The :meth:`_engine.Engine.has_table` "
3348 "method is deprecated and will be "
3349 "removed in a future release. Please refer to "
3350 ":meth:`_reflection.Inspector.has_table`.",
3351 )
3352 def has_table(self, table_name, schema=None):
3353 """Return True if the given backend has a table of the given name.
3354
3355 .. seealso::
3356
3357 :ref:`metadata_reflection_inspector` - detailed schema inspection
3358 using the :class:`_reflection.Inspector` interface.
3359
3360 :class:`.quoted_name` - used to pass quoting information along
3361 with a schema identifier.
3362
3363 """
3364 with self._optional_conn_ctx_manager(None) as conn:
3365 insp = inspection.inspect(conn)
3366 return insp.has_table(table_name, schema=schema)
3367
3368 def _wrap_pool_connect(self, fn, connection):
3369 dialect = self.dialect
3370 try:
3371 return fn()
3372 except dialect.dbapi.Error as e:
3373 if connection is None:
3374 Connection._handle_dbapi_exception_noconnection(
3375 e, dialect, self
3376 )
3377 else:
3378 util.raise_(
3379 sys.exc_info()[1], with_traceback=sys.exc_info()[2]
3380 )
3381
3382 def raw_connection(self, _connection=None):
3383 """Return a "raw" DBAPI connection from the connection pool.
3384
3385 The returned object is a proxied version of the DBAPI
3386 connection object used by the underlying driver in use.
3387 The object will have all the same behavior as the real DBAPI
3388 connection, except that its ``close()`` method will result in the
3389 connection being returned to the pool, rather than being closed
3390 for real.
3391
3392 This method provides direct DBAPI connection access for
3393 special situations when the API provided by
3394 :class:`_engine.Connection`
3395 is not needed. When a :class:`_engine.Connection` object is already
3396 present, the DBAPI connection is available using
3397 the :attr:`_engine.Connection.connection` accessor.
3398
3399 .. seealso::
3400
3401 :ref:`dbapi_connections`
3402
3403 """
3404 return self._wrap_pool_connect(self.pool.connect, _connection)
3405
3406
3407class OptionEngineMixin(object):
3408 _sa_propagate_class_events = False
3409
3410 def __init__(self, proxied, execution_options):
3411 self._proxied = proxied
3412 self.url = proxied.url
3413 self.dialect = proxied.dialect
3414 self.logging_name = proxied.logging_name
3415 self.echo = proxied.echo
3416 self._compiled_cache = proxied._compiled_cache
3417 self.hide_parameters = proxied.hide_parameters
3418 log.instance_logger(self, echoflag=self.echo)
3419
3420 # note: this will propagate events that are assigned to the parent
3421 # engine after this OptionEngine is created. Since we share
3422 # the events of the parent we also disallow class-level events
3423 # to apply to the OptionEngine class directly.
3424 #
3425 # the other way this can work would be to transfer existing
3426 # events only, using:
3427 # self.dispatch._update(proxied.dispatch)
3428 #
3429 # that might be more appropriate however it would be a behavioral
3430 # change for logic that assigns events to the parent engine and
3431 # would like it to take effect for the already-created sub-engine.
3432 self.dispatch = self.dispatch._join(proxied.dispatch)
3433
3434 self._execution_options = proxied._execution_options
3435 self.update_execution_options(**execution_options)
3436
3437 def _get_pool(self):
3438 return self._proxied.pool
3439
3440 def _set_pool(self, pool):
3441 self._proxied.pool = pool
3442
3443 pool = property(_get_pool, _set_pool)
3444
3445 def _get_has_events(self):
3446 return self._proxied._has_events or self.__dict__.get(
3447 "_has_events", False
3448 )
3449
3450 def _set_has_events(self, value):
3451 self.__dict__["_has_events"] = value
3452
3453 _has_events = property(_get_has_events, _set_has_events)
3454
3455
3456class OptionEngine(OptionEngineMixin, Engine):
3457 pass
3458
3459
3460Engine._option_cls = OptionEngine