Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1006 statements  

1# engine/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 

8from __future__ import annotations 

9 

10import contextlib 

11import sys 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import cast 

16from typing import Iterable 

17from typing import Iterator 

18from typing import List 

19from typing import Mapping 

20from typing import NoReturn 

21from typing import Optional 

22from typing import overload 

23from typing import Tuple 

24from typing import Type 

25from typing import TypeVar 

26from typing import Union 

27 

28from .interfaces import BindTyping 

29from .interfaces import ConnectionEventsTarget 

30from .interfaces import DBAPICursor 

31from .interfaces import ExceptionContext 

32from .interfaces import ExecuteStyle 

33from .interfaces import ExecutionContext 

34from .interfaces import IsolationLevel 

35from .util import _distill_params_20 

36from .util import _distill_raw_params 

37from .util import TransactionalContext 

38from .. import exc 

39from .. import inspection 

40from .. import log 

41from .. import util 

42from ..sql import compiler 

43from ..sql import util as sql_util 

44from ..util.typing import TupleAny 

45from ..util.typing import TypeVarTuple 

46from ..util.typing import Unpack 

47 

48if typing.TYPE_CHECKING: 

49 from . import CursorResult 

50 from . import ScalarResult 

51 from .interfaces import _AnyExecuteParams 

52 from .interfaces import _AnyMultiExecuteParams 

53 from .interfaces import _CoreAnyExecuteParams 

54 from .interfaces import _CoreMultiExecuteParams 

55 from .interfaces import _CoreSingleExecuteParams 

56 from .interfaces import _DBAPIAnyExecuteParams 

57 from .interfaces import _DBAPISingleExecuteParams 

58 from .interfaces import _ExecuteOptions 

59 from .interfaces import CompiledCacheType 

60 from .interfaces import CoreExecuteOptionsParameter 

61 from .interfaces import Dialect 

62 from .interfaces import SchemaTranslateMapType 

63 from .reflection import Inspector # noqa 

64 from .url import URL 

65 from ..event import dispatcher 

66 from ..log import _EchoFlagType 

67 from ..pool import _ConnectionFairy 

68 from ..pool import Pool 

69 from ..pool import PoolProxiedConnection 

70 from ..sql import Executable 

71 from ..sql._typing import _InfoType 

72 from ..sql.compiler import Compiled 

73 from ..sql.ddl import ExecutableDDLElement 

74 from ..sql.ddl import InvokeDDLBase 

75 from ..sql.functions import FunctionElement 

76 from ..sql.schema import DefaultGenerator 

77 from ..sql.schema import HasSchemaAttr 

78 from ..sql.schema import SchemaVisitable 

79 from ..sql.selectable import TypedReturnsRows 

80 

81 

# Type variable and type-variable-tuple declarations for this module's
# annotations.
_T = TypeVar("_T", bound=Any)
_Ts = TypeVarTuple("_Ts")
# "No options" defaults; both names refer to the same util.EMPTY_DICT object.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT

86 

87 

88class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

89 """Provides high-level functionality for a wrapped DB-API connection. 

90 

91 The :class:`_engine.Connection` object is procured by calling the 

92 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

93 object, and provides services for execution of SQL statements as well 

94 as transaction control. 

95 

96 The Connection object is **not** thread-safe. While a Connection can be 

97 shared among threads using properly synchronized access, it is still 

98 possible that the underlying DBAPI connection may not support shared 

99 access between threads. Check the DBAPI documentation for details. 

100 

101 The Connection object represents a single DBAPI connection checked out 

102 from the connection pool. In this state, the connection pool has no 

103 effect upon the connection, including its expiration or timeout state. 

104 For the connection pool to properly manage connections, connections 

105 should be returned to the connection pool (i.e. ``connection.close()``) 

106 whenever the connection is not in use. 

107 

108 .. index:: 

109 single: thread safety; Connection 

110 

111 """ 

112 

113 dialect: Dialect 

114 dispatch: dispatcher[ConnectionEventsTarget] 

115 

116 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

117 

118 # used by sqlalchemy.engine.util.TransactionalContext 

119 _trans_context_manager: Optional[TransactionalContext] = None 

120 

121 # legacy as of 2.0, should be eventually deprecated and 

122 # removed. was used in the "pre_ping" recipe that's been in the docs 

123 # a long time 

124 should_close_with_result = False 

125 

126 _dbapi_connection: Optional[PoolProxiedConnection] 

127 

128 _execution_options: _ExecuteOptions 

129 

130 _transaction: Optional[RootTransaction] 

131 _nested_transaction: Optional[NestedTransaction] 

132 

def __init__(
    self,
    engine: Engine,
    connection: Optional[PoolProxiedConnection] = None,
    _has_events: Optional[bool] = None,
    _allow_revalidate: bool = True,
    _allow_autobegin: bool = True,
):
    """Construct a new Connection.

    :param engine: the :class:`_engine.Engine` this connection belongs to;
     its dialect, execution options and event dispatch are adopted here.
    :param connection: an already checked-out pool-proxied connection to
     wrap.  When omitted, one is procured via ``engine.raw_connection()``.
    :param _has_events: internal.  ``None`` means "follow the engine" and
     joins this connection's dispatch to the engine's; an explicit value
     is used as given and the engine's dispatch is not joined.
    :param _allow_revalidate: internal.  Stored as the "can reconnect"
     flag consulted by ``closed`` / ``invalidated``.
    :param _allow_autobegin: internal.  Stored for use by ``_autobegin()``.

    """
    dialect = engine.dialect
    self.engine = engine
    self.dialect = dialect

    if connection is not None:
        self._dbapi_connection = connection
    else:
        try:
            self._dbapi_connection = engine.raw_connection()
        except dialect.loaded_dbapi.Error as err:
            # give the no-connection handler a chance to wrap / raise;
            # if it returns, propagate the original DBAPI error
            Connection._handle_dbapi_exception_noconnection(
                err, dialect, engine
            )
            raise

    # transactional state starts out empty
    self._transaction = None
    self._nested_transaction = None
    self.__savepoint_seq = 0
    self.__in_begin = False

    self.__can_reconnect = _allow_revalidate
    self._allow_autobegin = _allow_autobegin
    self._echo = self.engine._should_log_info()

    follow_engine_events = _has_events is None
    if follow_engine_events:
        # if _has_events is sent explicitly as False,
        # then don't join the dispatch of the engine; we don't
        # want to handle any of the engine's events in that case.
        self.dispatch = self.dispatch._join(engine.dispatch)
    self._has_events = _has_events or (
        follow_engine_events and engine._has_events
    )

    self._execution_options = engine._execution_options

    if self._has_events or self.engine._has_events:
        self.dispatch.engine_connect(self)

177 

# Optional callable applied to every message by _log_info() / _log_debug()
# before it is handed to the engine's logger; a falsy value (the default
# None) leaves messages unchanged.
# this can be assigned differently via
# characteristics.LoggingTokenCharacteristic
_message_formatter: Any = None

181 

def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at INFO level on the engine's logger.

    The message is first passed through ``_message_formatter`` when one
    is set.  When ``log.STACKLEVEL`` is enabled, a ``stacklevel`` keyword
    is supplied to the logger call, computed from
    ``log.STACKLEVEL_OFFSET``.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    # the logger is invoked directly from this frame so that the
    # stacklevel computed above stays accurate
    self.engine.logger.info(text, *arg, **kw)

192 

def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at DEBUG level on the engine's logger.

    The message is first passed through ``_message_formatter`` when one
    is set.  When ``log.STACKLEVEL`` is enabled, a ``stacklevel`` keyword
    is supplied to the logger call, computed from
    ``log.STACKLEVEL_OFFSET``.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    # the logger is invoked directly from this frame so that the
    # stacklevel computed above stays accurate
    self.engine.logger.debug(text, *arg, **kw)

203 

@property
def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
    """The ``schema_translate_map`` execution option currently in effect
    on this connection, or ``None`` if the option is not set.

    """
    return self._execution_options.get("schema_translate_map", None)

211 

def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the schema name for the given schema item taking into
    account current schema translate map.

    The object's own ``schema`` is returned unchanged unless a non-empty
    ``schema_translate_map`` execution option is present, that map
    contains the object's schema as a key, and the object has
    ``_use_schema_map`` enabled; in that case the mapped name is returned.

    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    # guard clause: any one of these means "no translation applies"
    if (
        not translate_map
        or schema_name not in translate_map
        or not obj._use_schema_map
    ):
        return schema_name

    return translate_map[schema_name]

231 

def __enter__(self) -> Connection:
    """Enter a ``with`` block; the block's target is this same
    Connection object.

    """
    return self

234 

def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Leave a ``with`` block by calling ``close()``.

    The exception details are not inspected, and ``None`` is returned so
    an in-flight exception is never suppressed.

    """
    self.close()
    return None

237 

# Typing-only overload: spells out the option names SQLAlchemy itself
# recognizes so that type checkers can validate them.
@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    driver_column_names: bool = False,
    **opt: Any,
) -> Connection: ...

# Typing-only overload: arbitrary, user-defined option names.
@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called.   Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor.  Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`. Number of rows to format into an
        INSERT statement when the statement uses "insertmanyvalues" mode,
        which is a paged form of bulk insert that is used for many backends
        when using :term:`executemany` execution typically in conjunction
        with RETURNING. Defaults to 1000. May also be modified on a
        per-engine basis using the
        :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

        .. versionadded:: 2.0

        .. seealso::

            :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPI's rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements.  See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    :param driver_column_names: When True, the returned
     :class:`_engine.CursorResult` will use the column names as written in
     ``cursor.description`` to set up the keys for the result set,
     including the names of columns for the :class:`_engine.Row` object as
     well as the dictionary keys when using :attr:`_engine.Row._mapping`.
     On backends that use "name normalization" such as Oracle Database to
     correct for lower case names being converted to all uppercase, this
     behavior is turned off and the raw UPPERCASE names in
     cursor.description will be present.

     .. versionadded:: 2.1

    """  # noqa
    if self._has_events or self.engine._has_events:
        # event listeners are notified with the incoming options before
        # those options are merged into the connection's own set
        self.dispatch.set_connection_execution_options(self, opt)
    # the result of union() replaces the previous options mapping
    self._execution_options = self._execution_options.union(opt)
    # finally hand the new options to the dialect
    self.dialect.set_connection_execution_options(self, opt)
    return self

533 

def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL execution options currently in effect for
    this connection.

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options

542 

@property
def _still_open_and_dbapi_connection_is_valid(self) -> bool:
    """True when a pool-proxied connection is currently held and that
    connection reports itself as valid via ``is_valid``.

    """
    proxied = self._dbapi_connection
    if proxied is None:
        return False
    return proxied.is_valid

550 

@property
def closed(self) -> bool:
    """Return True if this connection is closed.

    A connection counts as closed when it holds no pool-proxied
    connection and is not permitted to reconnect.

    """
    if self._dbapi_connection is not None:
        return False
    return not self.__can_reconnect

556 

@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    Note that this does not indicate whether or not the connection was
    invalidated at the pool level.

    """
    # prior to 1.4, "invalid" was tracked as a flag separate from
    # "closed": an invalidated connection could be closed, leaving
    # _dbapi_connection as None and closed=True while the "invalid" flag
    # stayed True.  That produced three states (open/valid, closed/valid,
    # closed/invalid) for no good reason, since a "closed" connection has
    # no need to also be "invalid".  The state is now derived from two
    # facts only: whether a connection is held, and whether reconnecting
    # is permitted.
    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect

577 

@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    This is a SQLAlchemy connection-pool proxied connection
    which then has the attribute
    :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
    actual driver connection.

    If no connection is currently held, a revalidation is attempted
    first; :class:`~sqlalchemy.exc.PendingRollbackError` and
    :class:`~sqlalchemy.exc.ResourceClosedError` raised by that attempt
    propagate unchanged, while any other exception is routed through
    the connection's DBAPI exception handler.

    .. seealso::


        :ref:`dbapi_connections`

    """
    held = self._dbapi_connection
    if held is not None:
        return held

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        raise
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

603 

def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    A live SQL operation is performed against the database to obtain
    the value, so what is returned is the real level on the underlying
    DBAPI connection no matter how that state was established.  This
    will be one of the four actual isolation modes ``READ UNCOMMITTED``,
    ``READ COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``.  It will
    **not** include the ``AUTOCOMMIT`` isolation level setting.  Third
    party dialects may also feature additional isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None
    try:
        level = self.dialect.get_isolation_level(driver_connection)
    except BaseException as e:
        # any failure is routed through the DBAPI exception handler
        self._handle_dbapi_exception(e, None, None, None, None)
    else:
        return level

647 

@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    The value does not depend on the
    :paramref:`.Connection.execution_options.isolation_level` or
    :paramref:`.Engine.execution_options.isolation_level` execution
    options.  It is determined by the :class:`_engine.Dialect` when the
    first connection is created, by performing a SQL query against the
    database for the current isolation level before any additional
    commands have been emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect_default = self.dialect.default_isolation_level
    return dialect_default

676 

def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`~sqlalchemy.exc.PendingRollbackError` stating that
    the invalid transaction must be rolled back before reconnecting.

    The message says "savepoint transaction" when a nested transaction
    is present.

    """
    if self._nested_transaction is not None:
        qualifier = "savepoint "
    else:
        qualifier = ""

    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (qualifier,),
        code="8s2b",
    )

684 

def _revalidate_connection(self) -> PoolProxiedConnection:
    """Procure and return a new pool-proxied connection for an
    invalidated Connection.

    :raises ResourceClosedError: if reconnecting is not permitted or the
     connection is not in the invalidated state.
    :raises PendingRollbackError: if a transaction is still present.

    """
    if not (self.__can_reconnect and self.invalidated):
        raise exc.ResourceClosedError("This Connection is closed")

    if self._transaction is not None:
        # a transaction is still present; it must be rolled back first
        self._invalid_transaction()

    replacement = self.engine.raw_connection()
    self._dbapi_connection = replacement
    return replacement

692 

@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data travels with the DBAPI connection itself, so it remains
    present after the connection is returned to the connection pool and
    used again in subsequent instances of :class:`_engine.Connection`.

    """
    proxied = self.connection
    return proxied.info

706 

def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt will be made to close the underlying DBAPI connection
    immediately; however if this operation fails, the error is logged
    but not raised.  The connection is then discarded whether or not
    close() succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar),
    this :class:`_engine.Connection` will attempt to
    procure a new DBAPI connection using the services of the
    :class:`_pool.Pool` as a source of connectivity (e.g.
    a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    the :meth:`_engine.Connection.invalidate` method is called, at the
    DBAPI level all state associated with this transaction is lost, as
    the DBAPI connection is closed.  The :class:`_engine.Connection`
    will not allow a reconnection to proceed until the
    :class:`.Transaction` object is ended, by calling the
    :meth:`.Transaction.rollback` method; until that point, any attempt at
    continuing to use the :class:`_engine.Connection` will raise a
    :class:`~sqlalchemy.exc.PendingRollbackError`.
    This is to prevent applications from accidentally
    continuing ongoing transactional operations despite the
    fact that the transaction has been lost due to an
    invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on a connection that is already invalidated
    is a no-op.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    :raises ResourceClosedError: if this Connection is closed.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """
    if self.invalidated:
        # nothing left to invalidate
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        proxied = self._dbapi_connection
        assert proxied is not None
        proxied.invalidate(exception)

    # dropping the reference is what puts this object in the
    # "invalidated" state
    self._dbapi_connection = None

765 

def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance will remain usable.
    When closed
    (or exited from a context manager context as above),
    the DB-API connection will be literally closed and not
    returned to its originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    :raises ResourceClosedError: if this Connection is closed.
    :raises InvalidRequestError: if this Connection holds no DB-API
     connection, i.e. it has been invalidated.

    """
    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    proxied = self._dbapi_connection
    if proxied is not None:
        proxied.detach()
        return

    raise exc.InvalidRequestError("Can't detach an invalidated Connection")

801 

def _autobegin(self) -> None:
    """Start a transaction via :meth:`begin` when autobegin applies.

    Nothing happens when autobegin is disallowed for this connection,
    or while a "begin" operation is already underway (the in-begin flag
    guards against re-entering ``begin()`` from within itself).
    """
    if not self._allow_autobegin or self.__in_begin:
        return
    self.begin()

805 

def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    A :class:`_engine.RootTransaction` is returned. It represents the
    "scope" of the transaction, which ends when either
    :meth:`_engine.Transaction.rollback` or
    :meth:`_engine.Transaction.commit` is called on it; it can also be
    used as a context manager, as shown above.

    Calling :meth:`_engine.Connection.begin` starts a transaction that
    would normally be started anyway the first time the connection
    executes a statement. Reasons to call it explicitly include firing
    the :meth:`_events.ConnectionEvents.begin` event at a chosen moment,
    or structuring the work done within one connection checkout as a
    series of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    That code is not fundamentally different in behavior from the
    version below, which does not call
    :meth:`_engine.Connection.begin`; this second form is known as
    "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From the database's point of view,
    :meth:`_engine.Connection.begin` neither emits SQL nor alters the
    state of the underlying DBAPI connection; the Python DBAPI has no
    concept of an explicit transaction begin.

    :return: the newly created :class:`_engine.RootTransaction`.
    :raises sqlalchemy.exc.InvalidRequestError: if a transaction object
     already exists on this connection (from ``begin()`` or autobegin)
     and has not been committed or rolled back yet.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    # guard first: only one root transaction may exist at a time
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction

880 

def begin_nested(self) -> NestedTransaction:
    """Begin a nested transaction (i.e. SAVEPOINT) and return a
    transaction handle that controls the scope of the SAVEPOINT.

    E.g.::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    The object returned is a :class:`_engine.NestedTransaction`. Its
    :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback` methods correspond to
    the operations "RELEASE SAVEPOINT <name>" and
    "ROLLBACK TO SAVEPOINT <name>" respectively. The savepoint name is
    generated automatically and is local to the
    :class:`_engine.NestedTransaction` object. As with any other
    :class:`_engine.Transaction`, it may be used as a context manager
    as shown above, in which case the savepoint is "released" when the
    block succeeds and "rolled back" when the block raises.

    Nested transactions require SAVEPOINT support in the underlying
    database, else the behavior is undefined. A common use of SAVEPOINT
    is running operations that may fail inside a transaction, while
    letting the outer transaction continue. E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute(...)

    When :meth:`_engine.Connection.begin_nested` is called without
    :meth:`_engine.Connection.begin` or :meth:`_engine.Engine.begin`
    having been called first, the :class:`_engine.Connection` will
    "autobegin" the outer transaction first. That outer transaction
    may then be committed in "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute(...)
            # savepoint is released

            connection.execute(...)

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    root = self._transaction
    if root is None:
        # no outer transaction yet; let autobegin establish one first
        self._autobegin()

    return NestedTransaction(self)

958 

def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Begin a two-phase or XA transaction and return a transaction
    handle.

    The handle is a :class:`.TwoPhaseTransaction`; besides everything
    :class:`.Transaction` offers, it adds a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id. If not supplied, a
     random id will be generated.

    :raises sqlalchemy.exc.InvalidRequestError: if a transaction is
     already in progress on this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    # ask the dialect for an id only when the caller gave none
    transaction_id = (
        self.engine.dialect.create_xid() if xid is None else xid
    )
    return TwoPhaseTransaction(self, transaction_id)

987 

def commit(self) -> None:
    """Commit the transaction that is currently in progress.

    If a transaction has been started it is committed. If none was
    started, calling this method has no effect, assuming the connection
    is in a non-invalidated state.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.commit` only acts upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object. It does not operate upon a
      SAVEPOINT created through
      :meth:`_engine.Connection.begin_nested`; to control a SAVEPOINT,
      call :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.

    """
    current = self._transaction
    if current:
        current.commit()

1012 

def rollback(self) -> None:
    """Roll back the transaction that is currently in progress.

    If a transaction has been started it is rolled back. If none was
    started, calling this method has no effect. If a transaction was
    started and the connection is in an invalidated state, calling this
    method clears that transaction.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.rollback` only acts upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object. It does not operate upon a
      SAVEPOINT created through
      :meth:`_engine.Connection.begin_nested`; to control a SAVEPOINT,
      call :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.

    """
    current = self._transaction
    if current:
        current.rollback()

1038 

def recover_twophase(self) -> List[Any]:
    """Return whatever the dialect's ``do_recover_twophase()`` yields
    for this connection.

    NOTE(review): presumably the ids of prepared two-phase transactions
    that can be recovered -- confirm against the dialect documentation.
    """
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)

1041 

def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Delegate to the dialect's ``do_rollback_twophase()`` for the
    two-phase transaction id ``xid``.

    :param xid: two-phase transaction id handed to the dialect.
    :param recover: forwarded to the dialect as the ``recover`` keyword.
    """
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)

1044 

def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Delegate to the dialect's ``do_commit_twophase()`` for the
    two-phase transaction id ``xid``.

    :param xid: two-phase transaction id handed to the dialect.
    :param recover: forwarded to the dialect as the ``recover`` keyword.
    """
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)

1047 

def in_transaction(self) -> bool:
    """Return True if a transaction is in progress."""
    root = self._transaction
    if root is None:
        return False
    return root.is_active

1051 

def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction (SAVEPOINT) is in progress.

    Only the nested transaction tracked by this connection is
    consulted; use :meth:`in_transaction` to check the root transaction.
    """
    nested = self._nested_transaction
    return nested is not None and nested.is_active

1058 

1059 def _is_autocommit_isolation(self) -> bool: 

1060 opt_iso = self._execution_options.get("isolation_level", None) 

1061 return bool( 

1062 opt_iso == "AUTOCOMMIT" 

1063 or ( 

1064 opt_iso is None 

1065 and self.engine.dialect._on_connect_isolation_level 

1066 == "AUTOCOMMIT" 

1067 ) 

1068 ) 

1069 

1070 def _get_required_transaction(self) -> RootTransaction: 

1071 trans = self._transaction 

1072 if trans is None: 

1073 raise exc.InvalidRequestError("connection is not in a transaction") 

1074 return trans 

1075 

1076 def _get_required_nested_transaction(self) -> NestedTransaction: 

1077 trans = self._nested_transaction 

1078 if trans is None: 

1079 raise exc.InvalidRequestError( 

1080 "connection is not in a nested transaction" 

1081 ) 

1082 return trans 

1083 

def get_transaction(self) -> Optional[RootTransaction]:
    """Return the root transaction currently associated with this
    connection, or ``None`` when there is none.

    .. versionadded:: 1.4

    """
    root = self._transaction
    return root

1092 

def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the nested transaction currently associated with this
    connection, or ``None`` when there is none.

    .. versionadded:: 1.4

    """
    nested = self._nested_transaction
    return nested

1100 

1101 def _begin_impl(self, transaction: RootTransaction) -> None: 

1102 if self._echo: 

1103 if self._is_autocommit_isolation(): 

1104 self._log_info( 

1105 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1106 "autocommit mode)" 

1107 ) 

1108 else: 

1109 self._log_info("BEGIN (implicit)") 

1110 

1111 self.__in_begin = True 

1112 

1113 if self._has_events or self.engine._has_events: 

1114 self.dispatch.begin(self) 

1115 

1116 try: 

1117 self.engine.dialect.do_begin(self.connection) 

1118 except BaseException as e: 

1119 self._handle_dbapi_exception(e, None, None, None, None) 

1120 finally: 

1121 self.__in_begin = False 

1122 

1123 def _rollback_impl(self) -> None: 

1124 if self._has_events or self.engine._has_events: 

1125 self.dispatch.rollback(self) 

1126 

1127 if self._still_open_and_dbapi_connection_is_valid: 

1128 if self._echo: 

1129 if self._is_autocommit_isolation(): 

1130 if self.dialect.skip_autocommit_rollback: 

1131 self._log_info( 

1132 "ROLLBACK will be skipped by " 

1133 "skip_autocommit_rollback" 

1134 ) 

1135 else: 

1136 self._log_info( 

1137 "ROLLBACK using DBAPI connection.rollback(); " 

1138 "set skip_autocommit_rollback to prevent fully" 

1139 ) 

1140 else: 

1141 self._log_info("ROLLBACK") 

1142 try: 

1143 self.engine.dialect.do_rollback(self.connection) 

1144 except BaseException as e: 

1145 self._handle_dbapi_exception(e, None, None, None, None) 

1146 

1147 def _commit_impl(self) -> None: 

1148 if self._has_events or self.engine._has_events: 

1149 self.dispatch.commit(self) 

1150 

1151 if self._echo: 

1152 if self._is_autocommit_isolation(): 

1153 self._log_info( 

1154 "COMMIT using DBAPI connection.commit(), " 

1155 "has no effect due to autocommit mode" 

1156 ) 

1157 else: 

1158 self._log_info("COMMIT") 

1159 try: 

1160 self.engine.dialect.do_commit(self.connection) 

1161 except BaseException as e: 

1162 self._handle_dbapi_exception(e, None, None, None, None) 

1163 

1164 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1165 if self._has_events or self.engine._has_events: 

1166 self.dispatch.savepoint(self, name) 

1167 

1168 if name is None: 

1169 self.__savepoint_seq += 1 

1170 name = "sa_savepoint_%s" % self.__savepoint_seq 

1171 self.engine.dialect.do_savepoint(self, name) 

1172 return name 

1173 

1174 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1175 if self._has_events or self.engine._has_events: 

1176 self.dispatch.rollback_savepoint(self, name, None) 

1177 

1178 if self._still_open_and_dbapi_connection_is_valid: 

1179 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1180 

1181 def _release_savepoint_impl(self, name: str) -> None: 

1182 if self._has_events or self.engine._has_events: 

1183 self.dispatch.release_savepoint(self, name, None) 

1184 

1185 self.engine.dialect.do_release_savepoint(self, name) 

1186 

1187 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 

1188 if self._echo: 

1189 self._log_info("BEGIN TWOPHASE (implicit)") 

1190 if self._has_events or self.engine._has_events: 

1191 self.dispatch.begin_twophase(self, transaction.xid) 

1192 

1193 self.__in_begin = True 

1194 try: 

1195 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1196 except BaseException as e: 

1197 self._handle_dbapi_exception(e, None, None, None, None) 

1198 finally: 

1199 self.__in_begin = False 

1200 

def _prepare_twophase_impl(self, xid: Any) -> None:
    """Run the "prepare" step of a two-phase transaction.

    Fires the ``prepare_twophase`` event if listeners are present, then
    calls the dialect's ``do_prepare_twophase()``; exceptions from that
    call are passed to ``_handle_dbapi_exception()``.

    :param xid: the two-phase transaction id.
    """
    notify = self._has_events or self.engine._has_events
    if notify:
        self.dispatch.prepare_twophase(self, xid)

    # the current transaction is expected to be the two-phase one
    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1210 

1211 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1212 if self._has_events or self.engine._has_events: 

1213 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1214 

1215 if self._still_open_and_dbapi_connection_is_valid: 

1216 assert isinstance(self._transaction, TwoPhaseTransaction) 

1217 try: 

1218 self.engine.dialect.do_rollback_twophase( 

1219 self, xid, is_prepared 

1220 ) 

1221 except BaseException as e: 

1222 self._handle_dbapi_exception(e, None, None, None, None) 

1223 

def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Run the "commit" step of a two-phase transaction.

    Fires the ``commit_twophase`` event if listeners are present, then
    calls the dialect's ``do_commit_twophase()``; exceptions from that
    call are passed to ``_handle_dbapi_exception()``.

    :param xid: the two-phase transaction id.
    :param is_prepared: forwarded to the event and to the dialect.
    """
    notify = self._has_events or self.engine._has_events
    if notify:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    # the current transaction is expected to be the two-phase one
    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1233 

def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    Closing releases the underlying database resources, meaning the
    DBAPI connection referenced internally. That DBAPI connection is
    typically handed back to the connection-holding
    :class:`_pool.Pool` referenced by the :class:`_engine.Engine` that
    produced this :class:`_engine.Connection`. Any transactional state
    present on the DBAPI connection is unconditionally released as
    well, through the DBAPI connection's ``rollback()`` method,
    regardless of any :class:`.Transaction` object that may still be
    outstanding for this :class:`_engine.Connection`.

    In effect, :meth:`_engine.Connection.rollback` is also called if
    any transaction is in place.

    Once :meth:`_engine.Connection.close` has been called, the
    :class:`_engine.Connection` is permanently in a closed state and
    allows no further operations.

    """
    outstanding = self._transaction
    skip_reset = bool(outstanding)
    if skip_reset:
        outstanding.close()

    fairy = self._dbapi_connection
    if fairy is not None:
        if skip_reset:
            # as we just closed the transaction, close the connection
            # pool connection without doing an additional reset
            cast("_ConnectionFairy", fairy)._close_special(
                transaction_reset=True
            )
        else:
            fairy.close()

    # There is a slight chance that closing the pooled connection
    # triggered an invalidation, in which case _dbapi_connection would
    # already be None; usually it is still non-None here and in a
    # "closed" state.
    self._dbapi_connection = None
    self.__can_reconnect = False

1281 

@overload
def scalar(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Executes a SQL statement construct and returns a scalar object.

    Shorthand for calling :meth:`_engine.Connection.execute` and then
    :meth:`_engine.Result.scalar` on the result. Parameters are
    equivalent.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if ``statement``
     does not provide the scalar execution hook.

    """
    # parameters are distilled before the statement is inspected
    distilled_parameters = _distill_params_20(parameters)
    try:
        run_scalar = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_scalar(self, distilled_parameters, options)

1328 

@overload
def scalars(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Executes and returns a scalar result set, which yields scalar values
    from the first column of each row.

    Equivalent to calling :meth:`_engine.Connection.execute` to obtain
    a :class:`_result.Result` object and then calling
    :meth:`_result.Result.scalars` on it to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """
    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()

1371 

@overload
def execute(
    self,
    statement: TypedReturnsRows[Unpack[_Ts]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[_Ts]]: ...

@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]: ...

def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Executes a SQL statement construct and returns a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed. This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     Either a single dictionary of parameter names to values, or a
     mutable sequence (e.g. a list) of such dictionaries. A list of
     dictionaries causes the statement to be run with the DBAPI
     ``cursor.executemany()`` method; a single dictionary causes the
     DBAPI ``cursor.execute()`` method to be used.

    :param execution_options: optional dictionary of execution options
     to associate with this statement execution. It may contain a
     subset of the options accepted by
     :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.CursorResult` object.

    :raises sqlalchemy.exc.ObjectNotExecutableError: if ``statement``
     does not provide the connection execution hook.

    """
    # parameters are distilled before the statement is inspected
    distilled_parameters = _distill_params_20(parameters)
    try:
        run_on_connection = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_on_connection(self, distilled_parameters, options)

1439 

1440 def _execute_function( 

1441 self, 

1442 func: FunctionElement[Any], 

1443 distilled_parameters: _CoreMultiExecuteParams, 

1444 execution_options: CoreExecuteOptionsParameter, 

1445 ) -> CursorResult[Unpack[TupleAny]]: 

1446 """Execute a sql.FunctionElement object.""" 

1447 

1448 return self._execute_clauseelement( 

1449 func.select(), distilled_parameters, execution_options 

1450 ) 

1451 

def _execute_default(
    self,
    default: DefaultGenerator,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> Any:
    """Execute a schema.ColumnDefault object.

    Merges the per-call execution options into the connection's
    options, runs the ``before_execute`` listeners (if any), builds a
    "default" execution context from the dialect and returns whatever
    that context's ``_exec_default()`` produces, after notifying the
    ``after_execute`` listeners (if any).

    :param default: the default generator to execute; may be replaced
     by a ``before_execute`` listener.
    :param distilled_parameters: already-distilled parameter list.
    :param execution_options: per-call execution options.
    :return: the value returned by the execution context's
     ``_exec_default()``.
    """

    exec_opts = self._execution_options.merge_with(execution_options)

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreAnyExecuteParams]

    # note for event handlers, the "distilled parameters" which is always
    # a list of dicts is broken out into separate "multiparams" and
    # "params" collections, which allows the handler to distinguish
    # between an executemany and execute style set of parameters.
    if self._has_events or self.engine._has_events:
        (
            default,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, exec_opts
        )
    else:
        # no listeners: keep the names bound for the after_execute call
        event_multiparams = event_params = None

    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection at hand; try to obtain one again
            conn = self._revalidate_connection()

        dialect = self.dialect
        ctx = dialect.execution_ctx_cls._init_default(
            dialect, self, conn, exec_opts
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate untouched instead of going through the
        # generic DBAPI exception handling below
        raise
    except BaseException as e:
        # NOTE(review): presumably always raises -- ``ctx`` would be
        # unbound below otherwise; confirm in _handle_dbapi_exception
        self._handle_dbapi_exception(e, None, None, None, None)

    ret = ctx._exec_default(None, default, None)

    # the listener check is evaluated again here, not cached from above
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )

    return ret

1508 

def _execute_ddl(
    self,
    ddl: ExecutableDDLElement,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a schema.DDL object.

    Merges the DDL element's, the connection's and the per-call
    execution options, runs the ``before_execute`` listeners (if any),
    compiles the DDL for this connection's dialect and runs it through
    ``_execute_context()`` using the dialect's DDL context constructor.
    The ``after_execute`` listeners (if any) are notified before the
    result is returned.

    :param ddl: the DDL element to execute; may be replaced by a
     ``before_execute`` listener.
    :param distilled_parameters: already-distilled parameter list; it
     is handed to the listeners but not to ``_execute_context()``.
    :param execution_options: per-call execution options.
    :return: the result produced by ``_execute_context()``.
    """

    exec_opts = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreSingleExecuteParams]

    if self._has_events or self.engine._has_events:
        (
            ddl,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            ddl, distilled_parameters, exec_opts
        )
    else:
        # no listeners: keep the names bound for the after_execute call
        event_multiparams = event_params = None

    schema_translate_map = exec_opts.get("schema_translate_map", None)

    dialect = self.dialect

    compiled = ddl.compile(
        dialect=dialect, schema_translate_map=schema_translate_map
    )
    # the parameters slot is None here: the distilled parameters are
    # not forwarded for DDL
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_ddl,
        compiled,
        None,
        exec_opts,
        compiled,
    )
    # the listener check is evaluated again here, not cached from above
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            ddl,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1561 

1562 def _invoke_before_exec_event( 

1563 self, 

1564 elem: Any, 

1565 distilled_params: _CoreMultiExecuteParams, 

1566 execution_options: _ExecuteOptions, 

1567 ) -> Tuple[ 

1568 Any, 

1569 _CoreMultiExecuteParams, 

1570 _CoreMultiExecuteParams, 

1571 _CoreSingleExecuteParams, 

1572 ]: 

1573 event_multiparams: _CoreMultiExecuteParams 

1574 event_params: _CoreSingleExecuteParams 

1575 

1576 if len(distilled_params) == 1: 

1577 event_multiparams, event_params = [], distilled_params[0] 

1578 else: 

1579 event_multiparams, event_params = distilled_params, {} 

1580 

1581 for fn in self.dispatch.before_execute: 

1582 elem, event_multiparams, event_params = fn( 

1583 self, 

1584 elem, 

1585 event_multiparams, 

1586 event_params, 

1587 execution_options, 

1588 ) 

1589 

1590 if event_multiparams: 

1591 distilled_params = list(event_multiparams) 

1592 if event_params: 

1593 raise exc.InvalidRequestError( 

1594 "Event handler can't return non-empty multiparams " 

1595 "and params at the same time" 

1596 ) 

1597 elif event_params: 

1598 distilled_params = [event_params] 

1599 else: 

1600 distilled_params = [] 

1601 

1602 return elem, distilled_params, event_multiparams, event_params 

1603 

def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.ClauseElement object.

    Merges the element's, the connection's and the per-call execution
    options, runs the ``before_execute`` listeners (if any), compiles
    the element through its compiled-cache aware
    ``_compile_w_cache()`` and runs the compiled form through
    ``_execute_context()``. The ``after_execute`` listeners are
    notified when listeners were present at the start of the call.

    :param elem: the executable element; may be replaced by a
     ``before_execute`` listener.
    :param distilled_parameters: already-distilled parameter list.
    :param execution_options: per-call execution options.
    :return: the result produced by ``_execute_context()``.
    """

    exec_opts = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # evaluated once so the before/after event calls always agree;
    # the event_* names below are only bound when this is true
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, exec_opts
        )

    if distilled_parameters:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        keys = sorted(distilled_parameters[0])
        # more than one parameter set means executemany-style execution
        for_executemany = len(distilled_parameters) > 1
    else:
        keys = []
        for_executemany = False

    dialect = self.dialect

    schema_translate_map = exec_opts.get("schema_translate_map", None)

    # a "compiled_cache" execution option takes the place of the
    # engine's cache when present
    compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
        dialect=dialect,
        compiled_cache=compiled_cache,
        column_keys=keys,
        for_executemany=for_executemany,
        schema_translate_map=schema_translate_map,
        linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
    )
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        exec_opts,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1674 

def _execute_compiled(
    self,
    compiled: Compiled,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.Compiled object.

    Merges the compiled object's, the connection's and the per-call
    execution options, runs the ``before_execute`` listeners (if any)
    and runs the compiled form through ``_execute_context()``. The
    ``after_execute`` listeners (if any) are notified before the result
    is returned.

    TODO: why do we have this? likely deprecate or remove

    :param compiled: the compiled statement; may be replaced by a
     ``before_execute`` listener.
    :param distilled_parameters: already-distilled parameter list.
    :param execution_options: per-call execution options.
    :return: the result produced by ``_execute_context()``.
    """

    exec_opts = compiled.execution_options.merge_with(
        self._execution_options, execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreSingleExecuteParams]

    if self._has_events or self.engine._has_events:
        (
            compiled,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            compiled, distilled_parameters, exec_opts
        )
    else:
        # keep the names bound, as _execute_default() / _execute_ddl()
        # do: the listener check is evaluated again after execution,
        # and a listener registered in between would otherwise hit an
        # UnboundLocalError in the after_execute call
        event_multiparams = event_params = None

    dialect = self.dialect

    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled,
        distilled_parameters,
        exec_opts,
        compiled,
        distilled_parameters,
        None,
        None,
    )
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            compiled,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1724 

def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Send a plain SQL string straight to the DBAPI cursor, bypassing
    the SQL compilation steps entirely.

    Any string may be passed; it is handed as-is to the
    ``cursor.execute()`` method of the DBAPI in use.

    :param statement: The SQL string to execute.  Bound parameter
     markers must be written in the paramstyle of the underlying DBAPI
     ("qmark", "pyformat", "format", etc.).

    :param parameters: the bound parameter values for the execution.
     Accepted forms are a dictionary of named parameters, a tuple of
     positional parameters, or a list of dictionaries / tuples when
     multiple-execute behavior is wanted.

    :return: a :class:`_engine.CursorResult`.

    A list of dictionaries (multiple execute)::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
         )

    One dictionary::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             dict(id=1, value="v1"),
         )

    One tuple::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
         )

    .. note:: :meth:`_engine.Connection.exec_driver_sql` does not take
        part in the :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.  Calls to
        it can instead be intercepted with
        :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute`.

    .. seealso::

        :pep:`249`

    """

    # normalize the raw DBAPI parameters first, then layer the per-call
    # options on top of the connection-level ones
    raw_params = _distill_raw_params(parameters)
    merged_options = self._execution_options.merge_with(execution_options)

    active_dialect = self.dialect
    return self._execute_context(
        active_dialect,
        active_dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        merged_options,
        statement,
        raw_params,
    )

1799 

def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Unpack[TupleAny]]:
    """Build an :class:`.ExecutionContext` via ``constructor`` and run
    it, returning a :class:`_engine.CursorResult`.

    ``statement`` and ``parameters`` are used for error reporting and are
    forwarded to :meth:`_exec_single_context`; the values the context is
    actually built from arrive through ``*args`` / ``**kw``.

    """

    if execution_options:
        yield_per = execution_options.get("yield_per", None)
        if yield_per:
            # a truthy "yield_per" also switches on streaming, using the
            # same number as the row buffer size
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yield_per}
            )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        context = constructor(
            dialect, self, dbapi_conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two pass through untouched
        raise
    except BaseException as err:
        # no cursor and no context exist yet at this point
        self._handle_dbapi_exception(
            err, str(statement), parameters, None, None
        )

    if (self._transaction and not self._transaction.is_active) or (
        self._nested_transaction
        and not self._nested_transaction.is_active
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    # "insertmanyvalues" has its own multi-batch execution routine;
    # everything else is a single execute()/executemany() call
    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)

    return self._exec_single_context(
        dialect, context, statement, parameters
    )

1858 

def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Unpack[TupleAny]]:
    """Second half of :meth:`_execute_context` for the case of one DBAPI
    ``cursor.execute()`` or ``cursor.executemany()`` invocation.

    Runs the optional setinputsizes step, the ``before_cursor_execute``
    / ``after_cursor_execute`` hooks, statement logging, the dialect
    level ``do_execute*`` hooks and finally builds the result object.
    Any exception is routed to :meth:`_handle_dbapi_exception`.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        input_sizes = context._prepare_set_input_sizes()

        if input_sizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, input_sizes, context
                )
            except BaseException as err:
                # reported against the caller-supplied statement and
                # parameters; no cursor is passed along
                self._handle_dbapi_exception(
                    err, str(statement), parameters, None, context
                )

    # from here on, operate on what the context itself prepared
    cursor = context.cursor
    str_statement = context.statement
    parameters = context.parameters

    # a non-executemany call uses only the first parameter set
    effective_parameters: Optional[_AnyExecuteParams]
    effective_parameters = parameters if context.executemany else parameters[0]

    if self._has_events or self.engine._has_events:
        # each listener may replace both statement and parameters
        for hook in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = hook(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if self.engine.hide_parameters:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )
        else:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )

    try:
        # In each branch the dialect-level event listeners run first; a
        # listener returning a true value has handled the execution and
        # the dialect's default implementation is skipped (for/else).
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            for hook in (
                self.dialect.dispatch.do_executemany
                if self.dialect._has_events
                else ()
            ):
                if hook(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                ):
                    break
            else:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            for hook in (
                self.dialect.dispatch.do_execute_no_params
                if self.dialect._has_events
                else ()
            ):
                if hook(cursor, str_statement, context):
                    break
            else:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            for hook in (
                self.dialect.dispatch.do_execute
                if self.dialect._has_events
                else ()
            ):
                if hook(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                ):
                    break
            else:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as err:
        self._handle_dbapi_exception(
            err, str_statement, effective_parameters, cursor, context
        )

    return result

2000 

def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Unpack[TupleAny]]:
    """Second half of :meth:`_execute_context` for an "insertmanyvalues"
    operation.

    The dialect delivers the work as a series of batches; for every
    batch the DBAPI ``cursor.execute()`` is invoked once, each time with
    its own logging and event hook calls.

    """

    input_sizes = (
        context._prepare_set_input_sizes()
        if dialect.bind_typing is BindTyping.SETINPUTSIZES
        else None
    )

    cursor = context.cursor
    str_statement = context.statement
    effective_parameters = context.parameters

    has_engine_events = self._has_events or self.engine._has_events

    do_execute_hooks: Iterable[Any] = (
        self.dialect.dispatch.do_execute if self.dialect._has_events else ()
    )

    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    batches = dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        input_sizes,
        context,
    )
    for imv_batch in batches:
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as err:
                self._handle_dbapi_exception(
                    err,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        # the per-batch statement / parameters; listeners may swap them
        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if has_engine_events:
            for hook in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = hook(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            ordering = "ordered" if imv_batch.rows_sorted else "unordered"
            downgrade_note = (
                "; batch not supported" if imv_batch.is_downgraded else ""
            )
            imv_stats = (
                f" {imv_batch.batchnum}/{imv_batch.total_batches} "
                f"({ordering}{downgrade_note})"
            )

            # the first batch extends the cache-stats prefix computed
            # above; later batches start over with a plain label
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if self.engine.hide_parameters:
                self._log_info(
                    "[%s] [SQL parameters hidden due to hide_parameters=True]",
                    stats,
                )
            else:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )

        try:
            # a dialect-level listener returning a true value has taken
            # over the execution; otherwise use the dialect default
            for hook in do_execute_hooks:
                if hook(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as err:
            self._handle_dbapi_exception(
                err,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if has_engine_events:
            # note: the "after" hook is handed the overall statement and
            # parameters, not the per-batch ones
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as err:
        self._handle_dbapi_exception(
            err, str_statement, effective_parameters, cursor, context
        )

    return result

2171 

def _cursor_execute(
    self,
    cursor: DBAPICursor,
    statement: str,
    parameters: _DBAPISingleExecuteParams,
    context: Optional[ExecutionContext] = None,
) -> None:
    """Run ``statement`` with ``parameters`` on the given cursor, with
    logging, event hooks and exception handling applied.

    DefaultDialect uses this method for special-case executions, such
    as sequences and column defaults.  For the majority of statements
    the execution path ends in _execute_context() instead.

    """
    if self._has_events or self.engine._has_events:
        # listeners may rewrite the statement and/or the parameters
        for hook in self.dispatch.before_cursor_execute:
            statement, parameters = hook(
                self, cursor, statement, parameters, context, False
            )

    if self._echo:
        self._log_info(statement)
        self._log_info("[raw sql] %r", parameters)

    try:
        dialect = self.dialect
        hooks = dialect.dispatch.do_execute if dialect._has_events else ()
        for hook in hooks:
            # a true return value means the listener did the execution
            if hook(cursor, statement, parameters, context):
                break
        else:
            dialect.do_execute(cursor, statement, parameters, context)
    except BaseException as err:
        self._handle_dbapi_exception(
            err, statement, parameters, cursor, context
        )

    if self._has_events or self.engine._has_events:
        self.dispatch.after_cursor_execute(
            self, cursor, statement, parameters, context, False
        )

2217 

def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
    """Close ``cursor`` without letting an ``Exception`` escape; a
    failure to close is reported as a logged error instead.

    """
    try:
        cursor.close()
    except Exception:
        # the connection pool's logger is used for the report, with the
        # active traceback attached
        pool_logger = self.engine.pool.logger
        pool_logger.error("Error closing cursor", exc_info=True)

2230 

# Defaults for two per-instance flags used by _handle_dbapi_exception().
# The handler sets them on the instance while it runs and ``del``s them
# again in its ``finally`` clause, at which point attribute lookups fall
# back to these values.
_reentrant_error = False
_is_disconnect = False


def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised while executing and re-raise it.

    This method never returns normally.  In order, it:

    * decides whether the error counts as a "disconnect";
    * wraps the error in a :class:`.DBAPIError` when it is a DBAPI
      error, or when there is a statement but no execution context and
      the error is not an "exit" exception;
    * runs the dialect's ``handle_error`` listeners, unless the
      ``skip_user_error_events`` execution option is set; a listener
      may return or raise a replacement exception and may alter the
      disconnect / pool-invalidation decision;
    * closes the cursor and, when no transaction is in progress, rolls
      back, provided the error is not a disconnect;
    * raises the replacement, the wrapped, or the original exception;
    * on the way out, resets the per-instance flags and invalidates the
      connection (and optionally the pool) after a disconnect.

    :param e: the exception being handled; this method is expected to
     be called from within the ``except`` block that caught it, as the
     traceback is taken from ``sys.exc_info()``.
    :param statement: SQL string to attach to the wrapped error.
    :param parameters: parameters to attach to the wrapped error.
    :param cursor: DBAPI cursor in use, if any.
    :param context: execution context in use, if any.
    :param is_sub_exec: when True, the wrapped error is never flagged
     as "multi" regardless of ``context.executemany``.

    """
    _, active_exc, tb = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    if not self._is_disconnect:
        dbapi_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        )
        # an "exit" exception on a connection that is still open is
        # treated as a disconnect too
        self._is_disconnect = dbapi_disconnect or (
            is_exit_exception and not self.closed
        )

    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool
    if context is None:
        ismulti = False
    else:
        ismulti = not is_sub_exec and context.executemany

    if self._reentrant_error:
        # we got here while already handling an error on this
        # connection: wrap and raise right away, no hooks, no cleanup
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(tb) from e

    self._reentrant_error = True
    try:
        # a non-DBAPI error is left unwrapped if a context already
        # exists or if there is no string statement
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        sqlalchemy_exception = None
        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )

        to_raise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            error_ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for handler in self.dialect.dispatch.handle_error:
                try:
                    # a returned exception becomes the one to raise and
                    # is exposed to the next handler in the chain
                    replacement = handler(error_ctx)
                    if replacement is not None:
                        error_ctx.chained_exception = replacement
                        to_raise = replacement
                except Exception as handler_exc:
                    # a raising handler ends the chain
                    to_raise = handler_exc
                    break

            if self._is_disconnect != error_ctx.is_disconnect:
                # a handler changed the disconnect verdict
                self._is_disconnect = error_ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        error_ctx.is_disconnect
                    )

            # pick up a possibly user-modified pool invalidation flag
            invalidate_pool_on_disconnect = (
                error_ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" mattered mostly in the 1.x series.  Since
            # the connection autobegins, we are normally inside an
            # explicit / semi-explicit transaction here and this branch
            # is very unlikely to run; a test nonetheless produces the
            # scenario with an event handler:
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        if to_raise:
            raise to_raise.with_traceback(tb) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(tb) from e
        else:
            assert active_exc is not None
            raise active_exc.with_traceback(tb)
    finally:
        # drop the instance-level flags so the class defaults apply
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)

2378 

@classmethod
def _handle_dbapi_exception_noconnection(
    cls,
    e: BaseException,
    dialect: Dialect,
    engine: Optional[Engine] = None,
    is_disconnect: Optional[bool] = None,
    invalidate_pool_on_disconnect: bool = True,
    is_pre_ping: bool = False,
) -> NoReturn:
    """Process and re-raise an exception for which no
    :class:`_engine.Connection` is available.

    A DBAPI error is wrapped in a :class:`.DBAPIError` carrying no
    statement or parameters; anything else is re-raised unchanged.  The
    dialect's ``handle_error`` listeners, if any, get the chance to
    supply a replacement exception.  This method never returns normally
    and is expected to be called from within the ``except`` block that
    caught ``e``, as the traceback is taken from ``sys.exc_info()``.

    :param e: the exception being handled.
    :param dialect: dialect supplying the DBAPI error class, the
     disconnect check and the event listeners.
    :param engine: optional engine; only consulted for its
     ``hide_parameters`` setting and passed on to the listeners.
    :param is_disconnect: disconnect verdict if already known; when
     ``None`` it is worked out from the dialect.
    :param invalidate_pool_on_disconnect: passed on to the listeners'
     context object.
    :param is_pre_ping: passed on to the listeners' context object.

    """
    _, active_exc, tb = sys.exc_info()

    if is_disconnect is None:
        is_disconnect = isinstance(
            e, dialect.loaded_dbapi.Error
        ) and dialect.is_disconnect(e, None, None)

    should_wrap = isinstance(e, dialect.loaded_dbapi.Error)

    sqlalchemy_exception = None
    if should_wrap:
        sqlalchemy_exception = exc.DBAPIError.instance(
            None,
            None,
            cast(Exception, e),
            dialect.loaded_dbapi.Error,
            hide_parameters=(
                engine.hide_parameters if engine is not None else False
            ),
            connection_invalidated=is_disconnect,
            dialect=dialect,
        )

    to_raise = None

    if dialect._has_events:
        # connection, cursor, statement, parameters and execution
        # context are all unavailable here
        error_ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            dialect,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            invalidate_pool_on_disconnect,
            is_pre_ping,
        )
        for handler in dialect.dispatch.handle_error:
            try:
                # a returned exception becomes the one to raise and is
                # exposed to the next handler in the chain
                replacement = handler(error_ctx)
                if replacement is not None:
                    error_ctx.chained_exception = replacement
                    to_raise = replacement
            except Exception as handler_exc:
                # a raising handler ends the chain
                to_raise = handler_exc
                break

        if sqlalchemy_exception and is_disconnect != error_ctx.is_disconnect:
            # a handler changed the disconnect verdict
            sqlalchemy_exception.connection_invalidated = (
                error_ctx.is_disconnect
            )

    if to_raise:
        raise to_raise.with_traceback(tb) from e
    elif should_wrap:
        assert sqlalchemy_exception is not None
        raise sqlalchemy_exception.with_traceback(tb) from e
    else:
        assert active_exc is not None
        raise active_exc.with_traceback(tb)

2453 

def _run_ddl_visitor(
    self,
    visitorcallable: Type[InvokeDDLBase],
    element: SchemaVisitable,
    **kwargs: Any,
) -> None:
    """Instantiate the given DDL visitor and run it over ``element``.

    The method exists as a separate hook only so that the
    MockConnection can alter the options handed to the visitor, such
    that "checkfirst" is skipped.

    """
    visitor = visitorcallable(
        dialect=self.dialect, connection=self, **kwargs
    )
    visitor.traverse_single(element)

2469 

2470 

class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    A plain value holder passed to ``handle_error`` listeners.  Every
    name listed in ``__slots__`` is assigned in ``__init__`` so that a
    listener can read any of them without hitting ``AttributeError``.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the details of a failed operation.

        :param exception: the exception originally caught; stored as
         ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, or ``None``
         if the original is not being wrapped.
        :param engine: engine involved, if known.
        :param dialect: dialect involved.
        :param connection: connection involved, if any.
        :param cursor: DBAPI cursor involved, if any.
        :param statement: SQL string involved, if any.
        :param parameters: statement parameters involved, if any.
        :param context: execution context involved, if any; stored as
         ``execution_context``.
        :param is_disconnect: initial disconnect verdict.
        :param invalidate_pool_on_disconnect: initial pool invalidation
         flag.
        :param is_pre_ping: stored as ``is_pre_ping``.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously the ``cursor`` argument was accepted but dropped,
        # leaving the declared slot unset
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # stays None until a handler in the chain returns an exception;
        # previously the slot was unset until that happened
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping

2516 

2517 

class Transaction(TransactionalContext):
    """A database transaction that is in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine(
            "postgresql+psycopg2://scott:tiger@localhost/test"
        )
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback`
    and :meth:`.commit` methods.  The object is also a context manager,
    so the Python ``with`` statement may be combined with
    :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # this base class cannot be instantiated
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """Whether this transaction has been fully detached from the
        connection, so that it is no longer able to affect the
        connection's state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        raise NotImplementedError()

    def _do_commit(self) -> None:
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        """Active, and the connection has not been invalidated."""
        return self.is_active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this is the base transaction of a begin/commit nesting, a
        rollback() takes place; otherwise the method simply returns.

        Use this to cancel a Transaction while leaving the scope of an
        enclosing transaction unaffected.

        """
        try:
            self._do_close()
        finally:
            # invariant checked whether or not _do_close() raised
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` performs a
          "ROLLBACK TO SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific methods
          for two phase transactions.


        """
        try:
            self._do_rollback()
        finally:
            # invariant checked whether or not _do_rollback() raised
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` performs a
          "RELEASE SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific methods
          for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            # invariant checked whether or not _do_commit() raised
            assert not self.is_active

    def _get_subject(self) -> Connection:
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        # NOTE(review): this is the negation of
        # _deactivated_from_connection, which reads as the opposite of
        # the method name -- confirm against the callers in
        # TransactionalContext before relying on the name.
        return not self._deactivated_from_connection

    def _rollback_can_be_called(self) -> bool:
        # RootTransaction / NestedTransaction tolerate a rollback() on
        # an already deactive transaction without emitting warnings;
        # covered by
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True

2660 

2661 

class RootTransaction(Transaction):
    """The "root" transaction of a :class:`_engine.Connection`.

    It stands for the "BEGIN/COMMIT/ROLLBACK" currently taking place on
    the :class:`_engine.Connection`.  A :class:`_engine.RootTransaction`
    comes into being through :meth:`_engine.Connection.begin` and stays
    associated with the :class:`_engine.Connection` for as long as it is
    active.  The one currently in use can be retrieved through the
    :attr:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also has
    "autobegin" behavior: a new :class:`_engine.RootTransaction` is
    created whenever a connection that is in a non-transactional state
    is used to emit commands on the DBAPI connection.  In 2.0 style use
    the scope of the :class:`_engine.RootTransaction` is controlled with
    the :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        # only one root transaction per connection at a time
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._connection_begin_impl()
        # registered on the connection only once "begin" went through
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        """Mark this transaction inactive; warn if it is already
        inactive and no longer the connection's transaction."""
        if self.is_active:
            assert self.connection._transaction is self
            self.is_active = False

        elif self.connection._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        """Roll back if still active, cancel any nested transaction and
        detach from the connection.

        :param try_deactivate: when True, the deactivation step runs
         even if this transaction is already inactive.

        """
        try:
            if self.is_active:
                self._connection_rollback_impl()

            nested = self.connection._nested_transaction
            if nested:
                nested._cancel()
        finally:
            # detach regardless of whether the rollback succeeded
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        if not self.is_active:
            if self.connection._transaction is self:
                # inactive, yet still the connection's transaction
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")
        else:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # success or failure of the commit notwithstanding:
                # cancel nested transactions, flag this transaction as
                # "inactive" and remove it as a reset agent
                nested = self.connection._nested_transaction
                if nested:
                    nested._cancel()

                self._deactivate_from_connection()

            # ...whereas it is removed as the connection's current
            # transaction only after a successful commit; on failure it
            # stays in place so that a rollback has to occur.
            self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

2768 

2769 

class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    The :class:`.NestedTransaction` object is created by calling the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    When using :class:`.NestedTransaction`, the semantics of "begin" /
    "commit" / "rollback" are as follows:

    * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
      the savepoint is given an explicit name that is part of the state
      of this object.

    * The :meth:`.NestedTransaction.commit` method corresponds to a
      "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
      with this :class:`.NestedTransaction`.

    * The :meth:`.NestedTransaction.rollback` method corresponds to a
      "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    The rationale for mimicking the semantics of an outer transaction in
    terms of savepoints is so that code may deal with a "savepoint"
    transaction and an "outer" transaction in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    # savepoint identifier returned by the connection's _savepoint_impl()
    _savepoint: str

    def __init__(self, connection: Connection):
        """Establish a savepoint on ``connection`` and register this object
        as the connection's current nested transaction.

        An outer transaction must already be present on the connection.

        """
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = self.connection._savepoint_impl()
        self.is_active = True
        # remember the enclosing savepoint (if any) so it can be restored
        # when this one is deactivated
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        """Restore the previous nested transaction on the connection.

        :param warn: when ``True``, emit a warning if this object is not
         the connection's current nested transaction.

        """
        if self.connection._nested_transaction is self:
            self.connection._nested_transaction = self._previous_nested
        elif warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self) -> bool:
        """True if this is not the connection's current nested
        transaction."""
        return self.connection._nested_transaction is not self

    def _cancel(self) -> None:
        # called by RootTransaction when the outer transaction is
        # committed, rolled back, or closed to cancel all savepoints
        # without any action being taken
        self.is_active = False
        self._deactivate_from_connection()
        # walk outwards through any enclosing savepoints
        if self._previous_nested:
            self._previous_nested._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        """Roll back to the savepoint if possible and mark this object
        inactive.

        :param deactivate_from_connection: when ``True``, also detach this
         object from the connection.
        :param warn_already_deactive: forwarded as the ``warn`` flag of
         :meth:`_deactivate_from_connection`.

        """
        try:
            # only emit ROLLBACK TO SAVEPOINT while both this savepoint and
            # the connection's outer transaction are still active
            if (
                self.is_active
                and self.connection._transaction
                and self.connection._transaction.is_active
            ):
                self.connection._rollback_to_savepoint_impl(self._savepoint)
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        # close: detach from the connection, no warning if already detached
        self._close_impl(True, False)

    def _do_rollback(self) -> None:
        # rollback: detach from the connection, warn if already detached
        self._close_impl(True, True)

    def _do_commit(self) -> None:
        """Release the savepoint if active; otherwise report the invalid
        state via the connection or raise :class:`.InvalidRequestError`.
        """
        if self.is_active:
            try:
                self.connection._release_savepoint_impl(self._savepoint)
            finally:
                # nested trans becomes inactive on failed release
                # unconditionally.  this prevents it from trying to
                # emit SQL when it rolls back.
                self.is_active = False

            # but only de-associate from connection if it succeeded
            self._deactivate_from_connection()
        else:
            if self.connection._nested_transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )

2882 

2883 

class TwoPhaseTransaction(RootTransaction):
    """A root transaction driven through the two-phase commit protocol.

    Instances are obtained from the
    :meth:`_engine.Connection.begin_twophase` method.

    The interface matches that of :class:`.Transaction`, with the
    :meth:`prepare` method added.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both attributes are assigned before the base constructor runs,
        # as in the original ordering
        self.xid = xid
        self._is_prepared = False
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        Once the PREPARE has been emitted, the transaction can be
        committed.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
            return

        raise exc.InvalidRequestError("This transaction is inactive")

    def _connection_begin_impl(self) -> None:
        conn = self.connection
        conn._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        xid, prepared = self.xid, self._is_prepared
        self.connection._rollback_twophase_impl(xid, prepared)

    def _connection_commit_impl(self) -> None:
        xid, prepared = self.xid, self._is_prepared
        self.connection._commit_twophase_impl(xid, prepared)

2923 

2924 

class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    # LRU cache of compiled statements, or None when caching is disabled
    # (see __init__)
    _compiled_cache: Optional[CompiledCacheType]

    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    # class instantiated by connect()
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    # class instantiated by execution_options(); assigned at module level
    # once OptionEngine has been defined
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Store the pool, dialect and URL, then set up logging, the
        compiled cache and the default execution options.

        :param logging_name: assigned to ``self.logging_name`` only when
         truthy.
        :param query_cache_size: capacity of the LRU compiled cache; a
         value of ``0`` disables the cache (``_compiled_cache`` is ``None``).
        :param execution_options: when non-empty, passed to
         :meth:`update_execution_options`.

        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        # size_alert callback given to the LRUCache in __init__; logs the
        # pruning at INFO level when info logging is enabled
        if self._should_log_info():
            self.logger.info(
                "Compiled cache size pruning from %d items to %d.  "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self) -> Engine:
        """Returns this :class:`.Engine`.

        Used for legacy schemes that accept :class:`.Connection` /
        :class:`.Engine` objects within the same variable.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Clear the compiled cache associated with the dialect.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter.

        .. versionadded:: 1.4

        """
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections.  The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the
          same instance.  The :meth:`_engine.Engine.dispose`
          method will replace
          the connection pool instance for the parent engine as well
          as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine`
          inherits the events of the parent, and new events can be associated
          with the new :class:`_engine.Engine` individually.
        * The logging configuration and logging_name is copied from the parent
          :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that affect some execution-level behavior for each
        engine.    One such example is breaking into separate "reader" and
        "writer" :class:`_engine.Engine` instances, where one
        :class:`_engine.Engine`
        has a lower :term:`isolation level` setting configured or is even
        transaction-disabled using "autocommit".  An example of this
        configuration is at :ref:`dbapi_autocommit_multiple`.

        Another example is one that
        uses a custom option ``shard_id`` which is consumed by an event
        to change the current schema on a database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        The above recipe illustrates two :class:`_engine.Engine` objects that
        will each serve as factories for :class:`_engine.Connection` objects
        that have pre-established "shard_id" execution options present. A
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        then interprets this execution option to emit a MySQL ``use`` statement
        to switch databases before a statement execution, while at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.driver

    echo = log.echo_property()

    def __repr__(self) -> str:
        return "Engine(%r)" % (self.url,)

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        :param close: if left at its default of ``True``, has the
         effect of fully closing all **currently checked in**
         database connections.  Connections that are still checked out
         will **not** be closed, however they will no longer be associated
         with this :class:`_engine.Engine`,
         so when they are closed individually, eventually the
         :class:`_pool.Pool` which they are associated with will
         be garbage collected and they will be closed out fully, if
         not already closed on checkin.

         If set to ``False``, the previous connection pool is de-referenced,
         and otherwise not touched in any way.

        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

        """
        if close:
            self.pool.dispose()
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        # yield the given connection unchanged, or, when none is given,
        # a new one from connect() scoped to this context manager
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            yield connection

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed.  If an error is raised, the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        with self.connect() as conn:
            with conn.begin():
                yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        # run the DDL visitor on a connection obtained from begin(), i.e.
        # inside a transaction
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` acts as a Python context manager, so
        the typical use of this method looks like::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Where above, after the block is completed, the connection is "closed"
        and its underlying DBAPI resources are returned to the connection pool.
        This also has the effect of rolling back any transaction that
        was explicitly begun or was begun via autobegin, and will
        emit the :meth:`_events.ConnectionEvents.rollback` event if one was
        started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed.   When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()

3309 

3310 

class OptionEngineMixin(log.Identified):
    """Mixin holding the state an option engine copies from, or shares
    with, the :class:`.Engine` it proxies.

    ``url``, ``dialect``, logging settings, the compiled cache and
    ``hide_parameters`` are copied from the proxied engine at construction
    time; ``pool`` is read from and written to the proxied engine on every
    access.

    """

    # class-level events are not applied to this class; see the note in
    # __init__
    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Copy state from ``proxied``, join its event dispatch, then apply
        ``execution_options`` on top of the proxied engine's options.
        """
        self._proxied = proxied
        self.url = proxied.url
        self.dialect = proxied.dialect
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        self._compiled_cache = proxied._compiled_cache
        self.hide_parameters = proxied.hide_parameters
        log.instance_logger(self, echoflag=self.echo)

        # note: this will propagate events that are assigned to the parent
        # engine after this OptionEngine is created.   Since we share
        # the events of the parent we also disallow class-level events
        # to apply to the OptionEngine class directly.
        #
        # the other way this can work would be to transfer existing
        # events only, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that might be more appropriate however it would be a behavioral
        # change for logic that assigns events to the parent engine and
        # would like it to take effect for the already-created sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # start from the parent's options, then layer the new ones on top
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        # placeholder; the concrete class combining this mixin with an
        # engine class must provide the implementation
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            # always reflects the proxied engine's current pool
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assignment replaces the pool on the proxied engine itself
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            # true if either the proxied engine or this instance has
            # events flagged
            return self._proxied._has_events or self.__dict__.get(
                "_has_events", False
            )

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # stored in the instance dict so the getter above can combine
            # it with the proxied engine's flag
            self.__dict__["_has_events"] = value

3374 

3375 

3376class OptionEngine(OptionEngineMixin, Engine): 

3377 def update_execution_options(self, **opt: Any) -> None: 

3378 Engine.update_execution_options(self, **opt) 

3379 

3380 

3381Engine._option_cls = OptionEngine