Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1000 statements  

1# engine/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 

8from __future__ import annotations 

9 

10import contextlib 

11import sys 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import cast 

16from typing import Iterable 

17from typing import Iterator 

18from typing import List 

19from typing import Mapping 

20from typing import NoReturn 

21from typing import Optional 

22from typing import overload 

23from typing import Tuple 

24from typing import Type 

25from typing import TypeVar 

26from typing import Union 

27 

28from .interfaces import BindTyping 

29from .interfaces import ConnectionEventsTarget 

30from .interfaces import DBAPICursor 

31from .interfaces import ExceptionContext 

32from .interfaces import ExecuteStyle 

33from .interfaces import ExecutionContext 

34from .interfaces import IsolationLevel 

35from .util import _distill_params_20 

36from .util import _distill_raw_params 

37from .util import TransactionalContext 

38from .. import exc 

39from .. import inspection 

40from .. import log 

41from .. import util 

42from ..sql import compiler 

43from ..sql import util as sql_util 

44from ..util.typing import Never 

45from ..util.typing import TupleAny 

46from ..util.typing import TypeVarTuple 

47from ..util.typing import Unpack 

48 

49if typing.TYPE_CHECKING: 

50 from . import CursorResult 

51 from . import ScalarResult 

52 from .interfaces import _AnyExecuteParams 

53 from .interfaces import _AnyMultiExecuteParams 

54 from .interfaces import _CoreAnyExecuteParams 

55 from .interfaces import _CoreMultiExecuteParams 

56 from .interfaces import _CoreSingleExecuteParams 

57 from .interfaces import _DBAPIAnyExecuteParams 

58 from .interfaces import _DBAPISingleExecuteParams 

59 from .interfaces import _ExecuteOptions 

60 from .interfaces import CompiledCacheType 

61 from .interfaces import CoreExecuteOptionsParameter 

62 from .interfaces import Dialect 

63 from .interfaces import SchemaTranslateMapType 

64 from .reflection import Inspector # noqa 

65 from .url import URL 

66 from ..event import dispatcher 

67 from ..log import _EchoFlagType 

68 from ..pool import _ConnectionFairy 

69 from ..pool import Pool 

70 from ..pool import PoolProxiedConnection 

71 from ..sql import Executable 

72 from ..sql._typing import _InfoType 

73 from ..sql.compiler import Compiled 

74 from ..sql.ddl import ExecutableDDLElement 

75 from ..sql.ddl import InvokeDDLBase 

76 from ..sql.functions import FunctionElement 

77 from ..sql.schema import DefaultGenerator 

78 from ..sql.schema import HasSchemaAttr 

79 from ..sql.schema import SchemaVisitable 

80 from ..sql.selectable import TypedReturnsRows 

81 

82 

# Type variables used by annotations in this module: a plain TypeVar bound
# to ``Any`` and a TypeVarTuple.
_T = TypeVar("_T", bound=Any)
_Ts = TypeVarTuple("_Ts")

# Both names below refer to the same ``util.EMPTY_DICT`` object; they differ
# only in the type they are declared with.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT

87 

88 

89class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

90 """Provides high-level functionality for a wrapped DB-API connection. 

91 

92 The :class:`_engine.Connection` object is procured by calling the 

93 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

94 object, and provides services for execution of SQL statements as well 

95 as transaction control. 

96 

97 The Connection object is **not** thread-safe. While a Connection can be 

98 shared among threads using properly synchronized access, it is still 

99 possible that the underlying DBAPI connection may not support shared 

100 access between threads. Check the DBAPI documentation for details. 

101 

102 The Connection object represents a single DBAPI connection checked out 

103 from the connection pool. In this state, the connection pool has no 

104 effect upon the connection, including its expiration or timeout state.

105 For the connection pool to properly manage connections, connections 

106 should be returned to the connection pool (i.e. ``connection.close()``) 

107 whenever the connection is not in use. 

108 

109 .. index:: 

110 single: thread safety; Connection 

111 

112 """ 

113 

114 dialect: Dialect 

115 dispatch: dispatcher[ConnectionEventsTarget] 

116 

117 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

118 

119 # used by sqlalchemy.engine.util.TransactionalContext 

120 _trans_context_manager: Optional[TransactionalContext] = None 

121 

122 # legacy as of 2.0, should be eventually deprecated and 

123 # removed. was used in the "pre_ping" recipe that's been in the docs 

124 # a long time 

125 should_close_with_result = False 

126 

127 _dbapi_connection: Optional[PoolProxiedConnection] 

128 

129 _execution_options: _ExecuteOptions 

130 

131 _transaction: Optional[RootTransaction] 

132 _nested_transaction: Optional[NestedTransaction] 

133 

134 def __init__( 

135 self, 

136 engine: Engine, 

137 connection: Optional[PoolProxiedConnection] = None, 

138 _has_events: Optional[bool] = None, 

139 _allow_revalidate: bool = True, 

140 _allow_autobegin: bool = True, 

141 ): 

142 """Construct a new Connection.""" 

143 self.engine = engine 

144 self.dialect = dialect = engine.dialect 

145 

146 if connection is None: 

147 try: 

148 self._dbapi_connection = engine.raw_connection() 

149 except dialect.loaded_dbapi.Error as err: 

150 Connection._handle_dbapi_exception_noconnection( 

151 err, dialect, engine 

152 ) 

153 raise 

154 else: 

155 self._dbapi_connection = connection 

156 

157 self._transaction = self._nested_transaction = None 

158 self.__savepoint_seq = 0 

159 self.__in_begin = False 

160 

161 self.__can_reconnect = _allow_revalidate 

162 self._allow_autobegin = _allow_autobegin 

163 self._echo = self.engine._should_log_info() 

164 

165 if _has_events is None: 

166 # if _has_events is sent explicitly as False, 

167 # then don't join the dispatch of the engine; we don't 

168 # want to handle any of the engine's events in that case. 

169 self.dispatch = self.dispatch._join(engine.dispatch) 

170 self._has_events = _has_events or ( 

171 _has_events is None and engine._has_events 

172 ) 

173 

174 self._execution_options = engine._execution_options 

175 

176 if self._has_events or self.engine._has_events: 

177 self.dispatch.engine_connect(self) 

178 

179 # this can be assigned differently via 

180 # characteristics.LoggingTokenCharacteristic 

181 _message_formatter: Any = None 

182 

183 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None: 

184 fmt = self._message_formatter 

185 

186 if fmt: 

187 message = fmt(message) 

188 

189 if log.STACKLEVEL: 

190 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

191 

192 self.engine.logger.info(message, *arg, **kw) 

193 

194 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None: 

195 fmt = self._message_formatter 

196 

197 if fmt: 

198 message = fmt(message) 

199 

200 if log.STACKLEVEL: 

201 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

202 

203 self.engine.logger.debug(message, *arg, **kw) 

204 

205 @property 

206 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: 

207 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

208 self._execution_options.get("schema_translate_map", None) 

209 ) 

210 

211 return schema_translate_map 

212 

213 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]: 

214 """Return the schema name for the given schema item taking into 

215 account current schema translate map. 

216 

217 """ 

218 

219 name = obj.schema 

220 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

221 self._execution_options.get("schema_translate_map", None) 

222 ) 

223 

224 if ( 

225 schema_translate_map 

226 and name in schema_translate_map 

227 and obj._use_schema_map 

228 ): 

229 return schema_translate_map[name] 

230 else: 

231 return name 

232 

233 def __enter__(self) -> Connection: 

234 return self 

235 

236 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 

237 self.close() 

238 

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        preserve_rowcount: bool = False,
        driver_column_names: bool = False,
        **opt: Any,
    ) -> Connection: ...

    @overload
    def execution_options(self, **opt: Any) -> Connection: ...

    def execution_options(self, **opt: Any) -> Connection:
        r"""Set non-SQL options for the connection which take effect
        during execution.

        This method modifies this :class:`_engine.Connection` **in-place**;
        the return value is the same :class:`_engine.Connection` object
        upon which the method is called.   Note that this is in contrast
        to the behavior of the ``execution_options`` methods on other
        objects such as :meth:`_engine.Engine.execution_options` and
        :meth:`_sql.Executable.execution_options`.  The rationale is that many
        such execution options necessarily modify the state of the base
        DBAPI connection in any case so there is no feasible means of
        keeping the effect of such an option localized to a "sub" connection.

        .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
           method, in contrast to other objects with this method, modifies
           the connection in-place without creating a copy of it.

        As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
        method accepts any arbitrary parameters including user defined names.
        All parameters given are consumable in a number of ways including
        by using the :meth:`_engine.Connection.get_execution_options` method.
        See the examples at :meth:`_sql.Executable.execution_options`
        and :meth:`_engine.Engine.execution_options`.

        The keywords that are currently recognized by SQLAlchemy itself
        include all those listed under :meth:`.Executable.execution_options`,
        as well as others that are specific to :class:`_engine.Connection`.

        :param compiled_cache: Available on: :class:`_engine.Connection`,
          :class:`_engine.Engine`.

          A dictionary where :class:`.Compiled` objects
          will be cached when the :class:`_engine.Connection`
          compiles a clause
          expression into a :class:`.Compiled` object.  This dictionary will
          supersede the statement cache that may be configured on the
          :class:`_engine.Engine` itself.   If set to None, caching
          is disabled, even if the engine has a configured cache size.

          Note that the ORM makes use of its own "compiled" caches for
          some operations, including flush operations.  The caching
          used by the ORM internally supersedes a cache dictionary
          specified here.

        :param logging_token: Available on: :class:`_engine.Connection`,
          :class:`_engine.Engine`, :class:`_sql.Executable`.

          Adds the specified string token surrounded by brackets in log
          messages logged by the connection, i.e. the logging that's enabled
          either via the :paramref:`_sa.create_engine.echo` flag or via the
          ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
          per-connection or per-sub-engine token to be available which is
          useful for debugging concurrent connection scenarios.

          .. versionadded:: 1.4.0b2

          .. seealso::

            :ref:`dbengine_logging_tokens` - usage example

            :paramref:`_sa.create_engine.logging_name` - adds a name to the
            name used by the Python logger object itself.

        :param isolation_level: Available on: :class:`_engine.Connection`,
          :class:`_engine.Engine`.

          Set the transaction isolation level for the lifespan of this
          :class:`_engine.Connection` object.
          Valid values include those string
          values accepted by the :paramref:`_sa.create_engine.isolation_level`
          parameter passed to :func:`_sa.create_engine`.  These levels are
          semi-database specific; see individual dialect documentation for
          valid levels.

          The isolation level option applies the isolation level by emitting
          statements on the DBAPI connection, and **necessarily affects the
          original Connection object overall**. The isolation level will remain
          at the given setting until explicitly changed, or when the DBAPI
          connection itself is :term:`released` to the connection pool, i.e. the
          :meth:`_engine.Connection.close` method is called, at which time an
          event handler will emit additional statements on the DBAPI connection
          in order to revert the isolation level change.

          .. note:: The ``isolation_level`` execution option may only be
             established before the :meth:`_engine.Connection.begin` method is
             called, as well as before any SQL statements are emitted which
             would otherwise trigger "autobegin", or directly after a call to
             :meth:`_engine.Connection.commit` or
             :meth:`_engine.Connection.rollback`. A database cannot change the
             isolation level on a transaction in progress.

          .. note:: The ``isolation_level`` execution option is implicitly
             reset if the :class:`_engine.Connection` is invalidated, e.g. via
             the :meth:`_engine.Connection.invalidate` method, or if a
             disconnection error occurs. The new connection produced after the
             invalidation will **not** have the selected isolation level
             re-applied to it automatically.

          .. seealso::

                :ref:`dbapi_autocommit`

                :meth:`_engine.Connection.get_isolation_level`
                - view current actual level

        :param no_parameters: Available on: :class:`_engine.Connection`,
          :class:`_sql.Executable`.

          When ``True``, if the final parameter
          list or dictionary is totally empty, will invoke the
          statement on the cursor as ``cursor.execute(statement)``,
          not passing the parameter collection at all.
          Some DBAPIs such as psycopg2 and mysql-python consider
          percent signs as significant only when parameters are
          present; this option allows code to generate SQL
          containing percent signs (and possibly other characters)
          that is neutral regarding whether it's executed by the DBAPI
          or piped into a script that's later invoked by
          command line tools.

        :param stream_results: Available on: :class:`_engine.Connection`,
          :class:`_sql.Executable`.

          Indicate to the dialect that results should be "streamed" and not
          pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
          and MariaDB, this indicates the use of a "server side cursor" as
          opposed to a client side cursor.  Other backends such as that of
          Oracle Database may already use server side cursors by default.

          The usage of
          :paramref:`_engine.Connection.execution_options.stream_results` is
          usually combined with setting a fixed number of rows to be fetched
          in batches, to allow for efficient iteration of database rows while
          at the same time not loading all result rows into memory at once;
          this can be configured on a :class:`_engine.Result` object using the
          :meth:`_engine.Result.yield_per` method, after execution has
          returned a new :class:`_engine.Result`.   If
          :meth:`_engine.Result.yield_per` is not used,
          the :paramref:`_engine.Connection.execution_options.stream_results`
          mode of operation will instead use a dynamically sized buffer
          which buffers sets of rows at a time, growing on each batch
          based on a fixed growth size up until a limit which may
          be configured using the
          :paramref:`_engine.Connection.execution_options.max_row_buffer`
          parameter.

          When using the ORM to fetch ORM mapped objects from a result,
          :meth:`_engine.Result.yield_per` should always be used with
          :paramref:`_engine.Connection.execution_options.stream_results`,
          so that the ORM does not fetch all rows into new ORM objects at once.

          For typical use, the
          :paramref:`_engine.Connection.execution_options.yield_per` execution
          option should be preferred, which sets up both
          :paramref:`_engine.Connection.execution_options.stream_results` and
          :meth:`_engine.Result.yield_per` at once. This option is supported
          both at a core level by :class:`_engine.Connection` as well as by the
          ORM :class:`_engine.Session`; the latter is described at
          :ref:`orm_queryguide_yield_per`.

          .. seealso::

            :ref:`engine_stream_results` - background on
            :paramref:`_engine.Connection.execution_options.stream_results`

            :paramref:`_engine.Connection.execution_options.max_row_buffer`

            :paramref:`_engine.Connection.execution_options.yield_per`

            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
            describing the ORM version of ``yield_per``

        :param max_row_buffer: Available on: :class:`_engine.Connection`,
          :class:`_sql.Executable`.  Sets a maximum
          buffer size to use when the
          :paramref:`_engine.Connection.execution_options.stream_results`
          execution option is used on a backend that supports server side
          cursors.  The default value if not specified is 1000.

          .. seealso::

            :paramref:`_engine.Connection.execution_options.stream_results`

            :ref:`engine_stream_results`


        :param yield_per: Available on: :class:`_engine.Connection`,
          :class:`_sql.Executable`.  Integer value applied which will
          set the :paramref:`_engine.Connection.execution_options.stream_results`
          execution option and invoke :meth:`_engine.Result.yield_per`
          automatically at once.  Allows equivalent functionality as
          is present when using this parameter with the ORM.

          .. versionadded:: 1.4.40

          .. seealso::

            :ref:`engine_stream_results` - background and examples
            on using server side cursors with Core.

            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
            describing the ORM version of ``yield_per``

        :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
            :class:`_engine.Engine`. Number of rows to format into an
            INSERT statement when the statement uses "insertmanyvalues" mode,
            which is a paged form of bulk insert that is used for many backends
            when using :term:`executemany` execution typically in conjunction
            with RETURNING. Defaults to 1000. May also be modified on a
            per-engine basis using the
            :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

            .. versionadded:: 2.0

            .. seealso::

                :ref:`engine_insertmanyvalues`

        :param schema_translate_map: Available on: :class:`_engine.Connection`,
          :class:`_engine.Engine`, :class:`_sql.Executable`.

          A dictionary mapping schema names to schema names, that will be
          applied to the :paramref:`_schema.Table.schema` element of each
          :class:`_schema.Table`
          encountered when SQL or DDL expression elements
          are compiled into strings; the resulting schema name will be
          converted based on presence in the map of the original name.

          .. seealso::

            :ref:`schema_translating`

        :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
          attribute will be unconditionally memoized within the result and
          made available via the :attr:`.CursorResult.rowcount` attribute.
          Normally, this attribute is only preserved for UPDATE and DELETE
          statements.  Using this option, the DBAPI's rowcount value can
          be accessed for other kinds of statements such as INSERT and SELECT,
          to the degree that the DBAPI supports these statements.  See
          :attr:`.CursorResult.rowcount` for notes regarding the behavior
          of this attribute.

          .. versionadded:: 2.0.28

        .. seealso::

            :meth:`_engine.Engine.execution_options`

            :meth:`.Executable.execution_options`

            :meth:`_engine.Connection.get_execution_options`

            :ref:`orm_queryguide_execution_options` - documentation on all
            ORM-specific execution options

        :param driver_column_names: When True, the returned
         :class:`_engine.CursorResult` will use the column names as written in
         ``cursor.description`` to set up the keys for the result set,
         including the names of columns for the :class:`_engine.Row` object as
         well as the dictionary keys when using :attr:`_engine.Row._mapping`.
         On backends that use "name normalization" such as Oracle Database to
         correct for lower case names being converted to all uppercase, this
         behavior is turned off and the raw UPPERCASE names in
         cursor.description will be present.

         .. versionadded:: 2.1

        """  # noqa
        # Order matters here: event listeners receive the ``opt`` dict
        # before it is merged into this connection's options, and the
        # dialect hook runs after the merge.
        if self._has_events or self.engine._has_events:
            self.dispatch.set_connection_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_connection_execution_options(self, opt)
        return self

534 

535 def get_execution_options(self) -> _ExecuteOptions: 

536 """Get the non-SQL options which will take effect during execution. 

537 

538 .. seealso:: 

539 

540 :meth:`_engine.Connection.execution_options` 

541 """ 

542 return self._execution_options 

543 

544 @property 

545 def _still_open_and_dbapi_connection_is_valid(self) -> bool: 

546 pool_proxied_connection = self._dbapi_connection 

547 return ( 

548 pool_proxied_connection is not None 

549 and pool_proxied_connection.is_valid 

550 ) 

551 

552 @property 

553 def closed(self) -> bool: 

554 """Return True if this connection is closed.""" 

555 

556 return self._dbapi_connection is None and not self.__can_reconnect 

557 

558 @property 

559 def invalidated(self) -> bool: 

560 """Return True if this connection was invalidated. 

561 

562 This does not indicate whether or not the connection was 

563 invalidated at the pool level, however 

564 

565 """ 

566 

567 # prior to 1.4, "invalid" was stored as a state independent of 

568 # "closed", meaning an invalidated connection could be "closed", 

569 # the _dbapi_connection would be None and closed=True, yet the 

570 # "invalid" flag would stay True. This meant that there were 

571 # three separate states (open/valid, closed/valid, closed/invalid) 

572 # when there is really no reason for that; a connection that's 

573 # "closed" does not need to be "invalid". So the state is now 

574 # represented by the two facts alone. 

575 

576 pool_proxied_connection = self._dbapi_connection 

577 return pool_proxied_connection is None and self.__can_reconnect 

578 

579 @property 

580 def connection(self) -> PoolProxiedConnection: 

581 """The underlying DB-API connection managed by this Connection. 

582 

583 This is a SQLAlchemy connection-pool proxied connection 

584 which then has the attribute 

585 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the 

586 actual driver connection. 

587 

588 .. seealso:: 

589 

590 

591 :ref:`dbapi_connections` 

592 

593 """ 

594 

595 if self._dbapi_connection is None: 

596 try: 

597 return self._revalidate_connection() 

598 except (exc.PendingRollbackError, exc.ResourceClosedError): 

599 raise 

600 except BaseException as e: 

601 self._handle_dbapi_exception(e, None, None, None, None) 

602 else: 

603 return self._dbapi_connection 

604 

605 def get_isolation_level(self) -> IsolationLevel: 

606 """Return the current **actual** isolation level that's present on 

607 the database within the scope of this connection. 

608 

609 This attribute will perform a live SQL operation against the database 

610 in order to procure the current isolation level, so the value returned 

611 is the actual level on the underlying DBAPI connection regardless of 

612 how this state was set. This will be one of the four actual isolation 

613 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

614 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation 

615 level setting. Third party dialects may also feature additional 

616 isolation level settings. 

617 

618 .. note:: This method **will not report** on the ``AUTOCOMMIT`` 

619 isolation level, which is a separate :term:`dbapi` setting that's 

620 independent of **actual** isolation level. When ``AUTOCOMMIT`` is 

621 in use, the database connection still has a "traditional" isolation 

622 mode in effect, that is typically one of the four values 

623 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

624 ``SERIALIZABLE``. 

625 

626 Compare to the :attr:`_engine.Connection.default_isolation_level` 

627 accessor which returns the isolation level that is present on the 

628 database at initial connection time. 

629 

630 .. seealso:: 

631 

632 :attr:`_engine.Connection.default_isolation_level` 

633 - view default level 

634 

635 :paramref:`_sa.create_engine.isolation_level` 

636 - set per :class:`_engine.Engine` isolation level 

637 

638 :paramref:`.Connection.execution_options.isolation_level` 

639 - set per :class:`_engine.Connection` isolation level 

640 

641 """ 

642 dbapi_connection = self.connection.dbapi_connection 

643 assert dbapi_connection is not None 

644 try: 

645 return self.dialect.get_isolation_level(dbapi_connection) 

646 except BaseException as e: 

647 self._handle_dbapi_exception(e, None, None, None, None) 

648 

649 @property 

650 def default_isolation_level(self) -> Optional[IsolationLevel]: 

651 """The initial-connection time isolation level associated with the 

652 :class:`_engine.Dialect` in use. 

653 

654 This value is independent of the 

655 :paramref:`.Connection.execution_options.isolation_level` and 

656 :paramref:`.Engine.execution_options.isolation_level` execution 

657 options, and is determined by the :class:`_engine.Dialect` when the 

658 first connection is created, by performing a SQL query against the 

659 database for the current isolation level before any additional commands 

660 have been emitted. 

661 

662 Calling this accessor does not invoke any new SQL queries. 

663 

664 .. seealso:: 

665 

666 :meth:`_engine.Connection.get_isolation_level` 

667 - view current actual isolation level 

668 

669 :paramref:`_sa.create_engine.isolation_level` 

670 - set per :class:`_engine.Engine` isolation level 

671 

672 :paramref:`.Connection.execution_options.isolation_level` 

673 - set per :class:`_engine.Connection` isolation level 

674 

675 """ 

676 return self.dialect.default_isolation_level 

677 

678 def _invalid_transaction(self) -> NoReturn: 

679 raise exc.PendingRollbackError( 

680 "Can't reconnect until invalid %stransaction is rolled " 

681 "back. Please rollback() fully before proceeding" 

682 % ("savepoint " if self._nested_transaction is not None else ""), 

683 code="8s2b", 

684 ) 

685 

686 def _revalidate_connection(self) -> PoolProxiedConnection: 

687 if self.__can_reconnect and self.invalidated: 

688 if self._transaction is not None: 

689 self._invalid_transaction() 

690 self._dbapi_connection = self.engine.raw_connection() 

691 return self._dbapi_connection 

692 raise exc.ResourceClosedError("This Connection is closed") 

693 

694 @property 

695 def info(self) -> _InfoType: 

696 """Info dictionary associated with the underlying DBAPI connection 

697 referred to by this :class:`_engine.Connection`, allowing user-defined 

698 data to be associated with the connection. 

699 

700 The data here will follow along with the DBAPI connection including 

701 after it is returned to the connection pool and used again 

702 in subsequent instances of :class:`_engine.Connection`. 

703 

704 """ 

705 

706 return self.connection.info 

707 

708 def invalidate(self, exception: Optional[BaseException] = None) -> None: 

709 """Invalidate the underlying DBAPI connection associated with 

710 this :class:`_engine.Connection`. 

711 

712 An attempt will be made to close the underlying DBAPI connection 

713 immediately; however if this operation fails, the error is logged 

714 but not raised. The connection is then discarded whether or not 

715 close() succeeded. 

716 

717 Upon the next use (where "use" typically means using the 

718 :meth:`_engine.Connection.execute` method or similar), 

719 this :class:`_engine.Connection` will attempt to 

720 procure a new DBAPI connection using the services of the 

721 :class:`_pool.Pool` as a source of connectivity (e.g. 

722 a "reconnection"). 

723 

724 If a transaction was in progress (e.g. the 

725 :meth:`_engine.Connection.begin` method has been called) when 

726 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI 

727 level all state associated with this transaction is lost, as 

728 the DBAPI connection is closed. The :class:`_engine.Connection` 

729 will not allow a reconnection to proceed until the 

730 :class:`.Transaction` object is ended, by calling the 

731 :meth:`.Transaction.rollback` method; until that point, any attempt at 

732 continuing to use the :class:`_engine.Connection` will raise an 

733 :class:`~sqlalchemy.exc.InvalidRequestError`. 

734 This is to prevent applications from accidentally 

735 continuing an ongoing transactional operations despite the 

736 fact that the transaction has been lost due to an 

737 invalidation. 

738 

739 The :meth:`_engine.Connection.invalidate` method, 

740 just like auto-invalidation, 

741 will at the connection pool level invoke the 

742 :meth:`_events.PoolEvents.invalidate` event. 

743 

744 :param exception: an optional ``Exception`` instance that's the 

745 reason for the invalidation. is passed along to event handlers 

746 and logging functions. 

747 

748 .. seealso:: 

749 

750 :ref:`pool_connection_invalidation` 

751 

752 """ 

753 

754 if self.invalidated: 

755 return 

756 

757 if self.closed: 

758 raise exc.ResourceClosedError("This Connection is closed") 

759 

760 if self._still_open_and_dbapi_connection_is_valid: 

761 pool_proxied_connection = self._dbapi_connection 

762 assert pool_proxied_connection is not None 

763 pool_proxied_connection.invalidate(exception) 

764 

765 self._dbapi_connection = None 

766 

def detach(self) -> None:
    """Separate the underlying DB-API connection from its connection pool.

    Usage::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    The :class:`_engine.Connection` stays usable after this call.  Once
    it is closed, or the context manager block shown above exits, the
    DB-API connection is literally closed rather than being handed back
    to the pool it came from.

    Use this to keep state that was modified on the connection, such as
    a transaction isolation level or similar, from leaking into the rest
    of the application.

    :raises ResourceClosedError: if this connection is already closed.
    :raises InvalidRequestError: if there is no DB-API connection to
     detach because this connection was invalidated.

    """
    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    fairy = self._dbapi_connection
    if fairy is not None:
        fairy.detach()
        return

    raise exc.InvalidRequestError("Can't detach an invalidated Connection")

802 

803 def _autobegin(self) -> None: 

804 if self._allow_autobegin and not self.__in_begin: 

805 self.begin() 

806 

def begin(self) -> RootTransaction:
    """Start a transaction explicitly, ahead of autobegin.

    Example::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    A :class:`_engine.RootTransaction` is returned.  It represents the
    "scope" of the transaction, which ends when either
    :meth:`_engine.Transaction.rollback` or
    :meth:`_engine.Transaction.commit` is called; as shown above, the
    object may also be used as a context manager.

    The transaction begun here would normally be begun in any case the
    first time the connection is used to execute a statement.  Reasons to
    call :meth:`_engine.Connection.begin` directly are to have the
    :meth:`_events.ConnectionEvents.begin` event invoked at a specific
    time, or to lay out the work done within a connection checkout as
    context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    That code is not fundamentally different in behavior from the
    following, which does not use :meth:`_engine.Connection.begin` and is
    known as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, :meth:`_engine.Connection.begin`
    emits no SQL and does not change the state of the underlying DBAPI
    connection in any way; the Python DBAPI has no concept of an explicit
    transaction begin.

    :raises InvalidRequestError: if a transaction was already
     initialized on this connection, via ``begin()`` or autobegin, and
     has not been committed or rolled back yet.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction

881 

def begin_nested(self) -> NestedTransaction:
    """Start a nested transaction (i.e. SAVEPOINT) and return the
    transaction handle that controls the scope of that SAVEPOINT.

    Example::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    A :class:`_engine.NestedTransaction` is returned.  Its transactional
    methods :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback` correspond to the
    operations "RELEASE SAVEPOINT <name>" and
    "ROLLBACK TO SAVEPOINT <name>".  The savepoint name is generated
    automatically and is local to the
    :class:`_engine.NestedTransaction` object.  As with any other
    :class:`_engine.Transaction`, the object may be used as a context
    manager as shown above, in which case it will "release" or "rollback"
    depending on whether the block completed successfully or raised an
    exception.

    SAVEPOINT support is required in the underlying database, else the
    behavior is undefined.  A common use of SAVEPOINT is to run
    operations that may fail inside a transaction while letting the outer
    transaction continue.  E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute(...)

    When :meth:`_engine.Connection.begin_nested` is called without
    :meth:`_engine.Connection.begin` or :meth:`_engine.Engine.begin`
    having been called first, the :class:`_engine.Connection` will
    "autobegin" the outer transaction first.  That outer transaction may
    then be committed using "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute(...)
            # savepoint is released

            connection.execute(...)

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    no_outer_transaction = self._transaction is None
    if no_outer_transaction:
        # the SAVEPOINT needs an enclosing transaction; start it first
        self._autobegin()

    return NestedTransaction(self)

959 

def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Start a two-phase or XA transaction and return its transaction
    handle.

    A :class:`.TwoPhaseTransaction` is returned; besides the methods
    offered by :class:`.Transaction`, it also provides a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id.  When omitted, an id is
     obtained from the dialect's ``create_xid()`` method.

    :raises InvalidRequestError: if a transaction is already in progress
     on this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    transaction_id = xid
    if transaction_id is None:
        transaction_id = self.engine.dialect.create_xid()
    return TwoPhaseTransaction(self, transaction_id)

988 

def commit(self) -> None:
    """Commit the transaction currently in progress.

    If a transaction has been started, it is committed.  If none was
    started, this method has no effect, assuming the connection is in a
    non-invalidated state.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.commit` acts only upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object.  A SAVEPOINT that was invoked
      through :meth:`_engine.Connection.begin_nested` is not affected;
      to control a SAVEPOINT, call
      :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.

    """
    root = self._transaction
    if root:
        root.commit()

1013 

def rollback(self) -> None:
    """Roll back the transaction currently in progress.

    If a transaction has been started, it is rolled back.  If none was
    started, this method has no effect.  If a transaction was started
    and the connection is in an invalidated state, this method clears
    the transaction.

    A :class:`_engine.Connection` begins a transaction automatically
    whenever a statement is first executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.rollback` acts only upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object.  A SAVEPOINT that was invoked
      through :meth:`_engine.Connection.begin_nested` is not affected;
      to control a SAVEPOINT, call
      :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.

    """
    root = self._transaction
    if root:
        root.rollback()

1039 

def recover_twophase(self) -> List[Any]:
    """Return whatever the dialect's ``do_recover_twophase()`` reports
    for this connection.
    """
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)

1042 

def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_rollback_twophase()``,
    forwarding ``recover`` as a keyword argument.
    """
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)

1045 

def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_commit_twophase()``,
    forwarding ``recover`` as a keyword argument.
    """
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)

1048 

def in_transaction(self) -> bool:
    """Report whether a root transaction is present and active."""
    root = self._transaction
    if root is None:
        return False
    return root.is_active

1052 

def in_nested_transaction(self) -> bool:
    """Report whether a nested transaction is present and active."""
    nested = self._nested_transaction
    if nested is None:
        return False
    return nested.is_active

1059 

1060 def _is_autocommit_isolation(self) -> bool: 

1061 opt_iso = self._execution_options.get("isolation_level", None) 

1062 return bool( 

1063 opt_iso == "AUTOCOMMIT" 

1064 or ( 

1065 opt_iso is None 

1066 and self.engine.dialect._on_connect_isolation_level 

1067 == "AUTOCOMMIT" 

1068 ) 

1069 ) 

1070 

1071 def _get_required_transaction(self) -> RootTransaction: 

1072 trans = self._transaction 

1073 if trans is None: 

1074 raise exc.InvalidRequestError("connection is not in a transaction") 

1075 return trans 

1076 

1077 def _get_required_nested_transaction(self) -> NestedTransaction: 

1078 trans = self._nested_transaction 

1079 if trans is None: 

1080 raise exc.InvalidRequestError( 

1081 "connection is not in a nested transaction" 

1082 ) 

1083 return trans 

1084 

def get_transaction(self) -> Optional[RootTransaction]:
    """Return the root transaction currently in progress, or ``None``
    when there is none.

    .. versionadded:: 1.4

    """
    root = self._transaction
    return root

1093 

def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the nested transaction currently in progress, or ``None``
    when there is none.

    .. versionadded:: 1.4

    """
    nested = self._nested_transaction
    return nested

1101 

1102 def _begin_impl(self, transaction: RootTransaction) -> None: 

1103 if self._echo: 

1104 if self._is_autocommit_isolation(): 

1105 self._log_info( 

1106 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1107 "autocommit mode)" 

1108 ) 

1109 else: 

1110 self._log_info("BEGIN (implicit)") 

1111 

1112 self.__in_begin = True 

1113 

1114 if self._has_events or self.engine._has_events: 

1115 self.dispatch.begin(self) 

1116 

1117 try: 

1118 self.engine.dialect.do_begin(self.connection) 

1119 except BaseException as e: 

1120 self._handle_dbapi_exception(e, None, None, None, None) 

1121 finally: 

1122 self.__in_begin = False 

1123 

1124 def _rollback_impl(self) -> None: 

1125 if self._has_events or self.engine._has_events: 

1126 self.dispatch.rollback(self) 

1127 

1128 if self._still_open_and_dbapi_connection_is_valid: 

1129 if self._echo: 

1130 if self._is_autocommit_isolation(): 

1131 if self.dialect.skip_autocommit_rollback: 

1132 self._log_info( 

1133 "ROLLBACK will be skipped by " 

1134 "skip_autocommit_rollback" 

1135 ) 

1136 else: 

1137 self._log_info( 

1138 "ROLLBACK using DBAPI connection.rollback(); " 

1139 "set skip_autocommit_rollback to prevent fully" 

1140 ) 

1141 else: 

1142 self._log_info("ROLLBACK") 

1143 try: 

1144 self.engine.dialect.do_rollback(self.connection) 

1145 except BaseException as e: 

1146 self._handle_dbapi_exception(e, None, None, None, None) 

1147 

1148 def _commit_impl(self) -> None: 

1149 if self._has_events or self.engine._has_events: 

1150 self.dispatch.commit(self) 

1151 

1152 if self._echo: 

1153 if self._is_autocommit_isolation(): 

1154 self._log_info( 

1155 "COMMIT using DBAPI connection.commit(), " 

1156 "has no effect due to autocommit mode" 

1157 ) 

1158 else: 

1159 self._log_info("COMMIT") 

1160 try: 

1161 self.engine.dialect.do_commit(self.connection) 

1162 except BaseException as e: 

1163 self._handle_dbapi_exception(e, None, None, None, None) 

1164 

1165 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1166 if self._has_events or self.engine._has_events: 

1167 self.dispatch.savepoint(self, name) 

1168 

1169 if name is None: 

1170 self.__savepoint_seq += 1 

1171 name = "sa_savepoint_%s" % self.__savepoint_seq 

1172 self.engine.dialect.do_savepoint(self, name) 

1173 return name 

1174 

1175 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1176 if self._has_events or self.engine._has_events: 

1177 self.dispatch.rollback_savepoint(self, name, None) 

1178 

1179 if self._still_open_and_dbapi_connection_is_valid: 

1180 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1181 

1182 def _release_savepoint_impl(self, name: str) -> None: 

1183 if self._has_events or self.engine._has_events: 

1184 self.dispatch.release_savepoint(self, name, None) 

1185 

1186 self.engine.dialect.do_release_savepoint(self, name) 

1187 

1188 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 

1189 if self._echo: 

1190 self._log_info("BEGIN TWOPHASE (implicit)") 

1191 if self._has_events or self.engine._has_events: 

1192 self.dispatch.begin_twophase(self, transaction.xid) 

1193 

1194 self.__in_begin = True 

1195 try: 

1196 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1197 except BaseException as e: 

1198 self._handle_dbapi_exception(e, None, None, None, None) 

1199 finally: 

1200 self.__in_begin = False 

1201 

def _prepare_twophase_impl(self, xid: Any) -> None:
    """Run the "prepare" step of a two-phase transaction.

    Invokes the ``prepare_twophase`` event hooks when registered, then
    calls the dialect's ``do_prepare_twophase()`` for ``xid``; any
    exception it raises is passed to ``_handle_dbapi_exception()``.
    """
    events_present = self._has_events or self.engine._has_events
    if events_present:
        self.dispatch.prepare_twophase(self, xid)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1211 

1212 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1213 if self._has_events or self.engine._has_events: 

1214 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1215 

1216 if self._still_open_and_dbapi_connection_is_valid: 

1217 assert isinstance(self._transaction, TwoPhaseTransaction) 

1218 try: 

1219 self.engine.dialect.do_rollback_twophase( 

1220 self, xid, is_prepared 

1221 ) 

1222 except BaseException as e: 

1223 self._handle_dbapi_exception(e, None, None, None, None) 

1224 

def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Run the "commit" step of a two-phase transaction.

    Invokes the ``commit_twophase`` event hooks when registered, then
    calls the dialect's ``do_commit_twophase()`` for ``xid``; any
    exception it raises is passed to ``_handle_dbapi_exception()``.
    """
    events_present = self._has_events or self.engine._has_events
    if events_present:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1234 

def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    Closing releases the underlying database resources, that is, the
    DBAPI connection referenced internally.  That DBAPI connection is
    typically restored back to the connection-holding
    :class:`_pool.Pool` referenced by the :class:`_engine.Engine` that
    produced this :class:`_engine.Connection`.  Any transactional state
    present on the DBAPI connection is also unconditionally released via
    the DBAPI connection's ``rollback()`` method, regardless of any
    :class:`.Transaction` object that may be outstanding with regards to
    this :class:`_engine.Connection`.

    As a consequence, :meth:`_engine.Connection.rollback` is effectively
    also called if any transaction is in place.

    Once :meth:`_engine.Connection.close` has been called, the
    :class:`_engine.Connection` is permanently in a closed state and
    allows no further operations.

    """
    root = self._transaction
    if root:
        root.close()
        skip_reset = True
    else:
        skip_reset = False

    # read the DBAPI connection only after the transaction was closed
    fairy = self._dbapi_connection
    if fairy is not None:
        if skip_reset:
            # the transaction was just closed above, so close the pooled
            # connection without doing an additional reset
            cast("_ConnectionFairy", fairy)._close_special(
                transaction_reset=True
            )
        else:
            fairy.close()

    # Closing may, with slight chance, have triggered an invalidation, in
    # which case _dbapi_connection is None already; usually it is still
    # non-None at this point and in a "closed" state.
    self._dbapi_connection = None
    self.__can_reconnect = False

1282 

1283 # special case to handle mypy issue: 

1284 # https://github.com/python/mypy/issues/20651 

@overload
def scalar(
    self,
    statement: TypedReturnsRows[Never],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[Any]: ...

@overload
def scalar(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    """Execute a SQL statement construct and return a scalar object.

    This is shorthand for calling :meth:`_engine.Connection.execute`
    and then :meth:`_engine.Result.scalar` on the result.  Parameters
    are equivalent.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises ObjectNotExecutableError: if ``statement`` has no
     ``_execute_on_scalar`` attribute.

    """
    distilled = _distill_params_20(parameters)
    try:
        run = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    return run(self, distilled, execution_options or NO_OPTIONS)

1340 

@overload
def scalars(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Execute and return a scalar result set, which yields scalar
    values from the first column of each row.

    Equivalent to calling :meth:`_engine.Connection.execute` to receive
    a :class:`_result.Result` object and then invoking its
    :meth:`_result.Result.scalars` method to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """
    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()

1383 

@overload
def execute(
    self,
    statement: TypedReturnsRows[Unpack[_Ts]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[_Ts]]: ...

@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]: ...

def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a SQL statement construct and return a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     Either a dictionary of parameter names to values, or a mutable
     sequence (e.g. a list) of dictionaries.  With a list of
     dictionaries, the underlying statement execution makes use of the
     DBAPI ``cursor.executemany()`` method; with a single dictionary,
     the DBAPI ``cursor.execute()`` method is used.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.Result` object.

    :raises ObjectNotExecutableError: if ``statement`` has no
     ``_execute_on_connection`` attribute.

    """
    distilled = _distill_params_20(parameters)
    try:
        run = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    return run(self, distilled, execution_options or NO_OPTIONS)

1451 

1452 def _execute_function( 

1453 self, 

1454 func: FunctionElement[Any], 

1455 distilled_parameters: _CoreMultiExecuteParams, 

1456 execution_options: CoreExecuteOptionsParameter, 

1457 ) -> CursorResult[Unpack[TupleAny]]: 

1458 """Execute a sql.FunctionElement object.""" 

1459 

1460 return self._execute_clauseelement( 

1461 func.select(), distilled_parameters, execution_options 

1462 ) 

1463 

1464 def _execute_default( 

1465 self, 

1466 default: DefaultGenerator, 

1467 distilled_parameters: _CoreMultiExecuteParams, 

1468 execution_options: CoreExecuteOptionsParameter, 

1469 ) -> Any: 

1470 """Execute a schema.ColumnDefault object.""" 

1471 

1472 exec_opts = self._execution_options.merge_with(execution_options) 

1473 

1474 event_multiparams: Optional[_CoreMultiExecuteParams] 

1475 event_params: Optional[_CoreAnyExecuteParams] 

1476 

1477 # note for event handlers, the "distilled parameters" which is always 

1478 # a list of dicts is broken out into separate "multiparams" and 

1479 # "params" collections, which allows the handler to distinguish 

1480 # between an executemany and execute style set of parameters. 

1481 if self._has_events or self.engine._has_events: 

1482 ( 

1483 default, 

1484 distilled_parameters, 

1485 event_multiparams, 

1486 event_params, 

1487 ) = self._invoke_before_exec_event( 

1488 default, distilled_parameters, exec_opts 

1489 ) 

1490 else: 

1491 event_multiparams = event_params = None 

1492 

1493 try: 

1494 conn = self._dbapi_connection 

1495 if conn is None: 

1496 conn = self._revalidate_connection() 

1497 

1498 dialect = self.dialect 

1499 ctx = dialect.execution_ctx_cls._init_default( 

1500 dialect, self, conn, exec_opts 

1501 ) 

1502 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1503 raise 

1504 except BaseException as e: 

1505 self._handle_dbapi_exception(e, None, None, None, None) 

1506 

1507 ret = ctx._exec_default(None, default, None) 

1508 

1509 if self._has_events or self.engine._has_events: 

1510 self.dispatch.after_execute( 

1511 self, 

1512 default, 

1513 event_multiparams, 

1514 event_params, 

1515 exec_opts, 

1516 ret, 

1517 ) 

1518 

1519 return ret 

1520 

1521 def _execute_ddl( 

1522 self, 

1523 ddl: ExecutableDDLElement, 

1524 distilled_parameters: _CoreMultiExecuteParams, 

1525 execution_options: CoreExecuteOptionsParameter, 

1526 ) -> CursorResult[Unpack[TupleAny]]: 

1527 """Execute a schema.DDL object.""" 

1528 

1529 exec_opts = ddl._execution_options.merge_with( 

1530 self._execution_options, execution_options 

1531 ) 

1532 

1533 event_multiparams: Optional[_CoreMultiExecuteParams] 

1534 event_params: Optional[_CoreSingleExecuteParams] 

1535 

1536 if self._has_events or self.engine._has_events: 

1537 ( 

1538 ddl, 

1539 distilled_parameters, 

1540 event_multiparams, 

1541 event_params, 

1542 ) = self._invoke_before_exec_event( 

1543 ddl, distilled_parameters, exec_opts 

1544 ) 

1545 else: 

1546 event_multiparams = event_params = None 

1547 

1548 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1549 

1550 dialect = self.dialect 

1551 

1552 compiled = ddl.compile( 

1553 dialect=dialect, schema_translate_map=schema_translate_map 

1554 ) 

1555 ret = self._execute_context( 

1556 dialect, 

1557 dialect.execution_ctx_cls._init_ddl, 

1558 compiled, 

1559 None, 

1560 exec_opts, 

1561 compiled, 

1562 ) 

1563 if self._has_events or self.engine._has_events: 

1564 self.dispatch.after_execute( 

1565 self, 

1566 ddl, 

1567 event_multiparams, 

1568 event_params, 

1569 exec_opts, 

1570 ret, 

1571 ) 

1572 return ret 

1573 

1574 def _invoke_before_exec_event( 

1575 self, 

1576 elem: Any, 

1577 distilled_params: _CoreMultiExecuteParams, 

1578 execution_options: _ExecuteOptions, 

1579 ) -> Tuple[ 

1580 Any, 

1581 _CoreMultiExecuteParams, 

1582 _CoreMultiExecuteParams, 

1583 _CoreSingleExecuteParams, 

1584 ]: 

1585 event_multiparams: _CoreMultiExecuteParams 

1586 event_params: _CoreSingleExecuteParams 

1587 

1588 if len(distilled_params) == 1: 

1589 event_multiparams, event_params = [], distilled_params[0] 

1590 else: 

1591 event_multiparams, event_params = distilled_params, {} 

1592 

1593 for fn in self.dispatch.before_execute: 

1594 elem, event_multiparams, event_params = fn( 

1595 self, 

1596 elem, 

1597 event_multiparams, 

1598 event_params, 

1599 execution_options, 

1600 ) 

1601 

1602 if event_multiparams: 

1603 distilled_params = list(event_multiparams) 

1604 if event_params: 

1605 raise exc.InvalidRequestError( 

1606 "Event handler can't return non-empty multiparams " 

1607 "and params at the same time" 

1608 ) 

1609 elif event_params: 

1610 distilled_params = [event_params] 

1611 else: 

1612 distilled_params = [] 

1613 

1614 return elem, distilled_params, event_multiparams, event_params 

1615 

def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.ClauseElement object.

    The element's execution options are merged with those of the
    connection and with ``execution_options``.  The element is compiled
    through ``_compile_w_cache()``, using the ``compiled_cache`` option
    or else the engine's compiled cache, and the compiled form is run via
    ``_execute_context()`` with the dialect's ``_init_compiled``
    constructor.  The ``before_execute`` / ``after_execute`` hooks are
    invoked when events are registered on the connection or its engine.
    """
    exec_opts = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # evaluated once; the same answer guards the after_execute hook below
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, exec_opts
        )

    if not distilled_parameters:
        keys = []
        for_executemany = False
    else:
        # sorted() builds a new list, so no link is retained to the
        # keys() view object, which links to the values that must not
        # end up in the cache
        keys = sorted(distilled_parameters[0])
        for_executemany = len(distilled_parameters) > 1

    dialect = self.dialect
    schema_translate_map = exec_opts.get("schema_translate_map", None)
    compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compile_result = elem._compile_w_cache(
        dialect=dialect,
        compiled_cache=compiled_cache,
        column_keys=keys,
        for_executemany=for_executemany,
        schema_translate_map=schema_translate_map,
        linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
    )
    compiled_sql, extracted_params, param_dict, cache_hit = compile_result

    result = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        exec_opts,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
        param_dict=param_dict,
    )

    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            exec_opts,
            result,
        )
    return result

1689 

def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a string SQL statement on the DBAPI cursor directly,
    without any SQL compilation steps.

    Any string can be passed straight through to the
    ``cursor.execute()`` method of the DBAPI in use.

    :param statement: The statement str to be executed.  Bound
     parameters must use the underlying DBAPI's paramstyle, such as
     "qmark", "pyformat", "format", etc.

    :param parameters: the bound parameter values to be used in the
     execution.  The format is one of: a dictionary of named parameters,
     a tuple of positional parameters, or a list containing either
     dictionaries or tuples for multiple-execute support.

    :param execution_options: optional execution options; they are
     merged with the connection's own execution options.

    :return: a :class:`_engine.CursorResult`.

    E.g. multiple dictionaries::


        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
        )

    Single dictionary::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            dict(id=1, value="v1"),
        )

    Single tuple::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
        )

    .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
        not participate in the
        :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.   To
        intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
        :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute`.

    .. seealso::

        :pep:`249`

    """
    distilled = _distill_raw_params(parameters)
    exec_opts = self._execution_options.merge_with(execution_options)
    dialect = self.dialect

    return self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        exec_opts,
        statement,
        distilled,
    )

1764 

def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Unpack[TupleAny]]:
    """Create an :class:`.ExecutionContext` and execute, returning
    a :class:`_engine.CursorResult`.

    :param dialect: the dialect handed to the context constructor and to
     the downstream execution helper.
    :param constructor: callable that builds the execution context; it is
     invoked as ``constructor(dialect, self, conn, execution_options,
     *args, **kw)``.
    :param statement: string or compiled statement; used here for error
     reporting and passed on to :meth:`._exec_single_context`.
    :param parameters: parameters used for error reporting and passed on
     to :meth:`._exec_single_context`.
    :param execution_options: execution options for this call.
    :param \\*args: extra positional arguments for ``constructor``.
    :param \\**kw: extra keyword arguments for ``constructor``.

    """

    if execution_options:
        yp = execution_options.get("yield_per", None)
        if yp:
            # a truthy "yield_per" turns on "stream_results" and sets
            # "max_row_buffer" to the same value
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yp}
            )
    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection present; try to get one again
            conn = self._revalidate_connection()

        context = constructor(
            dialect, self, conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two are propagated untouched, without going through
        # _handle_dbapi_exception()
        raise
    except BaseException as e:
        # _handle_dbapi_exception() is declared NoReturn, so control
        # never continues past here with ``context`` unbound
        self._handle_dbapi_exception(
            e, str(statement), parameters, None, None
        )

    # refuse to proceed if either the outer transaction or the nested
    # transaction is present but no longer active.  Note operator
    # precedence: "A and not B or (C and not D)" groups as
    # "(A and not B) or (C and not D)".
    if (
        self._transaction
        and not self._transaction.is_active
        or (
            self._nested_transaction
            and not self._nested_transaction.is_active
        )
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    # "insertmanyvalues" has its own batched execution path; everything
    # else is a single cursor.execute() / cursor.executemany() call
    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)
    else:
        return self._exec_single_context(
            dialect, context, statement, parameters
        )

1823 

def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for a single DBAPI
    cursor.execute() or cursor.executemany() call.

    :param dialect: dialect consulted for ``bind_typing`` and used to
     call ``do_set_input_sizes()``.
    :param context: the execution context; its ``cursor``, ``statement``
     and ``parameters`` are what actually get executed.
    :param statement: the statement as passed by the caller; only used
     for error reporting if setting input sizes fails.
    :param parameters: the parameters as passed by the caller; only used
     for error reporting if setting input sizes fails, after which the
     name is rebound to ``context.parameters``.
    :return: the result object produced by
     ``context._setup_result_proxy()``.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()

        if generic_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, generic_setinputsizes, context
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e, str(statement), parameters, None, context
                )

    # from here on, work with what the context itself holds; this
    # rebinds ``parameters`` away from the value given by the caller
    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters: Optional[_AnyExecuteParams]

    if not context.executemany:
        # single execution: only the first parameter set is used
        effective_parameters = parameters[0]
    else:
        effective_parameters = parameters

    if self._has_events or self.engine._has_events:
        # each before_cursor_execute hook receives the statement and
        # parameters returned by the previous one and returns the pair
        # to use next
        for fn in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = fn(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if not self.engine.hide_parameters:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )
        else:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )

    # set to True when a dialect-level event hook returns a true value,
    # in which case the dialect's default do_execute*() is skipped
    evt_handled: bool = False
    try:
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_executemany:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            # no parameters and the context asks for the
            # "no parameters" execution form
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute_no_params:
                    if fn(cursor, str_statement, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as e:
        # _handle_dbapi_exception() is declared NoReturn, so ``result``
        # is always bound when the return below is reached
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

1965 

def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    :param dialect: dialect that produces the batches via
     ``_deliver_insertmanyvalues_batches()`` and executes each one.
    :param context: the execution context supplying the cursor,
     statement and parameters.
    :return: the result object produced by
     ``context._setup_result_proxy()``.

    """

    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # look up the event flags / dispatch collection once, outside of
    # the batch loop
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # ``stats`` is only bound when echo is on; it is only read inside
    # the loop under the same ``self._echo`` check
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        # the statement / parameters for this individual batch
        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            # hooks may replace the batch statement and parameters
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            # builds " <batchnum>/<total> (<ordered|unordered>
            # [; batch not supported])"
            imv_stats = f""" {imv_batch.batchnum}/{
                        imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch appends to the cache stats message;
            # later batches use a shorter message without cache stats
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # a do_execute event hook returning a true value takes over
            # the execution; the for/else runs the dialect's default
            # only when no hook did so
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            # NOTE(review): before_cursor_execute above receives a
            # literal True for its last argument whereas this call
            # passes context.executemany - confirm the difference is
            # intended
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                sub_stmt,
                sub_params,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            # accumulate the size of each batch as the overall rowcount
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        # reports the overall statement / parameters rather than those
        # of an individual batch
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

2136 

2137 def _cursor_execute( 

2138 self, 

2139 cursor: DBAPICursor, 

2140 statement: str, 

2141 parameters: _DBAPISingleExecuteParams, 

2142 context: Optional[ExecutionContext] = None, 

2143 ) -> None: 

2144 """Execute a statement + params on the given cursor. 

2145 

2146 Adds appropriate logging and exception handling. 

2147 

2148 This method is used by DefaultDialect for special-case 

2149 executions, such as for sequences and column defaults. 

2150 The path of statement execution in the majority of cases 

2151 terminates at _execute_context(). 

2152 

2153 """ 

2154 if self._has_events or self.engine._has_events: 

2155 for fn in self.dispatch.before_cursor_execute: 

2156 statement, parameters = fn( 

2157 self, cursor, statement, parameters, context, False 

2158 ) 

2159 

2160 if self._echo: 

2161 self._log_info(statement) 

2162 self._log_info("[raw sql] %r", parameters) 

2163 try: 

2164 for fn in ( 

2165 () 

2166 if not self.dialect._has_events 

2167 else self.dialect.dispatch.do_execute 

2168 ): 

2169 if fn(cursor, statement, parameters, context): 

2170 break 

2171 else: 

2172 self.dialect.do_execute(cursor, statement, parameters, context) 

2173 except BaseException as e: 

2174 self._handle_dbapi_exception( 

2175 e, statement, parameters, cursor, context 

2176 ) 

2177 

2178 if self._has_events or self.engine._has_events: 

2179 self.dispatch.after_cursor_execute( 

2180 self, cursor, statement, parameters, context, False 

2181 ) 

2182 

2183 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2184 """Close the given cursor, catching exceptions 

2185 and turning into log warnings. 

2186 

2187 """ 

2188 try: 

2189 cursor.close() 

2190 except Exception: 

2191 # log the error through the connection pool's logger. 

2192 self.engine.pool.logger.error( 

2193 "Error closing cursor", exc_info=True 

2194 ) 

2195 

# class-level defaults for two per-instance state flags.
# _handle_dbapi_exception() sets them on the instance while it is
# working and uses ``del`` afterwards, which makes these defaults
# visible again.
_reentrant_error = False
_is_disconnect = False

def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised during execution and re-raise.

    This method never returns; every path ends in a ``raise``.

    :param e: the exception being handled.
    :param statement: string statement to attach to a wrapping
     exception, if any.
    :param parameters: parameters to attach to a wrapping exception.
    :param cursor: the DBAPI cursor in use, if any; it is passed to the
     dialect's disconnect check and closed when the error is not a
     disconnect.
    :param context: the execution context, if one was created.
    :param is_sub_exec: when True, the wrapping exception is never
     flagged as "multi" regardless of ``context.executemany``.

    :raises: the exception supplied by a ``handle_error`` event hook if
     there was one; otherwise a wrapped ``exc.DBAPIError`` instance when
     wrapping applies; otherwise the original exception.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    if not self._is_disconnect:
        # treated as a disconnect when either the dialect reports the
        # DBAPI error as one, or an "exit" exception arrives, in both
        # cases only while this connection is not closed
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    # for "exit" exceptions the pool as a whole is not invalidated
    invalidate_pool_on_disconnect = not is_exit_exception

    # the conditional expression binds loosest here, i.e. this is
    # "(not is_sub_exec and context.executemany) if context is not None
    # else False"
    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    if self._reentrant_error:
        # this method is already running for this connection; raise a
        # wrapped error right away without running event hooks or
        # cleanup a second time
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        # exception supplied by a handle_error hook, if any
        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # a hook may have changed the disconnect determination;
            # adopt it and keep the wrapped exception in sync
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        # precedence of what gets raised: hook-supplied exception, then
        # the wrapped exception, then the exception currently being
        # handled as reported by sys.exc_info()
        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # drop the instance-level flag so the class-level default of
        # False applies again
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                # invalidate at the pool level first when requested,
                # then invalidate this connection itself
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)

2343 

@classmethod
def _handle_dbapi_exception_noconnection(
    cls,
    e: BaseException,
    dialect: Dialect,
    engine: Optional[Engine] = None,
    is_disconnect: Optional[bool] = None,
    invalidate_pool_on_disconnect: bool = True,
    is_pre_ping: bool = False,
) -> NoReturn:
    """Process an exception without a connection object and re-raise.

    This method never returns; every path ends in a ``raise``.  The
    exception context handed to ``handle_error`` hooks has no
    connection, cursor, statement, parameters or execution context.

    :param e: the exception being handled.
    :param dialect: dialect used to classify the exception and to
     dispatch ``handle_error`` hooks.
    :param engine: optional engine; consulted for ``hide_parameters``
     and placed on the exception context.
    :param is_disconnect: when None, the dialect is asked whether the
     exception indicates a disconnect; otherwise the given value is
     used as-is.
    :param invalidate_pool_on_disconnect: passed through to the
     exception context.
    :param is_pre_ping: passed through to the exception context.

    :raises: the exception supplied by a ``handle_error`` event hook if
     there was one; otherwise a wrapped ``exc.DBAPIError`` instance if
     ``e`` is a DBAPI error; otherwise the original exception.

    """
    exc_info = sys.exc_info()

    if is_disconnect is None:
        is_disconnect = isinstance(
            e, dialect.loaded_dbapi.Error
        ) and dialect.is_disconnect(e, None, None)

    # only exceptions that are DBAPI errors get wrapped here
    should_wrap = isinstance(e, dialect.loaded_dbapi.Error)

    if should_wrap:
        sqlalchemy_exception = exc.DBAPIError.instance(
            None,
            None,
            cast(Exception, e),
            dialect.loaded_dbapi.Error,
            hide_parameters=(
                engine.hide_parameters if engine is not None else False
            ),
            connection_invalidated=is_disconnect,
            dialect=dialect,
        )
    else:
        sqlalchemy_exception = None

    # exception supplied by a handle_error hook, if any
    newraise = None

    if dialect._has_events:
        ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            dialect,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            invalidate_pool_on_disconnect,
            is_pre_ping,
        )
        for fn in dialect.dispatch.handle_error:
            try:
                # handler returns an exception;
                # call next handler in a chain
                per_fn = fn(ctx)
                if per_fn is not None:
                    ctx.chained_exception = newraise = per_fn
            except Exception as _raised:
                # handler raises an exception - stop processing
                newraise = _raised
                break

        # a hook may have changed the disconnect determination; keep
        # the wrapped exception in sync with it
        if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
            sqlalchemy_exception.connection_invalidated = ctx.is_disconnect

    if newraise:
        raise newraise.with_traceback(exc_info[2]) from e
    elif should_wrap:
        assert sqlalchemy_exception is not None
        raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
    else:
        assert exc_info[1] is not None
        raise exc_info[1].with_traceback(exc_info[2])

2418 

2419 def _run_ddl_visitor( 

2420 self, 

2421 visitorcallable: Type[InvokeDDLBase], 

2422 element: SchemaVisitable, 

2423 **kwargs: Any, 

2424 ) -> None: 

2425 """run a DDL visitor. 

2426 

2427 This method is only here so that the MockConnection can change the 

2428 options given to the visitor so that "checkfirst" is skipped. 

2429 

2430 """ 

2431 visitorcallable( 

2432 dialect=self.dialect, connection=self, **kwargs 

2433 ).traverse_single(element) 

2434 

2435 

class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    A plain value holder that is handed to ``handle_error`` event
    hooks.  Every name listed in ``__slots__`` is assigned in
    ``__init__`` so that reading any of them never fails with
    ``AttributeError``.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the details of an error condition in progress.

        :param exception: the exception that was caught; stored as
         ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, if one was
         created.
        :param engine: the engine in use, if any.
        :param dialect: the dialect in use.
        :param connection: the connection in use, if any.
        :param cursor: the DBAPI cursor in use, if any.
        :param statement: the string statement, if any.
        :param parameters: the parameters, if any.
        :param context: the execution context, if any; stored as
         ``execution_context``.
        :param is_disconnect: whether the error is considered a
         disconnect.
        :param invalidate_pool_on_disconnect: whether the pool should be
         invalidated in case of a disconnect.
        :param is_pre_ping: whether the error occurred during a
         "pre ping" operation.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously the ``cursor`` argument was accepted but dropped,
        # leaving the declared slot unset
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # no handler has run yet; callers assign this as handlers
        # return replacement exceptions
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping

2481 

2482 

class Transaction(TransactionalContext):
    """A database transaction that is currently in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback`
    and :meth:`.commit` methods.  The object is also a context manager,
    so the Python ``with`` statement may be combined with the
    :meth:`_engine.Connection.begin` method::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # concrete subclasses provide the real constructor
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True once this transaction has been fully detached from the
        connection, so that it can no longer affect the connection's
        state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        # subclass hook invoked by close()
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        # subclass hook invoked by rollback()
        raise NotImplementedError()

    def _do_commit(self) -> None:
        # subclass hook invoked by commit()
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        # valid means: still active, on a connection that has not been
        # invalidated
        active = self.is_active
        return active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this is the base transaction of a begin/commit nesting,
        the transaction is rolled back; in any other case the method
        simply returns.

        Use this to cancel a Transaction without disturbing the scope
        of an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # the transaction must be inactive whether or not the close
            # succeeded
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` emits a
          "ROLLBACK TO SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may make use of DBAPI-specific
          methods for two phase transactions.


        """
        try:
            self._do_rollback()
        finally:
            # the transaction must be inactive whether or not the
            # rollback succeeded
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` emits a
          "RELEASE SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may make use of DBAPI-specific
          methods for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            # the transaction must be inactive whether or not the
            # commit succeeded
            assert not self.is_active

    def _get_subject(self) -> Connection:
        # the connection this transaction belongs to
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        # reports the negation of _deactivated_from_connection
        still_attached = not self._deactivated_from_connection
        return still_attached

    def _rollback_can_be_called(self) -> bool:
        # RootTransaction / NestedTransaction allow rollback() to be
        # called even when the transaction is deactive, and no warnings
        # are emitted in that case.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True

2625 

2626 

class RootTransaction(Transaction):
    """Represent the "root" transaction on a :class:`_engine.Connection`.

    This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
    for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
    is created by calling upon the :meth:`_engine.Connection.begin` method, and
    remains associated with the :class:`_engine.Connection` throughout its
    active span. The current :class:`_engine.RootTransaction` in use is
    accessible via the :meth:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
    "autobegin" behavior that will create a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI connection.
    The scope of the :class:`_engine.RootTransaction` in 2.0 style
    use can be controlled using the :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        """Begin a root transaction on ``connection``.

        The connection must not already have a root transaction.
        """
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # the begin is emitted before this object is installed on the
        # connection, so a failing begin leaves the connection without
        # a transaction
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        """Mark this transaction as inactive.

        Emits a warning if the transaction is already inactive and is no
        longer the connection's current transaction.
        """
        if self.is_active:
            assert self.connection._transaction is self
            self.is_active = False

        elif self.connection._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self) -> bool:
        """True if the connection's current transaction is not this
        object."""
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        # delegates to the connection's begin implementation
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        # delegates to the connection's rollback implementation
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        # delegates to the connection's commit implementation
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        """Roll back if still active, cancel any nested transaction and
        detach from the connection.

        :param try_deactivate: when True, ``_deactivate_from_connection()``
         is called even if this transaction is already inactive.
        """
        try:
            if self.is_active:
                self._connection_rollback_impl()

            if self.connection._nested_transaction:
                self.connection._nested_transaction._cancel()
        finally:
            # runs even if the rollback or the cancel raised, so the
            # transaction always ends up inactive and removed from the
            # connection
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        # unlike _do_close(), also attempts deactivation when the
        # transaction is already inactive
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        """Commit this transaction and detach it from the connection.

        :raises exc.InvalidRequestError: if this transaction is inactive
         and is no longer the connection's current transaction.  If it
         is inactive but still the connection's current transaction,
         ``connection._invalid_transaction()`` is called instead.
        """
        if self.is_active:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # whether or not commit succeeds, cancel any
                # nested transactions, make this transaction "inactive"
                # and remove it as a reset agent
                if self.connection._nested_transaction:
                    self.connection._nested_transaction._cancel()

                self._deactivate_from_connection()

            # ...however only remove as the connection's current transaction
            # if commit succeeded.  otherwise it stays on so that a rollback
            # needs to occur.
            self.connection._transaction = None
        else:
            if self.connection._transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")

        assert not self.is_active
        assert self.connection._transaction is not self

2733 

2734 

class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    A :class:`.NestedTransaction` is produced by the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    The "begin" / "commit" / "rollback" operations map onto savepoint
    commands as follows:

    * "begin" corresponds to the "BEGIN SAVEPOINT" command; the savepoint
      is given an explicit name which is kept as part of the state of this
      object.

    * :meth:`.NestedTransaction.commit` corresponds to a
      "RELEASE SAVEPOINT" operation using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    * :meth:`.NestedTransaction.rollback` corresponds to a
      "ROLLBACK TO SAVEPOINT" operation using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    Savepoints mimic the semantics of an outer transaction so that code
    may deal with a "savepoint" transaction and an "outer" transaction in
    an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    _savepoint: str

    def __init__(self, connection: Connection):
        # a savepoint only makes sense inside an existing transaction
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # emit the savepoint first; the object is only marked active and
        # linked onto the connection once that call has returned
        self._savepoint = connection._savepoint_impl()
        self.is_active = True
        # remember whichever nested transaction was current before us so
        # that it can be restored when this one is deactivated
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        """Restore the connection's previous nested transaction.

        If this object is not the connection's current nested transaction,
        nothing is changed and a warning is emitted unless ``warn`` is
        false.
        """
        conn = self.connection
        if conn._nested_transaction is self:
            conn._nested_transaction = self._previous_nested
            return
        if warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self) -> bool:
        """True when the connection's current nested transaction is some
        object other than this one."""
        current = self.connection._nested_transaction
        return current is not self

    def _cancel(self) -> None:
        """Mark this savepoint, and every previous one in the chain,
        inactive without emitting any SQL.

        Invoked by RootTransaction when the outer transaction is
        committed, rolled back, or closed.
        """
        self.is_active = False
        self._deactivate_from_connection()
        prior = self._previous_nested
        if prior:
            prior._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        """Roll back to the savepoint if still possible, then deactivate.

        :param deactivate_from_connection: when true, also de-associate
         this object from the connection.
        :param warn_already_deactive: passed along as the ``warn`` flag of
         :meth:`_deactivate_from_connection`.
        """
        try:
            # ROLLBACK TO SAVEPOINT is only emitted while both this
            # savepoint and the connection's transaction are active
            if self.is_active:
                outer = self.connection._transaction
                if outer and outer.is_active:
                    self.connection._rollback_to_savepoint_impl(
                        self._savepoint
                    )
        finally:
            # the state change happens whether or not the rollback raised
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        # closing does not warn if already de-associated
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=False
        )

    def _do_rollback(self) -> None:
        # an explicit rollback warns if already de-associated
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=True
        )

    def _do_commit(self) -> None:
        """Release the savepoint, or raise if this object is inactive."""
        conn = self.connection

        if not self.is_active:
            if conn._nested_transaction is not self:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
            # inactive but still the connection's current nested
            # transaction; let the connection report the invalid state
            conn._invalid_transaction()
            return

        try:
            conn._release_savepoint_impl(self._savepoint)
        finally:
            # nested trans becomes inactive on failed release
            # unconditionally.  this prevents it from trying to
            # emit SQL when it rolls back.
            self.is_active = False

        # only reached when the release did not raise, so the object is
        # de-associated from the connection only on success
        self._deactivate_from_connection()

2847 

2848 

class TwoPhaseTransaction(RootTransaction):
    """Represent a two-phase transaction.

    Instances of :class:`.TwoPhaseTransaction` are obtained from the
    :meth:`_engine.Connection.begin_twophase` method.

    Apart from the additional :meth:`prepare` method, the interface is
    the same as that of :class:`.Transaction`.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both attributes are assigned before the superclass constructor
        # runs, as in the original ordering
        self.xid = xid
        self._is_prepared = False
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        Once a PREPARE has been issued, the transaction can be committed.

        :raises sqlalchemy.exc.InvalidRequestError: if this transaction is
         not active.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            # recorded so that commit / rollback know a PREPARE took place
            self._is_prepared = True
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

    def _connection_begin_impl(self) -> None:
        # the connection receives the transaction object itself here
        self.connection._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        prepared = self._is_prepared
        self.connection._rollback_twophase_impl(self.xid, prepared)

    def _connection_commit_impl(self) -> None:
        prepared = self._is_prepared
        self.connection._commit_twophase_impl(self.xid, prepared)

2888 

2889 

class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    _compiled_cache: Optional[CompiledCacheType]

    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Construct a new :class:`_engine.Engine`.

        Applications normally use :func:`~sqlalchemy.create_engine` rather
        than calling this constructor directly.

        :param pool: the connection pool this engine draws from.
        :param dialect: the dialect in use.
        :param url: the URL this engine was created from; shown in
         ``repr()``.
        :param logging_name: optional logging name; only assigned when
         a non-empty value is given.
        :param echo: echo flag assigned to :attr:`.echo` and passed to the
         instance logger setup.
        :param query_cache_size: capacity of the built-in compiled cache;
         ``0`` disables the cache entirely.
        :param execution_options: optional default execution options,
         applied through :meth:`.update_execution_options`.
        :param hide_parameters: stored as-is on the engine.
        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            # a size of zero means "no built-in cache"
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        """``size_alert`` callback for the built-in compiled cache; logs an
        informational message when info-level logging is enabled."""
        if self._should_log_info():
            self.logger.info(
                "Compiled cache size pruning from %d items to %d. "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self) -> Engine:
        """Returns this :class:`.Engine`.

        Used for legacy schemes that accept :class:`.Connection` /
        :class:`.Engine` objects within the same variable.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Clear the compiled cache associated with this
        :class:`_engine.Engine`.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter.

        .. versionadded:: 1.4

        """
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections.  The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        # order is: event hook first, then merge the new options into the
        # stored defaults, then let the dialect react to them
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the
          same instance.  The :meth:`_engine.Engine.dispose`
          method will replace
          the connection pool instance for the parent engine as well
          as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine`
          inherits the events of the parent, and new events can be associated
          with the new :class:`_engine.Engine` individually.
        * The logging configuration and logging_name is copied from the parent
          :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that affect some execution-level behavior for each
        engine.    One such example is breaking into separate "reader" and
        "writer" :class:`_engine.Engine` instances, where one
        :class:`_engine.Engine`
        has a lower :term:`isolation level` setting configured or is even
        transaction-disabled using "autocommit".  An example of this
        configuration is at :ref:`dbapi_autocommit_multiple`.

        Another example is one that
        uses a custom option ``shard_id`` which is consumed by an event
        to change the current schema on a database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        The above recipe illustrates two :class:`_engine.Engine` objects that
        will each serve as factories for :class:`_engine.Connection` objects
        that have pre-established "shard_id" execution options present. A
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        then interprets this execution option to emit a MySQL ``use`` statement
        to switch databases before a statement execution, while at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.driver

    echo = log.echo_property()

    def __repr__(self) -> str:
        """Return ``Engine(<repr of url>)``."""
        return f"Engine({self.url!r})"

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        Event listeners associated with the old pool via :class:`.PoolEvents`
        are **transferred to the new pool**; this is to support the pattern
        by which :class:`.PoolEvents` are set up in terms of the owning
        :class:`.Engine` without the need to refer to the :class:`.Pool`
        directly.

        :param close: if left at its default of ``True``, has the
          effect of fully closing all **currently checked in**
          database connections.  Connections that are still checked out
          will **not** be closed, however they will no longer be associated
          with this :class:`_engine.Engine`,
          so when they are closed individually, eventually the
          :class:`_pool.Pool` which they are associated with will
          be garbage collected and they will be closed out fully, if
          not already closed on checkin.

          If set to ``False``, the previous connection pool is de-referenced,
          and otherwise not touched in any way.

        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

            :meth:`.ConnectionEvents.engine_disposed`

        """
        if close:
            self.pool.dispose()
        # the replacement pool is created whether or not the old one was
        # actively closed
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Yield ``connection`` if one was given, otherwise a new
        connection from :meth:`.connect` that is scoped to this context
        manager."""
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            # a caller-supplied connection is yielded as-is; it is not
            # entered as a context manager here
            yield connection

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed.  If an error is raised, the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        with self.connect() as conn:
            with conn.begin():
                yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        """Run the DDL visitor on a connection obtained from
        :meth:`.begin`, i.e. inside a transaction block."""
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` acts as a Python context manager, so
        the typical use of this method looks like::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Where above, after the block is completed, the connection is "closed"
        and its underlying DBAPI resources are returned to the connection pool.
        This also has the effect of rolling back any transaction that
        was explicitly begun or was begun via autobegin, and will
        emit the :meth:`_events.ConnectionEvents.rollback` event if one was
        started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed.   When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()

3282 

3283 

class OptionEngineMixin(log.Identified):
    """Mixin for an engine that proxies another :class:`Engine` while
    carrying its own execution options.

    URL, dialect, logging name, echo flag, compiled cache and
    ``hide_parameters`` are copied from the proxied engine at construction
    time; the connection pool is read from and written to the proxied
    engine on every access.
    """

    # class-level events are not propagated to this class; see the note
    # in __init__ about sharing the parent's events
    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Set up this engine as a proxy of ``proxied``.

        :param proxied: the parent engine whose state is shared.
        :param execution_options: options applied on top of the parent's
         execution options via ``update_execution_options()``.
        """
        self._proxied = proxied
        self.url = proxied.url
        self.dialect = proxied.dialect
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        # the very same cache object is shared with the parent engine
        self._compiled_cache = proxied._compiled_cache
        self.hide_parameters = proxied.hide_parameters
        log.instance_logger(self, echoflag=self.echo)

        # note: this will propagate events that are assigned to the parent
        # engine after this OptionEngine is created.   Since we share
        # the events of the parent we also disallow class-level events
        # to apply to the OptionEngine class directly.
        #
        # the other way this can work would be to transfer existing
        # events only, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that might be more appropriate however it would be a behavioral
        # change for logic that assigns events to the parent engine and
        # would like it to take effect for the already-created sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # start from the parent's options, then layer the given ones on top
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        # must be supplied by the concrete class combining this mixin
        # with an engine implementation
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            # always reflects the proxied engine's current pool
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assigning a pool here replaces it on the proxied engine
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            # true if either the proxied engine or this engine has events;
            # the local flag lives directly in the instance __dict__
            return self._proxied._has_events or self.__dict__.get(
                "_has_events", False
            )

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            self.__dict__["_has_events"] = value

3347 

3348 

class OptionEngine(OptionEngineMixin, Engine):
    """Concrete :class:`Engine` produced by ``Engine.execution_options()``,
    combining :class:`OptionEngineMixin` with :class:`Engine`."""

    def update_execution_options(self, **opt: Any) -> None:
        # call Engine's implementation explicitly: the mixin precedes
        # Engine in the bases and its version raises NotImplementedError
        Engine.update_execution_options(self, **opt)


# assigned after both classes exist; Engine.execution_options() builds its
# result with self._option_cls(self, opt)
Engine._option_cls = OptionEngine