Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1004 statements  

1# engine/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 

8from __future__ import annotations 

9 

10import contextlib 

11import sys 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import cast 

16from typing import Iterable 

17from typing import Iterator 

18from typing import List 

19from typing import Mapping 

20from typing import NoReturn 

21from typing import Optional 

22from typing import overload 

23from typing import Tuple 

24from typing import Type 

25from typing import TypeVar 

26from typing import Union 

27 

28from .interfaces import BindTyping 

29from .interfaces import ConnectionEventsTarget 

30from .interfaces import DBAPICursor 

31from .interfaces import ExceptionContext 

32from .interfaces import ExecuteStyle 

33from .interfaces import ExecutionContext 

34from .interfaces import IsolationLevel 

35from .util import _distill_params_20 

36from .util import _distill_raw_params 

37from .util import TransactionalContext 

38from .. import exc 

39from .. import inspection 

40from .. import log 

41from .. import util 

42from ..sql import compiler 

43from ..sql import util as sql_util 

44from ..util.typing import TupleAny 

45from ..util.typing import TypeVarTuple 

46from ..util.typing import Unpack 

47 

48if typing.TYPE_CHECKING: 

49 from . import CursorResult 

50 from . import ScalarResult 

51 from .interfaces import _AnyExecuteParams 

52 from .interfaces import _AnyMultiExecuteParams 

53 from .interfaces import _CoreAnyExecuteParams 

54 from .interfaces import _CoreMultiExecuteParams 

55 from .interfaces import _CoreSingleExecuteParams 

56 from .interfaces import _DBAPIAnyExecuteParams 

57 from .interfaces import _DBAPISingleExecuteParams 

58 from .interfaces import _ExecuteOptions 

59 from .interfaces import CompiledCacheType 

60 from .interfaces import CoreExecuteOptionsParameter 

61 from .interfaces import Dialect 

62 from .interfaces import SchemaTranslateMapType 

63 from .reflection import Inspector # noqa 

64 from .url import URL 

65 from ..event import dispatcher 

66 from ..log import _EchoFlagType 

67 from ..pool import _ConnectionFairy 

68 from ..pool import Pool 

69 from ..pool import PoolProxiedConnection 

70 from ..sql import Executable 

71 from ..sql._typing import _InfoType 

72 from ..sql.compiler import Compiled 

73 from ..sql.ddl import ExecutableDDLElement 

74 from ..sql.ddl import InvokeDDLBase 

75 from ..sql.functions import FunctionElement 

76 from ..sql.schema import DefaultGenerator 

77 from ..sql.schema import HasSchemaAttr 

78 from ..sql.schema import SchemaVisitable 

79 from ..sql.selectable import TypedReturnsRows 

80 

81 

# Generic type variable, declared with ``Any`` as its bound.
_T = TypeVar("_T", bound=Any)
# Variadic counterpart of ``_T`` (a ``TypeVarTuple``).
_Ts = TypeVarTuple("_Ts")
# Both names below are bound to the very same ``util.EMPTY_DICT`` object;
# they differ only in the type they are annotated with.
# NOTE(review): presumably EMPTY_DICT is immutable and therefore safe to
# share as a default - confirm against ``util``.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT

86 

87 

88class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

89 """Provides high-level functionality for a wrapped DB-API connection. 

90 

91 The :class:`_engine.Connection` object is procured by calling the 

92 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

93 object, and provides services for execution of SQL statements as well 

94 as transaction control. 

95 

96 The Connection object is **not** thread-safe. While a Connection can be 

97 shared among threads using properly synchronized access, it is still 

98 possible that the underlying DBAPI connection may not support shared 

99 access between threads. Check the DBAPI documentation for details. 

100 

101 The Connection object represents a single DBAPI connection checked out 

102 from the connection pool. In this state, the connection pool has no 

103 effect upon the connection, including its expiration or timeout state.

104 For the connection pool to properly manage connections, connections 

105 should be returned to the connection pool (i.e. ``connection.close()``) 

106 whenever the connection is not in use. 

107 

108 .. index:: 

109 single: thread safety; Connection 

110 

111 """ 

112 

# Dialect in use; ``__init__`` copies it from ``engine.dialect``.
dialect: Dialect
# Event dispatcher; ``__init__`` joins it with ``engine.dispatch`` when
# the ``_has_events`` argument is left as None.
dispatch: dispatcher[ConnectionEventsTarget]

# Logger namespace string for this class.
_sqla_logger_namespace = "sqlalchemy.engine.Connection"

# used by sqlalchemy.engine.util.TransactionalContext
_trans_context_manager: Optional[TransactionalContext] = None

# legacy as of 2.0, should be eventually deprecated and
# removed. was used in the "pre_ping" recipe that's been in the docs
# a long time
should_close_with_result = False

# Pool-proxied DBAPI connection; ``invalidate()`` resets it to None.
_dbapi_connection: Optional[PoolProxiedConnection]

# Current execution options; seeded from the engine in ``__init__`` and
# extended by ``execution_options()``.
_execution_options: _ExecuteOptions

# Both start out as None in ``__init__``.
_transaction: Optional[RootTransaction]
_nested_transaction: Optional[NestedTransaction]

132 

133 def __init__( 

134 self, 

135 engine: Engine, 

136 connection: Optional[PoolProxiedConnection] = None, 

137 _has_events: Optional[bool] = None, 

138 _allow_revalidate: bool = True, 

139 _allow_autobegin: bool = True, 

140 ): 

141 """Construct a new Connection.""" 

142 self.engine = engine 

143 self.dialect = dialect = engine.dialect 

144 

145 if connection is None: 

146 try: 

147 self._dbapi_connection = engine.raw_connection() 

148 except dialect.loaded_dbapi.Error as err: 

149 Connection._handle_dbapi_exception_noconnection( 

150 err, dialect, engine 

151 ) 

152 raise 

153 else: 

154 self._dbapi_connection = connection 

155 

156 self._transaction = self._nested_transaction = None 

157 self.__savepoint_seq = 0 

158 self.__in_begin = False 

159 

160 self.__can_reconnect = _allow_revalidate 

161 self._allow_autobegin = _allow_autobegin 

162 self._echo = self.engine._should_log_info() 

163 

164 if _has_events is None: 

165 # if _has_events is sent explicitly as False, 

166 # then don't join the dispatch of the engine; we don't 

167 # want to handle any of the engine's events in that case. 

168 self.dispatch = self.dispatch._join(engine.dispatch) 

169 self._has_events = _has_events or ( 

170 _has_events is None and engine._has_events 

171 ) 

172 

173 self._execution_options = engine._execution_options 

174 

175 if self._has_events or self.engine._has_events: 

176 self.dispatch.engine_connect(self) 

177 

# this can be assigned differently via
# characteristics.LoggingTokenCharacteristic
# When set to something truthy, _log_info() / _log_debug() pass each
# message through it before logging; None leaves messages untouched.
_message_formatter: Any = None

181 

def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` at INFO level on the engine's logger.

    The message is first passed through ``self._message_formatter`` when
    one is set. When ``log.STACKLEVEL`` is truthy, a ``stacklevel``
    keyword of ``1 + log.STACKLEVEL_OFFSET`` is added to the logging call.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.info(text, *arg, **kw)

192 

def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` at DEBUG level on the engine's logger.

    The message is first passed through ``self._message_formatter`` when
    one is set. When ``log.STACKLEVEL`` is truthy, a ``stacklevel``
    keyword of ``1 + log.STACKLEVEL_OFFSET`` is added to the logging call.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.debug(text, *arg, **kw)

203 

204 @property 

205 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: 

206 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

207 self._execution_options.get("schema_translate_map", None) 

208 ) 

209 

210 return schema_translate_map 

211 

def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the effective schema name for ``obj``.

    The object's own ``schema`` is returned unless a
    ``schema_translate_map`` execution option is present, contains that
    schema as a key, and the object opts in via ``_use_schema_map``; in
    that case the mapped name is returned instead.

    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    if (
        not translate_map
        or schema_name not in translate_map
        or not obj._use_schema_map
    ):
        return schema_name

    return translate_map[schema_name]

231 

232 def __enter__(self) -> Connection: 

233 return self 

234 

235 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 

236 self.close() 

237 

# Typed overload: spells out the option names recognized by SQLAlchemy
# itself; any additional user-defined options are accepted via ``**opt``.
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    driver_column_names: bool = False,
    **opt: Any,
) -> Connection: ...

# Catch-all overload for arbitrary keyword options.
@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called. Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`. The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object. This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself. If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations. The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`. These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible. For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor. Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`. If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`. Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors. The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`. Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once. Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`. Number of rows to format into an
      INSERT statement when the statement uses "insertmanyvalues" mode,
      which is a paged form of bulk insert that is used for many backends
      when using :term:`executemany` execution typically in conjunction
      with RETURNING. Defaults to 1000. May also be modified on a
      per-engine basis using the
      :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

      .. versionadded:: 2.0

      .. seealso::

        :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements. Using this option, the DBAPIs rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements. See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    :param driver_column_names: When True, the returned
      :class:`_engine.CursorResult` will use the column names as written in
      ``cursor.description`` to set up the keys for the result set,
      including the names of columns for the :class:`_engine.Row` object as
      well as the dictionary keys when using :attr:`_engine.Row._mapping`.
      On backends that use "name normalization" such as Oracle Database to
      correct for lower case names being converted to all uppercase, this
      behavior is turned off and the raw UPPERCASE names in
      cursor.description will be present.

      .. versionadded:: 2.1

    """  # noqa
    # Event listeners (connection-level or engine-level) are handed the
    # incoming options first, before they are merged below.
    if self._has_events or self.engine._has_events:
        self.dispatch.set_connection_execution_options(self, opt)
    # Merge the new options over the existing ones; the result replaces
    # the stored mapping on this same Connection.
    self._execution_options = self._execution_options.union(opt)
    # The dialect receives the same options after the merge.
    self.dialect.set_connection_execution_options(self, opt)
    return self

533 

def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL options currently in effect for execution.

    This is the mapping maintained by
    :meth:`_engine.Connection.execution_options`.

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options

542 

543 @property 

544 def _still_open_and_dbapi_connection_is_valid(self) -> bool: 

545 pool_proxied_connection = self._dbapi_connection 

546 return ( 

547 pool_proxied_connection is not None 

548 and pool_proxied_connection.is_valid 

549 ) 

550 

@property
def closed(self) -> bool:
    """Return True if this connection is closed.

    That is the case only when no DBAPI connection is held *and* the
    connection is not permitted to reconnect.
    """
    if self._dbapi_connection is not None:
        return False
    return not self.__can_reconnect

556 

@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    This does not, however, indicate whether or not the connection was
    invalidated at the pool level.

    """
    # Before 1.4 an "invalid" flag was tracked separately from "closed",
    # so a connection could be closed (no _dbapi_connection, closed=True)
    # while still flagged invalid - three states (open/valid,
    # closed/valid, closed/invalid) where two suffice, since a closed
    # connection has no need to also be invalid. The state is therefore
    # derived from two facts only: whether a DBAPI connection is held and
    # whether reconnecting is allowed.
    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect

577 

@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    The object returned is SQLAlchemy's connection-pool proxied
    connection; the actual driver connection is reachable through its
    :attr:`_pool._ConnectionFairy.dbapi_connection` attribute.

    When no connection is currently held, a revalidation (reconnect) is
    attempted first.

    .. seealso::


        :ref:`dbapi_connections`

    """
    if self._dbapi_connection is not None:
        return self._dbapi_connection

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # state errors are passed through untouched
        raise
    except BaseException as e:
        # everything else goes through the common DBAPI error handler
        self._handle_dbapi_exception(e, None, None, None, None)

603 

def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    A live SQL operation is performed against the database to obtain the
    value, so what is returned is the real level of the underlying DBAPI
    connection no matter how that state was established. The result is
    one of the four actual isolation modes ``READ UNCOMMITTED``,
    ``READ COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``; it will
    **not** include the ``AUTOCOMMIT`` isolation level setting. Third
    party dialects may also feature additional isolation level settings.

    .. note:: This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level. When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None
    try:
        level = self.dialect.get_isolation_level(driver_connection)
    except BaseException as e:
        # route any failure through the common DBAPI error handler
        self._handle_dbapi_exception(e, None, None, None, None)
    else:
        return level

647 

@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    The value does not depend on the
    :paramref:`.Connection.execution_options.isolation_level` or
    :paramref:`.Engine.execution_options.isolation_level` execution
    options. It is established by the :class:`_engine.Dialect` when the
    first connection is created, by querying the database for its
    current isolation level before any additional commands have been
    emitted.

    Reading this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level

676 

def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`~sqlalchemy.exc.PendingRollbackError`.

    The message mentions a "savepoint" transaction when a nested
    transaction is present on this connection.
    """
    if self._nested_transaction is not None:
        qualifier = "savepoint "
    else:
        qualifier = ""
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (qualifier,),
        code="8s2b",
    )

684 

685 def _revalidate_connection(self) -> PoolProxiedConnection: 

686 if self.__can_reconnect and self.invalidated: 

687 if self._transaction is not None: 

688 self._invalid_transaction() 

689 self._dbapi_connection = self.engine.raw_connection() 

690 return self._dbapi_connection 

691 raise exc.ResourceClosedError("This Connection is closed") 

692 

@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data travels with the DBAPI connection, including after that
    connection goes back to the pool and is handed out again to later
    instances of :class:`_engine.Connection`.

    """
    pooled = self.connection
    return pooled.info

706 

def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt is made to close the underlying DBAPI connection right
    away; should that fail, the error is logged but not raised. The
    connection is discarded whether or not close() succeeded.

    On the next use (typically :meth:`_engine.Connection.execute` or
    similar), this :class:`_engine.Connection` will try to procure a new
    DBAPI connection from the :class:`_pool.Pool` (i.e. a
    "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) at the time
    of invalidation, all DBAPI-level state of that transaction is lost
    because the DBAPI connection is closed. The
    :class:`_engine.Connection` will refuse to reconnect until the
    :class:`.Transaction` object is ended by calling
    :meth:`.Transaction.rollback`; until then, any attempt to keep using
    the :class:`_engine.Connection` raises
    :class:`~sqlalchemy.exc.PendingRollbackError`. This prevents
    applications from accidentally continuing an ongoing transactional
    operation even though the transaction was lost due to the
    invalidation.

    Just like auto-invalidation, this method will invoke the
    :meth:`_events.PoolEvents.invalidate` event at the connection pool
    level.

    Calling this method on an already invalidated connection does
    nothing.

    :param exception: an optional ``Exception`` instance that's the
     reason for the invalidation. It is passed along to event handlers
     and logging functions.

    :raises ResourceClosedError: if the connection is closed.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """
    if self.invalidated:
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        fairy = self._dbapi_connection
        assert fairy is not None
        fairy.invalidate(exception)

    # drop the reference whether or not the pooled connection was valid
    self._dbapi_connection = None

765 

def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    The :class:`_engine.Connection` stays usable after this call. Once
    it is closed (or the context manager block above is exited), the
    DB-API connection is literally closed rather than returned to the
    pool it came from.

    Use this to shield the rest of an application from state that was
    modified on a connection (such as a transaction isolation level or
    similar).

    :raises ResourceClosedError: if the connection is closed.
    :raises InvalidRequestError: if the connection is invalidated, i.e.
     no DB-API connection is currently held.

    """
    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    fairy = self._dbapi_connection
    if fairy is not None:
        fairy.detach()
        return

    raise exc.InvalidRequestError("Can't detach an invalidated Connection")

801 

802 def _autobegin(self) -> None: 

803 if self._allow_autobegin and not self.__in_begin: 

804 self.begin() 

805 

806 def begin(self) -> RootTransaction: 

807 """Begin a transaction prior to autobegin occurring. 

808 

809 E.g.:: 

810 

811 with engine.connect() as conn: 

812 with conn.begin() as trans: 

813 conn.execute(table.insert(), {"username": "sandy"}) 

814 

815 The returned object is an instance of :class:`_engine.RootTransaction`. 

816 This object represents the "scope" of the transaction, 

817 which completes when either the :meth:`_engine.Transaction.rollback` 

818 or :meth:`_engine.Transaction.commit` method is called; the object 

819 also works as a context manager as illustrated above. 

820 

821 The :meth:`_engine.Connection.begin` method begins a 

822 transaction that normally will be begun in any case when the connection 

823 is first used to execute a statement. The reason this method might be 

824 used would be to invoke the :meth:`_events.ConnectionEvents.begin` 

825 event at a specific time, or to organize code within the scope of a 

826 connection checkout in terms of context managed blocks, such as:: 

827 

828 with engine.connect() as conn: 

829 with conn.begin(): 

830 conn.execute(...) 

831 conn.execute(...) 

832 

833 with conn.begin(): 

834 conn.execute(...) 

835 conn.execute(...) 

836 

837 The above code is not fundamentally any different in its behavior than 

838 the following code which does not use 

839 :meth:`_engine.Connection.begin`; the below style is known 

840 as "commit as you go" style:: 

841 

842 with engine.connect() as conn: 

843 conn.execute(...) 

844 conn.execute(...) 

845 conn.commit() 

846 

847 conn.execute(...) 

848 conn.execute(...) 

849 conn.commit() 

850 

851 From a database point of view, the :meth:`_engine.Connection.begin` 

852 method does not emit any SQL or change the state of the underlying 

853 DBAPI connection in any way; the Python DBAPI does not have any 

854 concept of explicit transaction begin. 

855 

856 .. seealso:: 

857 

858 :ref:`tutorial_working_with_transactions` - in the 

859 :ref:`unified_tutorial` 

860 

861 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT 

862 

863 :meth:`_engine.Connection.begin_twophase` - 

864 use a two-phase / XID transaction 

865 

866 :meth:`_engine.Engine.begin` - context manager available from 

867 :class:`_engine.Engine` 

868 

869 """ 

870 if self._transaction is None: 

871 self._transaction = RootTransaction(self) 

872 return self._transaction 

873 else: 

874 raise exc.InvalidRequestError( 

875 "This connection has already initialized a SQLAlchemy " 

876 "Transaction() object via begin() or autobegin; can't " 

877 "call begin() here unless rollback() or commit() " 

878 "is called first." 

879 ) 

880 

881 def begin_nested(self) -> NestedTransaction: 

882 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction 

883 handle that controls the scope of the SAVEPOINT. 

884 

885 E.g.:: 

886 

887 with engine.begin() as connection: 

888 with connection.begin_nested(): 

889 connection.execute(table.insert(), {"username": "sandy"}) 

890 

891 The returned object is an instance of 

892 :class:`_engine.NestedTransaction`, which includes transactional 

893 methods :meth:`_engine.NestedTransaction.commit` and 

894 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction, 

895 these methods correspond to the operations "RELEASE SAVEPOINT <name>" 

896 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local 

897 to the :class:`_engine.NestedTransaction` object and is generated 

898 automatically. Like any other :class:`_engine.Transaction`, the 

899 :class:`_engine.NestedTransaction` may be used as a context manager as 

900 illustrated above, which will "release" or "rollback" depending on 

901 whether the operation within the block was successful or raised an 

902 exception. 

903 

904 Nested transactions require SAVEPOINT support in the underlying 

905 database, else the behavior is undefined. SAVEPOINT is commonly used to 

906 run operations within a transaction that may fail, while continuing the 

907 outer transaction. E.g.:: 

908 

909 from sqlalchemy import exc 

910 

911 with engine.begin() as connection: 

912 trans = connection.begin_nested() 

913 try: 

914 connection.execute(table.insert(), {"username": "sandy"}) 

915 trans.commit() 

916 except exc.IntegrityError: # catch for duplicate username 

917 trans.rollback() # rollback to savepoint 

918 

919 # outer transaction continues 

920 connection.execute(...) 

921 

922 If :meth:`_engine.Connection.begin_nested` is called without first 

923 calling :meth:`_engine.Connection.begin` or 

924 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object 

925 will "autobegin" the outer transaction first. This outer transaction 

926 may be committed using "commit-as-you-go" style, e.g.:: 

927 

928 with engine.connect() as connection: # begin() wasn't called 

929 

930 with connection.begin_nested(): # will auto-"begin()" first 

931 connection.execute(...) 

932 # savepoint is released 

933 

934 connection.execute(...) 

935 

936 # explicitly commit outer transaction 

937 connection.commit() 

938 

939 # can continue working with connection here 

940 

941 .. versionchanged:: 2.0 

942 

943 :meth:`_engine.Connection.begin_nested` will now participate 

944 in the connection "autobegin" behavior that is new as of 

945 2.0 / "future" style connections in 1.4. 

946 

947 .. seealso:: 

948 

949 :meth:`_engine.Connection.begin` 

950 

951 :ref:`session_begin_nested` - ORM support for SAVEPOINT 

952 

953 """ 

954 if self._transaction is None: 

955 self._autobegin() 

956 

957 return NestedTransaction(self) 

958 

959 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction: 

960 """Begin a two-phase or XA transaction and return a transaction 

961 handle. 

962 

963 The returned object is an instance of :class:`.TwoPhaseTransaction`, 

964 which in addition to the methods provided by 

965 :class:`.Transaction`, also provides a 

966 :meth:`~.TwoPhaseTransaction.prepare` method. 

967 

968 :param xid: the two-phase transaction id. If not supplied, a 

969 random id will be generated. 

970 

971 .. seealso:: 

972 

973 :meth:`_engine.Connection.begin` 

974 

975 :meth:`_engine.Connection.begin_nested` 

976 

977 """ 

978 

979 if self._transaction is not None: 

980 raise exc.InvalidRequestError( 

981 "Cannot start a two phase transaction when a transaction " 

982 "is already in progress." 

983 ) 

984 if xid is None: 

985 xid = self.engine.dialect.create_xid() 

986 return TwoPhaseTransaction(self, xid) 

987 

988 def commit(self) -> None: 

989 """Commit the transaction that is currently in progress. 

990 

991 This method commits the current transaction if one has been started. 

992 If no transaction was started, the method has no effect, assuming 

993 the connection is in a non-invalidated state. 

994 

995 A transaction is begun on a :class:`_engine.Connection` automatically 

996 whenever a statement is first executed, or when the 

997 :meth:`_engine.Connection.begin` method is called. 

998 

999 .. note:: The :meth:`_engine.Connection.commit` method only acts upon 

1000 the primary database transaction that is linked to the 

1001 :class:`_engine.Connection` object. It does not operate upon a 

1002 SAVEPOINT that would have been invoked from the 

1003 :meth:`_engine.Connection.begin_nested` method; for control of a 

1004 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the 

1005 :class:`_engine.NestedTransaction` that is returned by the 

1006 :meth:`_engine.Connection.begin_nested` method itself. 

1007 

1008 

1009 """ 

1010 if self._transaction: 

1011 self._transaction.commit() 

1012 

1013 def rollback(self) -> None: 

1014 """Roll back the transaction that is currently in progress. 

1015 

1016 This method rolls back the current transaction if one has been started. 

1017 If no transaction was started, the method has no effect. If a 

1018 transaction was started and the connection is in an invalidated state, 

1019 the transaction is cleared using this method. 

1020 

1021 A transaction is begun on a :class:`_engine.Connection` automatically 

1022 whenever a statement is first executed, or when the 

1023 :meth:`_engine.Connection.begin` method is called. 

1024 

1025 .. note:: The :meth:`_engine.Connection.rollback` method only acts 

1026 upon the primary database transaction that is linked to the 

1027 :class:`_engine.Connection` object. It does not operate upon a 

1028 SAVEPOINT that would have been invoked from the 

1029 :meth:`_engine.Connection.begin_nested` method; for control of a 

1030 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the 

1031 :class:`_engine.NestedTransaction` that is returned by the 

1032 :meth:`_engine.Connection.begin_nested` method itself. 

1033 

1034 

1035 """ 

1036 if self._transaction: 

1037 self._transaction.rollback() 

1038 

1039 def recover_twophase(self) -> List[Any]: 

1040 return self.engine.dialect.do_recover_twophase(self) 

1041 

1042 def rollback_prepared(self, xid: Any, recover: bool = False) -> None: 

1043 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) 

1044 

1045 def commit_prepared(self, xid: Any, recover: bool = False) -> None: 

1046 self.engine.dialect.do_commit_twophase(self, xid, recover=recover) 

1047 

1048 def in_transaction(self) -> bool: 

1049 """Return True if a transaction is in progress.""" 

1050 return self._transaction is not None and self._transaction.is_active 

1051 

1052 def in_nested_transaction(self) -> bool: 

1053 """Return True if a nested transaction is in progress.""" 

1054 return ( 

1055 self._nested_transaction is not None 

1056 and self._nested_transaction.is_active 

1057 ) 

1058 

1059 def _is_autocommit_isolation(self) -> bool: 

1060 opt_iso = self._execution_options.get("isolation_level", None) 

1061 return bool( 

1062 opt_iso == "AUTOCOMMIT" 

1063 or ( 

1064 opt_iso is None 

1065 and self.engine.dialect._on_connect_isolation_level 

1066 == "AUTOCOMMIT" 

1067 ) 

1068 ) 

1069 

1070 def _get_required_transaction(self) -> RootTransaction: 

1071 trans = self._transaction 

1072 if trans is None: 

1073 raise exc.InvalidRequestError("connection is not in a transaction") 

1074 return trans 

1075 

1076 def _get_required_nested_transaction(self) -> NestedTransaction: 

1077 trans = self._nested_transaction 

1078 if trans is None: 

1079 raise exc.InvalidRequestError( 

1080 "connection is not in a nested transaction" 

1081 ) 

1082 return trans 

1083 

1084 def get_transaction(self) -> Optional[RootTransaction]: 

1085 """Return the current root transaction in progress, if any. 

1086 

1087 .. versionadded:: 1.4 

1088 

1089 """ 

1090 

1091 return self._transaction 

1092 

1093 def get_nested_transaction(self) -> Optional[NestedTransaction]: 

1094 """Return the current nested transaction in progress, if any. 

1095 

1096 .. versionadded:: 1.4 

1097 

1098 """ 

1099 return self._nested_transaction 

1100 

1101 def _begin_impl(self, transaction: RootTransaction) -> None: 

1102 if self._echo: 

1103 if self._is_autocommit_isolation(): 

1104 self._log_info( 

1105 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1106 "autocommit mode)" 

1107 ) 

1108 else: 

1109 self._log_info("BEGIN (implicit)") 

1110 

1111 self.__in_begin = True 

1112 

1113 if self._has_events or self.engine._has_events: 

1114 self.dispatch.begin(self) 

1115 

1116 try: 

1117 self.engine.dialect.do_begin(self.connection) 

1118 except BaseException as e: 

1119 self._handle_dbapi_exception(e, None, None, None, None) 

1120 finally: 

1121 self.__in_begin = False 

1122 

1123 def _rollback_impl(self) -> None: 

1124 if self._has_events or self.engine._has_events: 

1125 self.dispatch.rollback(self) 

1126 

1127 if self._still_open_and_dbapi_connection_is_valid: 

1128 if self._echo: 

1129 if self._is_autocommit_isolation(): 

1130 self._log_info( 

1131 "ROLLBACK using DBAPI connection.rollback(), " 

1132 "DBAPI should ignore due to autocommit mode" 

1133 ) 

1134 else: 

1135 self._log_info("ROLLBACK") 

1136 try: 

1137 self.engine.dialect.do_rollback(self.connection) 

1138 except BaseException as e: 

1139 self._handle_dbapi_exception(e, None, None, None, None) 

1140 

1141 def _commit_impl(self) -> None: 

1142 if self._has_events or self.engine._has_events: 

1143 self.dispatch.commit(self) 

1144 

1145 if self._echo: 

1146 if self._is_autocommit_isolation(): 

1147 self._log_info( 

1148 "COMMIT using DBAPI connection.commit(), " 

1149 "DBAPI should ignore due to autocommit mode" 

1150 ) 

1151 else: 

1152 self._log_info("COMMIT") 

1153 try: 

1154 self.engine.dialect.do_commit(self.connection) 

1155 except BaseException as e: 

1156 self._handle_dbapi_exception(e, None, None, None, None) 

1157 

1158 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1159 if self._has_events or self.engine._has_events: 

1160 self.dispatch.savepoint(self, name) 

1161 

1162 if name is None: 

1163 self.__savepoint_seq += 1 

1164 name = "sa_savepoint_%s" % self.__savepoint_seq 

1165 self.engine.dialect.do_savepoint(self, name) 

1166 return name 

1167 

1168 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1169 if self._has_events or self.engine._has_events: 

1170 self.dispatch.rollback_savepoint(self, name, None) 

1171 

1172 if self._still_open_and_dbapi_connection_is_valid: 

1173 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1174 

1175 def _release_savepoint_impl(self, name: str) -> None: 

1176 if self._has_events or self.engine._has_events: 

1177 self.dispatch.release_savepoint(self, name, None) 

1178 

1179 self.engine.dialect.do_release_savepoint(self, name) 

1180 

1181 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 

1182 if self._echo: 

1183 self._log_info("BEGIN TWOPHASE (implicit)") 

1184 if self._has_events or self.engine._has_events: 

1185 self.dispatch.begin_twophase(self, transaction.xid) 

1186 

1187 self.__in_begin = True 

1188 try: 

1189 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1190 except BaseException as e: 

1191 self._handle_dbapi_exception(e, None, None, None, None) 

1192 finally: 

1193 self.__in_begin = False 

1194 

1195 def _prepare_twophase_impl(self, xid: Any) -> None: 

1196 if self._has_events or self.engine._has_events: 

1197 self.dispatch.prepare_twophase(self, xid) 

1198 

1199 assert isinstance(self._transaction, TwoPhaseTransaction) 

1200 try: 

1201 self.engine.dialect.do_prepare_twophase(self, xid) 

1202 except BaseException as e: 

1203 self._handle_dbapi_exception(e, None, None, None, None) 

1204 

1205 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1206 if self._has_events or self.engine._has_events: 

1207 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1208 

1209 if self._still_open_and_dbapi_connection_is_valid: 

1210 assert isinstance(self._transaction, TwoPhaseTransaction) 

1211 try: 

1212 self.engine.dialect.do_rollback_twophase( 

1213 self, xid, is_prepared 

1214 ) 

1215 except BaseException as e: 

1216 self._handle_dbapi_exception(e, None, None, None, None) 

1217 

1218 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1219 if self._has_events or self.engine._has_events: 

1220 self.dispatch.commit_twophase(self, xid, is_prepared) 

1221 

1222 assert isinstance(self._transaction, TwoPhaseTransaction) 

1223 try: 

1224 self.engine.dialect.do_commit_twophase(self, xid, is_prepared) 

1225 except BaseException as e: 

1226 self._handle_dbapi_exception(e, None, None, None, None) 

1227 

1228 def close(self) -> None: 

1229 """Close this :class:`_engine.Connection`. 

1230 

1231 This results in a release of the underlying database 

1232 resources, that is, the DBAPI connection referenced 

1233 internally. The DBAPI connection is typically restored 

1234 back to the connection-holding :class:`_pool.Pool` referenced 

1235 by the :class:`_engine.Engine` that produced this 

1236 :class:`_engine.Connection`. Any transactional state present on 

1237 the DBAPI connection is also unconditionally released via 

1238 the DBAPI connection's ``rollback()`` method, regardless 

1239 of any :class:`.Transaction` object that may be 

1240 outstanding with regards to this :class:`_engine.Connection`. 

1241 

1242 This has the effect of also calling :meth:`_engine.Connection.rollback` 

1243 if any transaction is in place. 

1244 

1245 After :meth:`_engine.Connection.close` is called, the 

1246 :class:`_engine.Connection` is permanently in a closed state, 

1247 and will allow no further operations. 

1248 

1249 """ 

1250 

1251 if self._transaction: 

1252 self._transaction.close() 

1253 skip_reset = True 

1254 else: 

1255 skip_reset = False 

1256 

1257 if self._dbapi_connection is not None: 

1258 conn = self._dbapi_connection 

1259 

1260 # as we just closed the transaction, close the pooled 

1261 # connection without doing an additional reset 

1262 if skip_reset: 

1263 cast("_ConnectionFairy", conn)._close_special( 

1264 transaction_reset=True 

1265 ) 

1266 else: 

1267 conn.close() 

1268 

1269 # There is a slight chance that conn.close() may have 

1270 # triggered an invalidation here in which case 

1271 # _dbapi_connection would already be None, however usually 

1272 # it will be non-None here and in a "closed" state. 

1273 self._dbapi_connection = None 

1274 self.__can_reconnect = False 

1275 

1276 @overload 

1277 def scalar( 

1278 self, 

1279 statement: TypedReturnsRows[_T], 

1280 parameters: Optional[_CoreSingleExecuteParams] = None, 

1281 *, 

1282 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1283 ) -> Optional[_T]: ... 

1284 

1285 @overload 

1286 def scalar( 

1287 self, 

1288 statement: Executable, 

1289 parameters: Optional[_CoreSingleExecuteParams] = None, 

1290 *, 

1291 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1292 ) -> Any: ... 

1293 

1294 def scalar( 

1295 self, 

1296 statement: Executable, 

1297 parameters: Optional[_CoreSingleExecuteParams] = None, 

1298 *, 

1299 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1300 ) -> Any: 

1301 r"""Executes a SQL statement construct and returns a scalar object. 

1302 

1303 This method is shorthand for invoking the 

1304 :meth:`_engine.Result.scalar` method after invoking the 

1305 :meth:`_engine.Connection.execute` method. Parameters are equivalent. 

1306 

1307 :return: a scalar Python value representing the first column of the 

1308 first row returned. 

1309 

1310 """ 

1311 distilled_parameters = _distill_params_20(parameters) 

1312 try: 

1313 meth = statement._execute_on_scalar 

1314 except AttributeError as err: 

1315 raise exc.ObjectNotExecutableError(statement) from err 

1316 else: 

1317 return meth( 

1318 self, 

1319 distilled_parameters, 

1320 execution_options or NO_OPTIONS, 

1321 ) 

1322 

1323 @overload 

1324 def scalars( 

1325 self, 

1326 statement: TypedReturnsRows[_T], 

1327 parameters: Optional[_CoreAnyExecuteParams] = None, 

1328 *, 

1329 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1330 ) -> ScalarResult[_T]: ... 

1331 

1332 @overload 

1333 def scalars( 

1334 self, 

1335 statement: Executable, 

1336 parameters: Optional[_CoreAnyExecuteParams] = None, 

1337 *, 

1338 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1339 ) -> ScalarResult[Any]: ... 

1340 

1341 def scalars( 

1342 self, 

1343 statement: Executable, 

1344 parameters: Optional[_CoreAnyExecuteParams] = None, 

1345 *, 

1346 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1347 ) -> ScalarResult[Any]: 

1348 """Executes and returns a scalar result set, which yields scalar values 

1349 from the first column of each row. 

1350 

1351 This method is equivalent to calling :meth:`_engine.Connection.execute` 

1352 to receive a :class:`_result.Result` object, then invoking the 

1353 :meth:`_result.Result.scalars` method to produce a 

1354 :class:`_result.ScalarResult` instance. 

1355 

1356 :return: a :class:`_result.ScalarResult` 

1357 

1358 .. versionadded:: 1.4.24 

1359 

1360 """ 

1361 

1362 return self.execute( 

1363 statement, parameters, execution_options=execution_options 

1364 ).scalars() 

1365 

1366 @overload 

1367 def execute( 

1368 self, 

1369 statement: TypedReturnsRows[Unpack[_Ts]], 

1370 parameters: Optional[_CoreAnyExecuteParams] = None, 

1371 *, 

1372 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1373 ) -> CursorResult[Unpack[_Ts]]: ... 

1374 

1375 @overload 

1376 def execute( 

1377 self, 

1378 statement: Executable, 

1379 parameters: Optional[_CoreAnyExecuteParams] = None, 

1380 *, 

1381 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1382 ) -> CursorResult[Unpack[TupleAny]]: ... 

1383 

1384 def execute( 

1385 self, 

1386 statement: Executable, 

1387 parameters: Optional[_CoreAnyExecuteParams] = None, 

1388 *, 

1389 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1390 ) -> CursorResult[Unpack[TupleAny]]: 

1391 r"""Executes a SQL statement construct and returns a 

1392 :class:`_engine.CursorResult`. 

1393 

1394 :param statement: The statement to be executed. This is always 

1395 an object that is in both the :class:`_expression.ClauseElement` and 

1396 :class:`_expression.Executable` hierarchies, including: 

1397 

1398 * :class:`_expression.Select` 

1399 * :class:`_expression.Insert`, :class:`_expression.Update`, 

1400 :class:`_expression.Delete` 

1401 * :class:`_expression.TextClause` and 

1402 :class:`_expression.TextualSelect` 

1403 * :class:`_schema.DDL` and objects which inherit from 

1404 :class:`_schema.ExecutableDDLElement` 

1405 

1406 :param parameters: parameters which will be bound into the statement. 

1407 This may be either a dictionary of parameter names to values, 

1408 or a mutable sequence (e.g. a list) of dictionaries. When a 

1409 list of dictionaries is passed, the underlying statement execution 

1410 will make use of the DBAPI ``cursor.executemany()`` method. 

1411 When a single dictionary is passed, the DBAPI ``cursor.execute()`` 

1412 method will be used. 

1413 

1414 :param execution_options: optional dictionary of execution options, 

1415 which will be associated with the statement execution. This 

1416 dictionary can provide a subset of the options that are accepted 

1417 by :meth:`_engine.Connection.execution_options`. 

1418 

1419 :return: a :class:`_engine.CursorResult` object. 

1420 

1421 """ 

1422 distilled_parameters = _distill_params_20(parameters) 

1423 try: 

1424 meth = statement._execute_on_connection 

1425 except AttributeError as err: 

1426 raise exc.ObjectNotExecutableError(statement) from err 

1427 else: 

1428 return meth( 

1429 self, 

1430 distilled_parameters, 

1431 execution_options or NO_OPTIONS, 

1432 ) 

1433 

1434 def _execute_function( 

1435 self, 

1436 func: FunctionElement[Any], 

1437 distilled_parameters: _CoreMultiExecuteParams, 

1438 execution_options: CoreExecuteOptionsParameter, 

1439 ) -> CursorResult[Unpack[TupleAny]]: 

1440 """Execute a sql.FunctionElement object.""" 

1441 

1442 return self._execute_clauseelement( 

1443 func.select(), distilled_parameters, execution_options 

1444 ) 

1445 

1446 def _execute_default( 

1447 self, 

1448 default: DefaultGenerator, 

1449 distilled_parameters: _CoreMultiExecuteParams, 

1450 execution_options: CoreExecuteOptionsParameter, 

1451 ) -> Any: 

1452 """Execute a schema.ColumnDefault object.""" 

1453 

1454 exec_opts = self._execution_options.merge_with(execution_options) 

1455 

1456 event_multiparams: Optional[_CoreMultiExecuteParams] 

1457 event_params: Optional[_CoreAnyExecuteParams] 

1458 

1459 # note for event handlers, the "distilled parameters" which is always 

1460 # a list of dicts is broken out into separate "multiparams" and 

1461 # "params" collections, which allows the handler to distinguish 

1462 # between an executemany and execute style set of parameters. 

1463 if self._has_events or self.engine._has_events: 

1464 ( 

1465 default, 

1466 distilled_parameters, 

1467 event_multiparams, 

1468 event_params, 

1469 ) = self._invoke_before_exec_event( 

1470 default, distilled_parameters, exec_opts 

1471 ) 

1472 else: 

1473 event_multiparams = event_params = None 

1474 

1475 try: 

1476 conn = self._dbapi_connection 

1477 if conn is None: 

1478 conn = self._revalidate_connection() 

1479 

1480 dialect = self.dialect 

1481 ctx = dialect.execution_ctx_cls._init_default( 

1482 dialect, self, conn, exec_opts 

1483 ) 

1484 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1485 raise 

1486 except BaseException as e: 

1487 self._handle_dbapi_exception(e, None, None, None, None) 

1488 

1489 ret = ctx._exec_default(None, default, None) 

1490 

1491 if self._has_events or self.engine._has_events: 

1492 self.dispatch.after_execute( 

1493 self, 

1494 default, 

1495 event_multiparams, 

1496 event_params, 

1497 exec_opts, 

1498 ret, 

1499 ) 

1500 

1501 return ret 

1502 

1503 def _execute_ddl( 

1504 self, 

1505 ddl: ExecutableDDLElement, 

1506 distilled_parameters: _CoreMultiExecuteParams, 

1507 execution_options: CoreExecuteOptionsParameter, 

1508 ) -> CursorResult[Unpack[TupleAny]]: 

1509 """Execute a schema.DDL object.""" 

1510 

1511 exec_opts = ddl._execution_options.merge_with( 

1512 self._execution_options, execution_options 

1513 ) 

1514 

1515 event_multiparams: Optional[_CoreMultiExecuteParams] 

1516 event_params: Optional[_CoreSingleExecuteParams] 

1517 

1518 if self._has_events or self.engine._has_events: 

1519 ( 

1520 ddl, 

1521 distilled_parameters, 

1522 event_multiparams, 

1523 event_params, 

1524 ) = self._invoke_before_exec_event( 

1525 ddl, distilled_parameters, exec_opts 

1526 ) 

1527 else: 

1528 event_multiparams = event_params = None 

1529 

1530 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1531 

1532 dialect = self.dialect 

1533 

1534 compiled = ddl.compile( 

1535 dialect=dialect, schema_translate_map=schema_translate_map 

1536 ) 

1537 ret = self._execute_context( 

1538 dialect, 

1539 dialect.execution_ctx_cls._init_ddl, 

1540 compiled, 

1541 None, 

1542 exec_opts, 

1543 compiled, 

1544 ) 

1545 if self._has_events or self.engine._has_events: 

1546 self.dispatch.after_execute( 

1547 self, 

1548 ddl, 

1549 event_multiparams, 

1550 event_params, 

1551 exec_opts, 

1552 ret, 

1553 ) 

1554 return ret 

1555 

1556 def _invoke_before_exec_event( 

1557 self, 

1558 elem: Any, 

1559 distilled_params: _CoreMultiExecuteParams, 

1560 execution_options: _ExecuteOptions, 

1561 ) -> Tuple[ 

1562 Any, 

1563 _CoreMultiExecuteParams, 

1564 _CoreMultiExecuteParams, 

1565 _CoreSingleExecuteParams, 

1566 ]: 

1567 event_multiparams: _CoreMultiExecuteParams 

1568 event_params: _CoreSingleExecuteParams 

1569 

1570 if len(distilled_params) == 1: 

1571 event_multiparams, event_params = [], distilled_params[0] 

1572 else: 

1573 event_multiparams, event_params = distilled_params, {} 

1574 

1575 for fn in self.dispatch.before_execute: 

1576 elem, event_multiparams, event_params = fn( 

1577 self, 

1578 elem, 

1579 event_multiparams, 

1580 event_params, 

1581 execution_options, 

1582 ) 

1583 

1584 if event_multiparams: 

1585 distilled_params = list(event_multiparams) 

1586 if event_params: 

1587 raise exc.InvalidRequestError( 

1588 "Event handler can't return non-empty multiparams " 

1589 "and params at the same time" 

1590 ) 

1591 elif event_params: 

1592 distilled_params = [event_params] 

1593 else: 

1594 distilled_params = [] 

1595 

1596 return elem, distilled_params, event_multiparams, event_params 

1597 

1598 def _execute_clauseelement( 

1599 self, 

1600 elem: Executable, 

1601 distilled_parameters: _CoreMultiExecuteParams, 

1602 execution_options: CoreExecuteOptionsParameter, 

1603 ) -> CursorResult[Unpack[TupleAny]]: 

1604 """Execute a sql.ClauseElement object.""" 

1605 

1606 exec_opts = elem._execution_options.merge_with( 

1607 self._execution_options, execution_options 

1608 ) 

1609 

1610 has_events = self._has_events or self.engine._has_events 

1611 if has_events: 

1612 ( 

1613 elem, 

1614 distilled_parameters, 

1615 event_multiparams, 

1616 event_params, 

1617 ) = self._invoke_before_exec_event( 

1618 elem, distilled_parameters, exec_opts 

1619 ) 

1620 

1621 if distilled_parameters: 

1622 # ensure we don't retain a link to the view object for keys() 

1623 # which links to the values, which we don't want to cache 

1624 keys = sorted(distilled_parameters[0]) 

1625 for_executemany = len(distilled_parameters) > 1 

1626 else: 

1627 keys = [] 

1628 for_executemany = False 

1629 

1630 dialect = self.dialect 

1631 

1632 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1633 

1634 compiled_cache: Optional[CompiledCacheType] = exec_opts.get( 

1635 "compiled_cache", self.engine._compiled_cache 

1636 ) 

1637 

1638 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( 

1639 dialect=dialect, 

1640 compiled_cache=compiled_cache, 

1641 column_keys=keys, 

1642 for_executemany=for_executemany, 

1643 schema_translate_map=schema_translate_map, 

1644 linting=self.dialect.compiler_linting | compiler.WARN_LINTING, 

1645 ) 

1646 ret = self._execute_context( 

1647 dialect, 

1648 dialect.execution_ctx_cls._init_compiled, 

1649 compiled_sql, 

1650 distilled_parameters, 

1651 exec_opts, 

1652 compiled_sql, 

1653 distilled_parameters, 

1654 elem, 

1655 extracted_params, 

1656 cache_hit=cache_hit, 

1657 ) 

1658 if has_events: 

1659 self.dispatch.after_execute( 

1660 self, 

1661 elem, 

1662 event_multiparams, 

1663 event_params, 

1664 exec_opts, 

1665 ret, 

1666 ) 

1667 return ret 

1668 

def _execute_compiled(
    self,
    compiled: Compiled,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.Compiled object.

    TODO: why do we have this?   likely deprecate or remove

    :param compiled: the already-compiled statement; its own
     ``execution_options`` are the base that the connection-level and
     per-call options are merged onto.
    :param distilled_parameters: the parameter sets handed to the
     execution context constructor.
    :param execution_options: per-call execution options.
    :return: the result returned by :meth:`_execute_context`.

    """

    exec_opts = compiled.execution_options.merge_with(
        self._execution_options, execution_options
    )

    # Evaluate the listener flags exactly once.  ``event_multiparams`` and
    # ``event_params`` are only bound when the "before" hook ran, so the
    # "after" hook must be gated on the same decision; re-reading the
    # flags could reach ``after_execute`` with those names unbound.
    has_events = self._has_events or self.engine._has_events

    if has_events:
        (
            compiled,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            compiled, distilled_parameters, exec_opts
        )

    dialect = self.dialect

    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled,
        distilled_parameters,
        exec_opts,
        compiled,
        distilled_parameters,
        None,
        None,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            compiled,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1718 

def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Run a plain SQL string on the DBAPI cursor, bypassing the
    SQL compilation steps entirely.

    Any string may be handed straight through to the DBAPI's
    ``cursor.execute()`` method this way.

    :param statement: The SQL string to execute.  Bound parameters
     must be written in the paramstyle of the underlying DBAPI, e.g.
     "qmark", "pyformat", "format", etc.

    :param parameters: bound parameter values for the execution.
     Accepted forms are: a dictionary of named parameters, a tuple of
     positional parameters, or a list of dictionaries or tuples for
     multiple-execute support.

    :param execution_options: optional per-call execution options,
     merged with the options already present on this connection.

    :return: a :class:`_engine.CursorResult`.

    A list of dictionaries (multiple execute)::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
        )

    One dictionary::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            dict(id=1, value="v1"),
        )

    One tuple::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
        )

    .. note:: :meth:`_engine.Connection.exec_driver_sql` does not take
        part in the :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.  To
        intercept these calls, use
        :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute` instead.

    .. seealso::

        :pep:`249`

    """

    raw_params = _distill_raw_params(parameters)
    merged_opts = self._execution_options.merge_with(execution_options)

    active_dialect = self.dialect
    return self._execute_context(
        active_dialect,
        active_dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        merged_opts,
        statement,
        raw_params,
    )

1793 

def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Unpack[TupleAny]]:
    """Build an :class:`.ExecutionContext` via ``constructor`` and run
    it, returning a :class:`_engine.CursorResult`.

    ``statement`` and ``parameters`` are used here only for error
    reporting and are forwarded to the single-context execution path;
    ``*args`` / ``**kw`` go to ``constructor`` unchanged.
    """

    if execution_options:
        yield_per = execution_options.get("yield_per", None)
        if yield_per:
            # a truthy "yield_per" switches on streaming and caps the
            # row buffer at the same size
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yield_per}
            )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        context = constructor(
            dialect, self, dbapi_conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate untouched
        raise
    except BaseException as err:
        # does not return; always raises
        self._handle_dbapi_exception(
            err, str(statement), parameters, None, None
        )

    root_inactive = self._transaction and not self._transaction.is_active
    if root_inactive or (
        self._nested_transaction
        and not self._nested_transaction.is_active
    ):
        self._invalid_transaction()
    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)

    return self._exec_single_context(
        dialect, context, statement, parameters
    )

1852 

def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for a single DBAPI
    cursor.execute() or cursor.executemany() call.

    :param dialect: dialect consulted for ``bind_typing`` and
     ``do_set_input_sizes()``.
    :param context: the execution context; supplies the cursor, the
     string statement and the parameters actually sent to the DBAPI.
    :param statement: used only (stringified) when reporting a failure
     of the setinputsizes step.
    :param parameters: used only when reporting a failure of the
     setinputsizes step; afterwards the name is rebound to
     ``context.parameters``.
    :return: the result built by ``context._setup_result_proxy()``.

    Any exception raised along the way is routed through
    ``_handle_dbapi_exception()``, which always raises.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()

        if generic_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, generic_setinputsizes, context
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e, str(statement), parameters, None, context
                )

    # from here on work with what the context says will be executed;
    # note ``parameters`` is rebound to the context's parameters
    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters: Optional[_AnyExecuteParams]

    # a non-executemany call uses only the first parameter set
    if not context.executemany:
        effective_parameters = parameters[0]
    else:
        effective_parameters = parameters

    if self._has_events or self.engine._has_events:
        # each listener may replace the statement and parameters
        for fn in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = fn(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if not self.engine.hide_parameters:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )
        else:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )

    # set to True when a dialect-level do_* listener returns a truthy
    # value, in which case the dialect's own do_* method is skipped
    evt_handled: bool = False
    try:
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_executemany:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute_no_params:
                    if fn(cursor, str_statement, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as e:
        # declared NoReturn, so ``result`` is always bound below
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

1994 

def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    :param dialect: dialect that delivers the batches via
     ``_deliver_insertmanyvalues_batches()`` and executes each one.
    :param context: the execution context supplying the cursor,
     statement, parameters and execution options.
    :return: the result built by ``context._setup_result_proxy()``.

    Any exception raised along the way is routed through
    ``_handle_dbapi_exception()``, which always raises.

    """

    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # listener flags and the do_execute listener collection are
    # resolved once, before the batch loop
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # ``stats`` is only bound, and only read, when echo is on
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        # the statement / parameters for this individual batch
        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            # each listener may replace the per-batch statement and
            # parameters; ``executemany`` is reported as True
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            # line breaks below sit inside the replacement fields, so
            # they do not appear in the resulting string
            imv_stats = f""" {imv_batch.batchnum}/{
                        imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch appends to the cache stats; later batches
            # replace the prefix entirely
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # a listener returning a truthy value has handled the
            # execution; the ``else`` runs only if none did
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            # note: receives the overall statement / parameters, not
            # the per-batch ``sub_stmt`` / ``sub_params``
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        # declared NoReturn, so ``result`` is always bound below
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

2165 

2166 def _cursor_execute( 

2167 self, 

2168 cursor: DBAPICursor, 

2169 statement: str, 

2170 parameters: _DBAPISingleExecuteParams, 

2171 context: Optional[ExecutionContext] = None, 

2172 ) -> None: 

2173 """Execute a statement + params on the given cursor. 

2174 

2175 Adds appropriate logging and exception handling. 

2176 

2177 This method is used by DefaultDialect for special-case 

2178 executions, such as for sequences and column defaults. 

2179 The path of statement execution in the majority of cases 

2180 terminates at _execute_context(). 

2181 

2182 """ 

2183 if self._has_events or self.engine._has_events: 

2184 for fn in self.dispatch.before_cursor_execute: 

2185 statement, parameters = fn( 

2186 self, cursor, statement, parameters, context, False 

2187 ) 

2188 

2189 if self._echo: 

2190 self._log_info(statement) 

2191 self._log_info("[raw sql] %r", parameters) 

2192 try: 

2193 for fn in ( 

2194 () 

2195 if not self.dialect._has_events 

2196 else self.dialect.dispatch.do_execute 

2197 ): 

2198 if fn(cursor, statement, parameters, context): 

2199 break 

2200 else: 

2201 self.dialect.do_execute(cursor, statement, parameters, context) 

2202 except BaseException as e: 

2203 self._handle_dbapi_exception( 

2204 e, statement, parameters, cursor, context 

2205 ) 

2206 

2207 if self._has_events or self.engine._has_events: 

2208 self.dispatch.after_cursor_execute( 

2209 self, cursor, statement, parameters, context, False 

2210 ) 

2211 

2212 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2213 """Close the given cursor, catching exceptions 

2214 and turning into log warnings. 

2215 

2216 """ 

2217 try: 

2218 cursor.close() 

2219 except Exception: 

2220 # log the error through the connection pool's logger. 

2221 self.engine.pool.logger.error( 

2222 "Error closing cursor", exc_info=True 

2223 ) 

2224 

# Class-level defaults for two flags managed by
# _handle_dbapi_exception().  That method assigns instance attributes
# of the same names while it runs and ``del``s them when done, which
# makes these ``False`` values visible again.
_reentrant_error = False
_is_disconnect = False

2227 

def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised during execution and re-raise it.

    Decides whether the error is a disconnect, optionally wraps it in
    a :class:`.DBAPIError`, runs the dialect's ``handle_error``
    listeners, closes the cursor, and finally invalidates the
    connection if a disconnect was detected.  Every path through this
    method raises.

    :param e: the exception being handled.
    :param statement: string statement used when building the wrapped
     exception and passed to listeners; may be ``None``.
    :param parameters: parameters used when building the wrapped
     exception and passed to listeners.
    :param cursor: the DBAPI cursor in use, if any; it is closed here
     when the error is not a disconnect.
    :param context: the execution context in progress, if any.
    :param is_sub_exec: when True, the wrapped exception is never
     flagged ``ismulti`` regardless of ``context.executemany``.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    # compute the disconnect flag only if an outer (reentrant) call
    # has not already established it
    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    # this method is already running further up the stack: raise the
    # wrapped error right away without running listeners or cleanup
    if self._reentrant_error:
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        # exception supplied by a handle_error listener, if any; takes
        # priority over everything else when raising below
        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # a listener may have changed the disconnect decision;
            # carry it over to this connection and the wrapped error
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # drop the instance-level flags so the class-level ``False``
        # defaults apply again
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)

2372 

@classmethod
def _handle_dbapi_exception_noconnection(
    cls,
    e: BaseException,
    dialect: Dialect,
    engine: Optional[Engine] = None,
    is_disconnect: Optional[bool] = None,
    invalidate_pool_on_disconnect: bool = True,
    is_pre_ping: bool = False,
) -> NoReturn:
    """Process and re-raise an exception when no
    :class:`_engine.Connection` is available.

    Wraps ``e`` in a :class:`.DBAPIError` only if it is an instance of
    the dialect's DBAPI ``Error`` class, runs the dialect's
    ``handle_error`` listeners, then raises.  Every path through this
    method raises.

    :param e: the exception being handled.
    :param dialect: dialect whose DBAPI error class, ``is_disconnect()``
     and ``handle_error`` listeners are used.
    :param engine: optional engine; supplies ``hide_parameters`` and is
     passed to listeners.
    :param is_disconnect: pre-computed disconnect flag; when ``None``
     it is derived from ``dialect.is_disconnect()``.
    :param invalidate_pool_on_disconnect: passed through to the
     exception context given to listeners.
    :param is_pre_ping: passed through to the exception context given
     to listeners.

    """
    exc_info = sys.exc_info()

    if is_disconnect is None:
        is_disconnect = isinstance(
            e, dialect.loaded_dbapi.Error
        ) and dialect.is_disconnect(e, None, None)

    should_wrap = isinstance(e, dialect.loaded_dbapi.Error)

    if should_wrap:
        sqlalchemy_exception = exc.DBAPIError.instance(
            None,
            None,
            cast(Exception, e),
            dialect.loaded_dbapi.Error,
            hide_parameters=(
                engine.hide_parameters if engine is not None else False
            ),
            connection_invalidated=is_disconnect,
            dialect=dialect,
        )
    else:
        sqlalchemy_exception = None

    # exception supplied by a handle_error listener, if any; takes
    # priority over everything else when raising below
    newraise = None

    if dialect._has_events:
        # no connection, cursor, statement, parameters or execution
        # context are available here, hence the run of ``None``
        ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            dialect,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            invalidate_pool_on_disconnect,
            is_pre_ping,
        )
        for fn in dialect.dispatch.handle_error:
            try:
                # handler returns an exception;
                # call next handler in a chain
                per_fn = fn(ctx)
                if per_fn is not None:
                    ctx.chained_exception = newraise = per_fn
            except Exception as _raised:
                # handler raises an exception - stop processing
                newraise = _raised
                break

        # a listener may have changed the disconnect decision
        if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
            sqlalchemy_exception.connection_invalidated = ctx.is_disconnect

    if newraise:
        raise newraise.with_traceback(exc_info[2]) from e
    elif should_wrap:
        assert sqlalchemy_exception is not None
        raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
    else:
        assert exc_info[1] is not None
        raise exc_info[1].with_traceback(exc_info[2])

2447 

2448 def _run_ddl_visitor( 

2449 self, 

2450 visitorcallable: Type[InvokeDDLBase], 

2451 element: SchemaVisitable, 

2452 **kwargs: Any, 

2453 ) -> None: 

2454 """run a DDL visitor. 

2455 

2456 This method is only here so that the MockConnection can change the 

2457 options given to the visitor so that "checkfirst" is skipped. 

2458 

2459 """ 

2460 visitorcallable( 

2461 dialect=self.dialect, connection=self, **kwargs 

2462 ).traverse_single(element) 

2463 

2464 

class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    A plain value object: the constructor stores each argument on the
    slot of the matching name so that error listeners can read, and in
    some cases modify, the state of the error being handled.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Populate every slot of the context.

        :param exception: the exception originally raised; stored as
         ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, if one was
         created.
        :param engine: the engine involved, if known.
        :param dialect: the dialect in use.
        :param connection: the connection involved, if any.
        :param cursor: the DBAPI cursor involved, if any.
        :param statement: the string statement, if any.
        :param parameters: the parameters in use, if any.
        :param context: the execution context, if any; stored as
         ``execution_context``.
        :param is_disconnect: whether the error is considered a
         disconnect.
        :param invalidate_pool_on_disconnect: whether the pool is to be
         invalidated on disconnect.
        :param is_pre_ping: whether the error occurred during a
         pre-ping.
        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # ``cursor`` is declared in ``__slots__``; without this
        # assignment the argument was silently dropped and reading
        # ``.cursor`` raised AttributeError.
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # No handler has supplied a replacement exception yet; start at
        # None so the slot is readable by the first handler in a chain.
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping

2510 

2511 

class Transaction(TransactionalContext):
    """A database transaction that is currently in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled through the
    :meth:`.rollback` and :meth:`.commit` methods.  The object is also
    a context manager, so the Python ``with`` statement may be
    combined with :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # the base class cannot be instantiated; subclasses supply
        # their own constructor
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True once this transaction has been fully detached from
        the connection and so can no longer influence its state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        # subclass hook invoked by close()
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        # subclass hook invoked by rollback()
        raise NotImplementedError()

    def _do_commit(self) -> None:
        # subclass hook invoked by commit()
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        # active, and the owning connection has not been invalidated
        return self.is_active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this transaction is the base transaction of a
        begin/commit nesting, it is rolled back.  Otherwise the method
        simply returns.

        Use this to cancel a Transaction without disturbing the scope
        of an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # holds whether or not the hook raised
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` emits a
          "ROLLBACK TO SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific
          methods for two phase transactions.


        """
        try:
            self._do_rollback()
        finally:
            # holds whether or not the hook raised
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` emits a
          "RELEASE SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific
          methods for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            # holds whether or not the hook raised
            assert not self.is_active

    def _get_subject(self) -> Connection:
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        return not self._deactivated_from_connection

    def _rollback_can_be_called(self) -> bool:
        # rollback() may safely be called on a RootTransaction /
        # NestedTransaction even when it is deactive, and emits no
        # warnings.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True

2654 

2655 

class RootTransaction(Transaction):
    """Represent the "root" transaction on a :class:`_engine.Connection`.

    This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
    for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
    is created by calling upon the :meth:`_engine.Connection.begin` method, and
    remains associated with the :class:`_engine.Connection` throughout its
    active span. The current :class:`_engine.RootTransaction` in use is
    accessible via the :meth:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
    "autobegin" behavior that will create a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI connection.
    The scope of the :class:`_engine.RootTransaction` in 2.0 style
    use can be controlled using the :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        """Begin a root transaction on ``connection`` and install it as
        the connection's current transaction.

        The connection must not already have a root transaction.
        """
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # begin first; the transaction is attached to the connection
        # only if this call returns without raising
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        """Mark this transaction inactive.

        Warns if the transaction is already inactive and is no longer
        the connection's current transaction.
        """
        if self.is_active:
            assert self.connection._transaction is self
            self.is_active = False

        elif self.connection._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self) -> bool:
        # True when the connection no longer refers to this object as
        # its current transaction
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        # delegate to the connection's begin implementation
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        # delegate to the connection's rollback implementation
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        # delegate to the connection's commit implementation
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        """Roll back if still active, cancel any nested transaction and
        detach from the connection.

        :param try_deactivate: when True, run the deactivation step even
         if this transaction is already inactive.
        """
        try:
            if self.is_active:
                self._connection_rollback_impl()

            if self.connection._nested_transaction:
                self.connection._nested_transaction._cancel()
        finally:
            # detach even if the rollback or the cancel raised
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        """Commit if active; otherwise raise.

        An inactive transaction that is still the connection's current
        transaction is handed to ``connection._invalid_transaction()``;
        one that is no longer current raises
        :class:`.InvalidRequestError`.
        """
        if self.is_active:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # whether or not commit succeeds, cancel any
                # nested transactions, make this transaction "inactive"
                # and remove it as a reset agent
                if self.connection._nested_transaction:
                    self.connection._nested_transaction._cancel()

                self._deactivate_from_connection()

            # ...however only remove as the connection's current transaction
            # if commit succeeded.  otherwise it stays on so that a rollback
            # needs to occur.
            self.connection._transaction = None
        else:
            if self.connection._transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")

        assert not self.is_active
        assert self.connection._transaction is not self

2762 

2763 

class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    A :class:`.NestedTransaction` is obtained from the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    The "begin" / "commit" / "rollback" vocabulary maps onto savepoints
    as follows:

    * "begin" corresponds to the "BEGIN SAVEPOINT" command; the savepoint
      receives an explicit name which is kept as part of this object's
      state.

    * :meth:`.NestedTransaction.commit` corresponds to a
      "RELEASE SAVEPOINT" operation against the savepoint identifier
      held by this :class:`.NestedTransaction`.

    * :meth:`.NestedTransaction.rollback` corresponds to a
      "ROLLBACK TO SAVEPOINT" operation against that same identifier.

    The rationale for mimicking the semantics of an outer transaction in
    terms of savepoints is so that code may deal with a "savepoint"
    transaction and an "outer" transaction in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    _savepoint: str

    def __init__(self, connection: Connection):
        """Emit a savepoint on ``connection`` and become its current
        nested transaction.

        The connection must already have a transaction in progress.  The
        nested transaction that was current before this one is remembered
        so it can be restored when this one is deactivated.
        """
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = self.connection._savepoint_impl()
        self.is_active = True
        # push onto the connection's chain of nested transactions
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        """Restore the previously current nested transaction on the
        connection.

        :param warn: when this object is not the connection's current
         nested transaction, emit a warning if true; otherwise do nothing.
        """
        conn = self.connection
        if conn._nested_transaction is not self:
            if warn:
                util.warn(
                    "nested transaction already deassociated from connection"
                )
            return
        conn._nested_transaction = self._previous_nested

    @property
    def _deactivated_from_connection(self) -> bool:
        """True when the connection's current nested transaction is some
        object other than this one."""
        return not (self.connection._nested_transaction is self)

    def _cancel(self) -> None:
        """Mark this savepoint and every enclosing one inactive without
        emitting any SQL.

        Invoked by the outer (root) transaction when it is committed,
        rolled back, or closed.
        """
        self.is_active = False
        self._deactivate_from_connection()
        enclosing = self._previous_nested
        if enclosing:
            enclosing._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        """Roll back to the savepoint when possible and mark this object
        inactive.

        :param deactivate_from_connection: if true, also detach this
         object from the connection.
        :param warn_already_deactive: forwarded as the ``warn`` flag of
         :meth:`_deactivate_from_connection`.
        """
        try:
            # SQL is emitted only while both this savepoint and the
            # connection's outer transaction are still active
            if self.is_active:
                outer = self.connection._transaction
                if outer and outer.is_active:
                    self.connection._rollback_to_savepoint_impl(
                        self._savepoint
                    )
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        """Close: roll back and detach, staying silent if already
        detached."""
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=False
        )

    def _do_rollback(self) -> None:
        """Roll back: roll back and detach, warning if already detached."""
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=True
        )

    def _do_commit(self) -> None:
        """Release the savepoint.

        An inactive nested transaction that is still the connection's
        current one is handed to ``connection._invalid_transaction()``;
        an inactive one that is no longer current raises
        :class:`.exc.InvalidRequestError`.
        """
        if not self.is_active:
            if self.connection._nested_transaction is not self:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
            self.connection._invalid_transaction()
            return

        try:
            self.connection._release_savepoint_impl(self._savepoint)
        finally:
            # inactive regardless of whether the release worked, so that
            # a later rollback of this object does not try to emit SQL
            self.is_active = False

        # detaching from the connection happens only after a successful
        # release
        self._deactivate_from_connection()

2876 

2877 

class TwoPhaseTransaction(RootTransaction):
    """Represent a two-phase transaction.

    Obtain a new :class:`.TwoPhaseTransaction` from the
    :meth:`_engine.Connection.begin_twophase` method.

    It offers the same interface as :class:`.Transaction`, plus the
    :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        """Record ``xid`` and the not-yet-prepared state, then run the
        parent initializer for ``connection``."""
        # both attributes are assigned before the parent initializer runs
        self._is_prepared = False
        self.xid = xid
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        After a PREPARE, the transaction can be committed.

        :raises: :class:`.exc.InvalidRequestError` if the transaction is
         not active.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
            return
        raise exc.InvalidRequestError("This transaction is inactive")

    def _connection_begin_impl(self) -> None:
        """Begin via the connection's two-phase begin hook, passing this
        transaction object."""
        self.connection._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        """Roll back via the connection's two-phase hook, passing the xid
        and whether :meth:`prepare` has completed."""
        xid, prepared = self.xid, self._is_prepared
        self.connection._rollback_twophase_impl(xid, prepared)

    def _connection_commit_impl(self) -> None:
        """Commit via the connection's two-phase hook, passing the xid and
        whether :meth:`prepare` has completed."""
        xid, prepared = self.xid, self._is_prepared
        self.connection._commit_twophase_impl(xid, prepared)

2917 

2918 

class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    # LRU cache created in __init__, or None when query_cache_size is 0
    _compiled_cache: Optional[CompiledCacheType]

    # class-level defaults; update_execution_options() rebinds
    # _execution_options on the instance
    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    # class instantiated by connect()
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    # class instantiated by execution_options(); assigned at module level
    # once OptionEngine has been defined
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Construct an :class:`_engine.Engine` from a pool, dialect and URL.

        :param pool: connection pool, stored as ``self.pool``.
        :param dialect: dialect, stored as ``self.dialect``.
        :param url: URL, stored as ``self.url``.
        :param logging_name: stored as ``self.logging_name`` only when
         truthy.
        :param echo: echo flag; assigned to ``self.echo`` and passed to
         ``log.instance_logger()``.
        :param query_cache_size: capacity of the compiled cache; ``0``
         disables the cache (``_compiled_cache`` is set to ``None``).
        :param execution_options: optional mapping; when non-empty it is
         applied through :meth:`update_execution_options`.
        :param hide_parameters: stored as ``self.hide_parameters``.
        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        """Size-alert callback given to the compiled cache.

        Logs the current length and capacity of ``cache`` at INFO level,
        but only when ``self._should_log_info()`` is true.
        """
        if self._should_log_info():
            self.logger.info(
                "Compiled cache size pruning from %d items to %d.  "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self) -> Engine:
        """Returns this :class:`.Engine`.

        Used for legacy schemes that accept :class:`.Connection` /
        :class:`.Engine` objects within the same variable.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Clear the compiled cache held by this :class:`_engine.Engine`.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter.

        .. versionadded:: 1.4

        """
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections.  The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        # order: event hook first, then merge into the stored options,
        # then the dialect hook
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the
          same instance.  The :meth:`_engine.Engine.dispose`
          method will replace
          the connection pool instance for the parent engine as well
          as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine`
          inherits the events of the parent, and new events can be associated
          with the new :class:`_engine.Engine` individually.
        * The logging configuration and logging_name is copied from the parent
          :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that affect some execution-level behavior for each
        engine.    One such example is breaking into separate "reader" and
        "writer" :class:`_engine.Engine` instances, where one
        :class:`_engine.Engine`
        has a lower :term:`isolation level` setting configured or is even
        transaction-disabled using "autocommit".  An example of this
        configuration is at :ref:`dbapi_autocommit_multiple`.

        Another example is one that
        uses a custom option ``shard_id`` which is consumed by an event
        to change the current schema on a database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        The above recipe illustrates two :class:`_engine.Engine` objects that
        will each serve as factories for :class:`_engine.Connection` objects
        that have pre-established "shard_id" execution options present. A
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        then interprets this execution option to emit a MySQL ``use`` statement
        to switch databases before a statement execution, while at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.driver

    echo = log.echo_property()

    def __repr__(self) -> str:
        """Return ``Engine(<repr of url>)``."""
        return "Engine(%r)" % (self.url,)

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        :param close: if left at its default of ``True``, has the
          effect of fully closing all **currently checked in**
          database connections.  Connections that are still checked out
          will **not** be closed, however they will no longer be associated
          with this :class:`_engine.Engine`,
          so when they are closed individually, eventually the
          :class:`_pool.Pool` which they are associated with will
          be garbage collected and they will be closed out fully, if
          not already closed on checkin.

          If set to ``False``, the previous connection pool is de-referenced,
          and otherwise not touched in any way.

        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

        """
        if close:
            self.pool.dispose()
        # the replacement pool is created from the old one in either case,
        # and the engine_disposed event fires afterwards
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Context manager yielding ``connection`` if one is given, else a
        new connection from :meth:`connect`.

        A connection opened here is managed by a ``with`` block and so is
        released when the context exits; a caller-supplied connection is
        yielded as-is and left for the caller to manage.
        """
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            yield connection

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed.  If an error is raised, the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        with self.connect() as conn:
            with conn.begin():
                yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        """Run a DDL visitor against ``element`` on a connection obtained
        from :meth:`begin`, forwarding all arguments to the connection's
        own ``_run_ddl_visitor()``."""
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` acts as a Python context manager, so
        the typical use of this method looks like::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Where above, after the block is completed, the connection is "closed"
        and its underlying DBAPI resources are returned to the connection pool.
        This also has the effect of rolling back any transaction that
        was explicitly begun or was begun via autobegin, and will
        emit the :meth:`_events.ConnectionEvents.rollback` event if one was
        started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed.   When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()

3303 

3304 

class OptionEngineMixin(log.Identified):
    """Mixin for an engine that proxies a parent :class:`_engine.Engine`.

    Instances copy the parent's URL, dialect, logging settings, compiled
    cache and ``hide_parameters`` flag, join the parent's event dispatcher,
    and start from the parent's execution options before applying their
    own.  The connection pool is not copied; it is read from and written
    to the parent.
    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Mirror state from ``proxied`` and apply ``execution_options`` on
        top of the parent's options."""
        self._proxied = proxied

        # copied one at a time, in this exact order
        for attr_name in (
            "url",
            "dialect",
            "logging_name",
            "echo",
            "_compiled_cache",
            "hide_parameters",
        ):
            setattr(self, attr_name, getattr(proxied, attr_name))
        log.instance_logger(self, echoflag=self.echo)

        # note: joining the dispatchers means events assigned to the
        # parent engine *after* this object is created still propagate
        # here.  Because the parent's events are shared, class-level
        # events are disallowed from applying to the OptionEngine class
        # directly.
        #
        # an alternative would be a one-time transfer of the events that
        # exist right now:
        #     self.dispatch._update(proxied.dispatch)
        #
        # that may be more appropriate, but it would be a behavioral
        # change for code that attaches events to the parent engine and
        # expects an already-created sub-engine to pick them up.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Placeholder that always raises :class:`NotImplementedError`;
        the concrete class supplies the implementation."""
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The parent engine's pool."""
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # written through to the parent engine
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            """The parent's flag when it is truthy, otherwise the value
            stored on this instance (``False`` if none was stored)."""
            inherited = self._proxied._has_events
            if inherited:
                return inherited
            return self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # kept in the instance dict so the getter can find it
            self.__dict__["_has_events"] = value

3368 

3369 

class OptionEngine(OptionEngineMixin, Engine):
    """Concrete engine combining :class:`OptionEngineMixin` with
    :class:`Engine`."""

    def update_execution_options(self, **opt: Any) -> None:
        """Apply ``opt`` using :class:`Engine`'s implementation."""
        # called on Engine explicitly: the mixin, which comes first in the
        # bases, defines a same-named method that only raises
        # NotImplementedError
        Engine.update_execution_options(self, **opt)

3373 

3374 

# Engine.execution_options() instantiates ``self._option_cls``; the attribute
# is bound here because OptionEngine subclasses Engine and so can only be
# defined after it.
Engine._option_cls = OptionEngine