Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1000 statements  

1# engine/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 

8 

9from __future__ import annotations 

10 

11import contextlib 

12import sys 

13import typing 

14from typing import Any 

15from typing import Callable 

16from typing import cast 

17from typing import Iterable 

18from typing import Iterator 

19from typing import List 

20from typing import Mapping 

21from typing import NoReturn 

22from typing import Optional 

23from typing import overload 

24from typing import Tuple 

25from typing import Type 

26from typing import TypeVar 

27from typing import Union 

28 

29from .interfaces import BindTyping 

30from .interfaces import ConnectionEventsTarget 

31from .interfaces import DBAPICursor 

32from .interfaces import ExceptionContext 

33from .interfaces import ExecuteStyle 

34from .interfaces import ExecutionContext 

35from .interfaces import IsolationLevel 

36from .util import _distill_params_20 

37from .util import _distill_raw_params 

38from .util import TransactionalContext 

39from .. import exc 

40from .. import inspection 

41from .. import log 

42from .. import util 

43from ..sql import compiler 

44from ..sql import util as sql_util 

45from ..util.typing import Never 

46from ..util.typing import TupleAny 

47from ..util.typing import TypeVarTuple 

48from ..util.typing import Unpack 

49 

50if typing.TYPE_CHECKING: 

51 from . import CursorResult 

52 from . import ScalarResult 

53 from .interfaces import _AnyExecuteParams 

54 from .interfaces import _AnyMultiExecuteParams 

55 from .interfaces import _CoreAnyExecuteParams 

56 from .interfaces import _CoreMultiExecuteParams 

57 from .interfaces import _CoreSingleExecuteParams 

58 from .interfaces import _DBAPIAnyExecuteParams 

59 from .interfaces import _DBAPISingleExecuteParams 

60 from .interfaces import _ExecuteOptions 

61 from .interfaces import CompiledCacheType 

62 from .interfaces import CoreExecuteOptionsParameter 

63 from .interfaces import Dialect 

64 from .interfaces import SchemaTranslateMapType 

65 from .reflection import Inspector # noqa 

66 from .url import URL 

67 from ..event import dispatcher 

68 from ..log import _EchoFlagType 

69 from ..pool import _ConnectionFairy 

70 from ..pool import Pool 

71 from ..pool import PoolProxiedConnection 

72 from ..sql import Executable 

73 from ..sql._typing import _InfoType 

74 from ..sql.compiler import Compiled 

75 from ..sql.ddl import ExecutableDDLElement 

76 from ..sql.ddl import InvokeDDLBase 

77 from ..sql.functions import FunctionElement 

78 from ..sql.schema import DefaultGenerator 

79 from ..sql.schema import HasSchemaAttr 

80 from ..sql.schema import SchemaVisitable 

81 from ..sql.selectable import TypedReturnsRows 

82 

83 

# Generic type variable, bound to ``Any``, for annotations in this module.
84_T = TypeVar("_T", bound=Any) 

# Variadic type variable for annotations that carry a tuple of types.
85_Ts = TypeVarTuple("_Ts") 

# Module-wide "no execution options" value; it is ``util.EMPTY_DICT``
# itself, not a copy (presumably an immutable mapping -- confirm in util).
86_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT 

# Second name bound to the very same ``util.EMPTY_DICT`` object, typed as a
# plain string-keyed mapping.
87NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT 

88 

89 

90class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

91 """Provides high-level functionality for a wrapped DB-API connection. 

92 

93 The :class:`_engine.Connection` object is procured by calling the 

94 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

95 object, and provides services for execution of SQL statements as well 

96 as transaction control. 

97 

98 The Connection object is **not** thread-safe. While a Connection can be 

99 shared among threads using properly synchronized access, it is still 

100 possible that the underlying DBAPI connection may not support shared 

101 access between threads. Check the DBAPI documentation for details. 

102 

103 The Connection object represents a single DBAPI connection checked out 

104 from the connection pool. In this state, the connection pool has no 

105 effect upon the connection, including its expiration or timeout state. 

106 For the connection pool to properly manage connections, connections 

107 should be returned to the connection pool (i.e. ``connection.close()``) 

108 whenever the connection is not in use. 

109 

110 .. index:: 

111 single: thread safety; Connection 

112 

113 """ 

114 

115 dialect: Dialect 

116 dispatch: dispatcher[ConnectionEventsTarget] 

117 

118 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

119 

120 # used by sqlalchemy.engine.util.TransactionalContext 

121 _trans_context_manager: Optional[TransactionalContext] = None 

122 

123 # legacy as of 2.0, should be eventually deprecated and 

124 # removed. was used in the "pre_ping" recipe that's been in the docs 

125 # a long time 

126 should_close_with_result = False 

127 

128 _dbapi_connection: Optional[PoolProxiedConnection] 

129 

130 _execution_options: _ExecuteOptions 

131 

132 _transaction: Optional[RootTransaction] 

133 _nested_transaction: Optional[NestedTransaction] 

134 

def __init__(
    self,
    engine: Engine,
    connection: Optional[PoolProxiedConnection] = None,
    _has_events: Optional[bool] = None,
    _allow_revalidate: bool = True,
    _allow_autobegin: bool = True,
):
    """Construct a new Connection.

    :param engine: the :class:`_engine.Engine` this connection is
     associated with; its dialect and execution options are adopted.
    :param connection: an existing pool-proxied DBAPI connection to wrap.
     When ``None``, one is procured via ``engine.raw_connection()``.
    :param _has_events: tri-state flag.  ``None`` joins this connection's
     dispatch with the engine's dispatch and takes on the engine's
     ``_has_events`` value; any other value is used as given and the
     engine's dispatch is not joined.
    :param _allow_revalidate: stored as the private "can reconnect" flag.
    :param _allow_autobegin: stored as ``_allow_autobegin``.

    """
    self.engine = engine
    dialect = engine.dialect
    self.dialect = dialect

    if connection is not None:
        self._dbapi_connection = connection
    else:
        try:
            self._dbapi_connection = engine.raw_connection()
        except dialect.loaded_dbapi.Error as err:
            Connection._handle_dbapi_exception_noconnection(
                err, dialect, engine
            )
            # only reached if the handler above returns normally
            raise

    # a new Connection has neither a root nor a nested transaction
    self._transaction = None
    self._nested_transaction = None
    self.__savepoint_seq = 0
    self.__in_begin = False

    self.__can_reconnect = _allow_revalidate
    self._allow_autobegin = _allow_autobegin
    self._echo = engine._should_log_info()

    if _has_events is None:
        # if _has_events is sent explicitly as False,
        # then don't join the dispatch of the engine; we don't
        # want to handle any of the engine's events in that case.
        self.dispatch = self.dispatch._join(engine.dispatch)
        self._has_events = engine._has_events
    else:
        self._has_events = _has_events or False

    self._execution_options = engine._execution_options

    if self._has_events or engine._has_events:
        self.dispatch.engine_connect(self)

179 

180 # this can be assigned differently via 

181 # characteristics.LoggingTokenCharacteristic 

182 _message_formatter: Any = None 

183 

def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` at INFO level on the engine's logger.

    If a ``_message_formatter`` is set on this connection, the message is
    passed through it first.  When ``log.STACKLEVEL`` is enabled, a
    ``stacklevel`` keyword is added to the logging call.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = log.STACKLEVEL_OFFSET + 1

    self.engine.logger.info(text, *arg, **kw)

194 

def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` at DEBUG level on the engine's logger.

    If a ``_message_formatter`` is set on this connection, the message is
    passed through it first.  When ``log.STACKLEVEL`` is enabled, a
    ``stacklevel`` keyword is added to the logging call.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = log.STACKLEVEL_OFFSET + 1

    self.engine.logger.debug(text, *arg, **kw)

205 

@property
def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
    """The ``schema_translate_map`` execution option currently in effect,
    or ``None`` if the option is not set.

    """
    return self._execution_options.get("schema_translate_map", None)

213 

def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the effective schema name for a schema item.

    The item's own ``schema`` is returned unless a non-empty
    ``schema_translate_map`` execution option is present, it contains that
    schema name as a key, and the item has ``_use_schema_map`` enabled; in
    that case the mapped name is returned instead.

    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    if (
        not translate_map
        or schema_name not in translate_map
        or not obj._use_schema_map
    ):
        return schema_name

    return translate_map[schema_name]

233 

def __enter__(self) -> Connection:
    """Enter a ``with`` block; the connection itself is the ``as`` target."""
    return self

236 

def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Close the connection when the ``with`` block ends.

    The exception details are not inspected, and ``None`` is returned, so
    an exception raised inside the block continues to propagate.

    """
    self.close()

239 

@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    driver_column_names: bool = False,
    **opt: Any,
) -> Connection: ...

@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the object returned is the very same :class:`_engine.Connection`
    on which the method was called.  This differs from the
    ``execution_options`` methods of other objects such as
    :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that
    many of these options necessarily alter the state of the underlying
    DBAPI connection anyway, so there is no feasible way to keep the
    effect of such an option local to a "sub" connection.

    .. versionchanged:: 2.0  The
       :meth:`_engine.Connection.execution_options` method, in contrast
       to other objects with this method, modifies the connection
       in-place without creating a copy of it.

    As discussed elsewhere, the
    :meth:`_engine.Connection.execution_options` method accepts any
    arbitrary parameters, including user defined names.  All parameters
    given can be consumed in a number of ways, including by using the
    :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords currently recognized by SQLAlchemy itself include all
    those listed under :meth:`.Executable.execution_options`, as well as
    others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary in which :class:`.Compiled` objects will be cached
      when the :class:`_engine.Connection` compiles a clause expression
      into a :class:`.Compiled` object.  This dictionary supersedes the
      statement cache that may be configured on the
      :class:`_engine.Engine` itself.  If set to None, caching is
      disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching used
      by the ORM internally supersedes a cache dictionary specified
      here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token, surrounded by brackets, to log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger.  This makes a
      per-connection or per-sub-engine token available, which is useful
      for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.  Valid values include those
      string values accepted by the
      :paramref:`_sa.create_engine.isolation_level` parameter passed to
      :func:`_sa.create_engine`.  These levels are semi-database
      specific; see individual dialect documentation for valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**.  The isolation level remains
      at the given setting until explicitly changed, or until the DBAPI
      connection itself is :term:`released` to the connection pool, i.e.
      the :meth:`_engine.Connection.close` method is called, at which
      time an event handler will emit additional statements on the DBAPI
      connection in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method
         is called, as well as before any SQL statements are emitted
         which would otherwise trigger "autobegin", or directly after a
         call to :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`.  A database cannot change
         the isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g.
         via the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs.  The new connection produced after
         the invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter list or dictionary is totally
      empty, the statement is invoked on the cursor as
      ``cursor.execute(statement)``, not passing the parameter collection
      at all.  Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are present;
      this option allows code to generate SQL containing percent signs
      (and possibly other characters) that is neutral regarding whether
      it's executed by the DBAPI or piped into a script that's later
      invoked by command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor.  Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using
      the :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.  If
      :meth:`_engine.Result.yield_per` is not used, the
      :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at
      once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per`
      execution option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once.  This option is supported
      both at a core level by :class:`_engine.Connection` as well as by
      the ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the
        :ref:`queryguide_toplevel` describing the ORM version of
        ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum buffer size to use when
      the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`

    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will set
      the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the
        :ref:`queryguide_toplevel` describing the ORM version of
        ``yield_per``

    :param insertmanyvalues_page_size: Available on:
      :class:`_engine.Connection`, :class:`_engine.Engine`.  Number of
      rows to format into an INSERT statement when the statement uses
      "insertmanyvalues" mode, which is a paged form of bulk insert that
      is used for many backends when using :term:`executemany` execution
      typically in conjunction with RETURNING.  Defaults to 1000.  May
      also be modified on a per-engine basis using the
      :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

      .. versionadded:: 2.0

      .. seealso::

        :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on:
      :class:`_engine.Connection`, :class:`_engine.Engine`,
      :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table` encountered when SQL or DDL expression
      elements are compiled into strings; the resulting schema name will
      be converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPIs rowcount value can
      be accessed for other kinds of statements such as INSERT and
      SELECT, to the degree that the DBAPI supports these statements.
      See :attr:`.CursorResult.rowcount` for notes regarding the
      behavior of this attribute.

      .. versionadded:: 2.0.28

    :param driver_column_names: When True, the returned
      :class:`_engine.CursorResult` will use the column names as written
      in ``cursor.description`` to set up the keys for the result set,
      including the names of columns for the :class:`_engine.Row` object
      as well as the dictionary keys when using
      :attr:`_engine.Row._mapping`.  On backends that use "name
      normalization" such as Oracle Database to correct for lower case
      names being converted to all uppercase, this behavior is turned
      off and the raw UPPERCASE names in cursor.description will be
      present.

      .. versionadded:: 2.1

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    """  # noqa
    events_active = self._has_events or self.engine._has_events
    if events_active:
        # listeners receive the requested options before they are merged
        self.dispatch.set_connection_execution_options(self, opt)

    merged_options = self._execution_options.union(opt)
    self._execution_options = merged_options

    # the dialect hook is handed the same options, after the merge
    self.dialect.set_connection_execution_options(self, opt)
    return self

535 

def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL options currently in effect for execution.

    The object returned is the connection's own options collection.

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options

544 

@property
def _still_open_and_dbapi_connection_is_valid(self) -> bool:
    """True when a pool-proxied connection is present and reports itself
    as valid via its ``is_valid`` attribute.

    """
    fairy = self._dbapi_connection
    if fairy is None:
        return False
    return fairy.is_valid

552 

@property
def closed(self) -> bool:
    """Return True if this connection is closed.

    That is the case when there is no DBAPI connection and the
    connection is not permitted to reconnect.

    """
    if self._dbapi_connection is not None:
        return False
    return not self.__can_reconnect

558 

@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    Note that this does not indicate whether or not the connection was
    invalidated at the pool level.

    """

    # Before 1.4, "invalid" was a state kept separately from "closed": an
    # invalidated connection could be closed, leaving _dbapi_connection
    # as None and closed=True while the "invalid" flag stayed True.  That
    # gave three states (open/valid, closed/valid, closed/invalid) for no
    # good reason, since a "closed" connection has no need to also be
    # "invalid".  The state is now derived from two facts only: whether a
    # DBAPI connection is present, and whether reconnecting is permitted.
    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect

579 

@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    The object returned is a SQLAlchemy connection-pool proxied
    connection, whose :attr:`_pool._ConnectionFairy.dbapi_connection`
    attribute refers to the actual driver connection.

    If no DBAPI connection is currently present, a revalidation is
    attempted first; errors other than
    :class:`~sqlalchemy.exc.PendingRollbackError` and
    :class:`~sqlalchemy.exc.ResourceClosedError` are passed to the
    connection's DBAPI exception handler.

    .. seealso::


        :ref:`dbapi_connections`

    """
    proxied = self._dbapi_connection
    if proxied is not None:
        return proxied

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these describe the state of this Connection; pass them through
        raise
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

605 

def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    This method performs a live SQL operation against the database in
    order to procure the current isolation level, so the value returned
    is the actual level on the underlying DBAPI connection regardless of
    how this state was set.  It will be one of the four actual isolation
    modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
    ``SERIALIZABLE``.  It will **not** include the ``AUTOCOMMIT``
    isolation level setting.  Third party dialects may also feature
    additional isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional"
      isolation mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None

    try:
        level = self.dialect.get_isolation_level(driver_connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
    else:
        return level

649 

@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    This value is independent of the
    :paramref:`.Connection.execution_options.isolation_level` and
    :paramref:`.Engine.execution_options.isolation_level` execution
    options.  It is determined by the :class:`_engine.Dialect` when the
    first connection is created, by performing a SQL query against the
    database for the current isolation level before any additional
    commands have been emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level

678 

def _invalid_transaction(self) -> NoReturn:
    """Raise :class:`~sqlalchemy.exc.PendingRollbackError`.

    The message refers to a "savepoint transaction" when a nested
    transaction is present, and to a plain "transaction" otherwise.

    """
    savepoint_label = (
        "savepoint " if self._nested_transaction is not None else ""
    )
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding"
        % (savepoint_label,),
        code="8s2b",
    )

686 

def _revalidate_connection(self) -> PoolProxiedConnection:
    """Procure a new pool-proxied connection for an invalidated
    Connection and return it.

    :raises: :class:`~sqlalchemy.exc.ResourceClosedError` if this
     connection may not reconnect or is not in the invalidated state;
     whatever ``_invalid_transaction()`` raises if a transaction is
     still present.

    """
    if not (self.__can_reconnect and self.invalidated):
        raise exc.ResourceClosedError("This Connection is closed")

    if self._transaction is not None:
        # the old transaction must be rolled back before reconnecting
        self._invalid_transaction()

    fresh_connection = self.engine.raw_connection()
    self._dbapi_connection = fresh_connection
    return fresh_connection

694 

@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data stored here travels with the DBAPI connection, including
    after it has been returned to the connection pool and is used again
    by subsequent instances of :class:`_engine.Connection`.

    """
    pool_proxied_connection = self.connection
    return pool_proxied_connection.info

708 

def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt is made to close the underlying DBAPI connection
    immediately; if that operation fails, the error is logged but not
    raised.  The connection is then discarded whether or not close()
    succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar), this
    :class:`_engine.Connection` will attempt to procure a new DBAPI
    connection using the services of the :class:`_pool.Pool` as a source
    of connectivity (e.g. a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when the
    :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
    level all state associated with this transaction is lost, as the
    DBAPI connection is closed.  The :class:`_engine.Connection` will
    not allow a reconnection to proceed until the :class:`.Transaction`
    object is ended, by calling the :meth:`.Transaction.rollback`
    method; until that point, any attempt at continuing to use the
    :class:`_engine.Connection` will raise an
    :class:`~sqlalchemy.exc.InvalidRequestError`.  This is to prevent
    applications from accidentally continuing ongoing transactional
    operations despite the fact that the transaction has been lost due
    to an invalidation.

    The :meth:`_engine.Connection.invalidate` method, just like
    auto-invalidation, will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on a connection that is already invalidated is
    a no-op.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    :raises: :class:`~sqlalchemy.exc.ResourceClosedError` if this
      connection is closed.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """

    if self.invalidated:
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        fairy = self._dbapi_connection
        assert fairy is not None
        fairy.invalidate(exception)

    # drop the reference regardless of whether it was still valid
    self._dbapi_connection = None

767 

def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance remains usable.  When it
    is closed (or exited from a context manager context as above), the
    DB-API connection is literally closed and not returned to its
    originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    :raises: :class:`~sqlalchemy.exc.ResourceClosedError` if this
     connection is closed; :class:`~sqlalchemy.exc.InvalidRequestError`
     if it has been invalidated.

    """

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    fairy = self._dbapi_connection
    if fairy is not None:
        fairy.detach()
        return

    raise exc.InvalidRequestError("Can't detach an invalidated Connection")

803 

804 def _autobegin(self) -> None: 

805 if self._allow_autobegin and not self.__in_begin: 

806 self.begin() 

807 

def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of :class:`_engine.RootTransaction`.
    This object represents the "scope" of the transaction,
    which completes when either the :meth:`_engine.Transaction.rollback`
    or :meth:`_engine.Transaction.commit` method is called; the object
    also works as a context manager as illustrated above.

    The :meth:`_engine.Connection.begin` method begins a
    transaction that normally will be begun in any case when the connection
    is first used to execute a statement.  The reason this method might be
    used would be to invoke the :meth:`_events.ConnectionEvents.begin`
    event at a specific time, or to organize code within the scope of a
    connection checkout in terms of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    The above code is not fundamentally any different in its behavior than
    the following code which does not use
    :meth:`_engine.Connection.begin`; the below style is known
    as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, the :meth:`_engine.Connection.begin`
    method does not emit any SQL or change the state of the underlying
    DBAPI connection in any way; the Python DBAPI does not have any
    concept of explicit transaction begin.

    :raises InvalidRequestError: if a transaction object is already
     present on this connection, whether from a previous ``begin()`` or
     from autobegin.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction

882 

def begin_nested(self) -> NestedTransaction:
    """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
    handle that controls the scope of the SAVEPOINT.

    E.g.::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of
    :class:`_engine.NestedTransaction`, which includes transactional
    methods :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
    these methods correspond to the operations "RELEASE SAVEPOINT <name>"
    and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
    to the :class:`_engine.NestedTransaction` object and is generated
    automatically. Like any other :class:`_engine.Transaction`, the
    :class:`_engine.NestedTransaction` may be used as a context manager as
    illustrated above, which will "release" or "rollback" depending on
    whether the operation within the block succeeded or raised an
    exception.

    Nested transactions require SAVEPOINT support in the underlying
    database, else the behavior is undefined. SAVEPOINT is commonly used to
    run operations within a transaction that may fail, while continuing the
    outer transaction. E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute(...)

    If :meth:`_engine.Connection.begin_nested` is called without first
    calling :meth:`_engine.Connection.begin` or
    :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
    will "autobegin" the outer transaction first. This outer transaction
    may be committed using "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute(...)
            # savepoint is released

            connection.execute(...)

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    outer_missing = self._transaction is None
    if outer_missing:
        # no root transaction yet; let autobegin establish one first
        self._autobegin()

    savepoint = NestedTransaction(self)
    return savepoint

960 

def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Begin a two-phase or XA transaction and return a transaction
    handle.

    The returned object is an instance of :class:`.TwoPhaseTransaction`,
    which in addition to the methods provided by
    :class:`.Transaction`, also provides a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id.  If not supplied, an id
     is obtained from the dialect's ``create_xid()`` method.  The accepted
     type and value depends on the driver in use.

    :raises InvalidRequestError: if a transaction is already present on
     this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    # only ask the dialect for an id when the caller did not pass one
    branch_id = self.engine.dialect.create_xid() if xid is None else xid
    return TwoPhaseTransaction(self, branch_id)

990 

def commit(self) -> None:
    """Commit the transaction that is currently in progress.

    This method commits the current transaction if one has been started.
    If no transaction was started, the method has no effect, assuming
    the connection is in a non-invalidated state.

    A transaction is begun on a :class:`_engine.Connection` automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.commit` method only acts upon
      the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.

    """
    root = self._transaction
    if not root:
        # nothing has been begun; commit is a no-op
        return
    root.commit()

1015 

def rollback(self) -> None:
    """Roll back the transaction that is currently in progress.

    This method rolls back the current transaction if one has been started.
    If no transaction was started, the method has no effect.  If a
    transaction was started and the connection is in an invalidated state,
    the transaction is cleared using this method.

    A transaction is begun on a :class:`_engine.Connection` automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.rollback` method only acts
      upon the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.

    """
    root = self._transaction
    if not root:
        # nothing has been begun; rollback is a no-op
        return
    root.rollback()

1041 

def recover_twophase(self) -> List[Any]:
    """Return whatever the dialect's ``do_recover_twophase()`` reports for
    this connection.
    """
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)

1044 

def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_rollback_twophase()``, forwarding
    the ``recover`` flag as a keyword argument.
    """
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)

1047 

def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_commit_twophase()``, forwarding
    the ``recover`` flag as a keyword argument.
    """
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)

1050 

def in_transaction(self) -> bool:
    """Return True if a transaction is in progress."""
    root = self._transaction
    if root is None:
        return False
    return root.is_active

1054 

def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction is in progress."""
    nested = self._nested_transaction
    if nested is None:
        return False
    return nested.is_active

1061 

1062 def _is_autocommit_isolation(self) -> bool: 

1063 opt_iso = self._execution_options.get("isolation_level", None) 

1064 return bool( 

1065 opt_iso == "AUTOCOMMIT" 

1066 or ( 

1067 opt_iso is None 

1068 and self.engine.dialect._on_connect_isolation_level 

1069 == "AUTOCOMMIT" 

1070 ) 

1071 ) 

1072 

1073 def _get_required_transaction(self) -> RootTransaction: 

1074 trans = self._transaction 

1075 if trans is None: 

1076 raise exc.InvalidRequestError("connection is not in a transaction") 

1077 return trans 

1078 

1079 def _get_required_nested_transaction(self) -> NestedTransaction: 

1080 trans = self._nested_transaction 

1081 if trans is None: 

1082 raise exc.InvalidRequestError( 

1083 "connection is not in a nested transaction" 

1084 ) 

1085 return trans 

1086 

def get_transaction(self) -> Optional[RootTransaction]:
    """Return the current root transaction in progress, if any.

    :return: the :class:`_engine.RootTransaction` currently associated
     with this connection, or ``None`` if there is none.

    .. versionadded:: 1.4

    """
    root: Optional[RootTransaction] = self._transaction
    return root

1095 

def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the current nested transaction in progress, if any.

    :return: the :class:`_engine.NestedTransaction` currently associated
     with this connection, or ``None`` if there is none.

    .. versionadded:: 1.4

    """
    nested: Optional[NestedTransaction] = self._nested_transaction
    return nested

1103 

1104 def _begin_impl(self, transaction: RootTransaction) -> None: 

1105 if self._echo: 

1106 if self._is_autocommit_isolation(): 

1107 self._log_info( 

1108 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1109 "autocommit mode)" 

1110 ) 

1111 else: 

1112 self._log_info("BEGIN (implicit)") 

1113 

1114 self.__in_begin = True 

1115 

1116 if self._has_events or self.engine._has_events: 

1117 self.dispatch.begin(self) 

1118 

1119 try: 

1120 self.engine.dialect.do_begin(self.connection) 

1121 except BaseException as e: 

1122 self._handle_dbapi_exception(e, None, None, None, None) 

1123 finally: 

1124 self.__in_begin = False 

1125 

1126 def _rollback_impl(self) -> None: 

1127 if self._has_events or self.engine._has_events: 

1128 self.dispatch.rollback(self) 

1129 

1130 if self._still_open_and_dbapi_connection_is_valid: 

1131 if self._echo: 

1132 if self._is_autocommit_isolation(): 

1133 if self.dialect.skip_autocommit_rollback: 

1134 self._log_info( 

1135 "ROLLBACK will be skipped by " 

1136 "skip_autocommit_rollback" 

1137 ) 

1138 else: 

1139 self._log_info( 

1140 "ROLLBACK using DBAPI connection.rollback(); " 

1141 "set skip_autocommit_rollback to prevent fully" 

1142 ) 

1143 else: 

1144 self._log_info("ROLLBACK") 

1145 try: 

1146 self.engine.dialect.do_rollback(self.connection) 

1147 except BaseException as e: 

1148 self._handle_dbapi_exception(e, None, None, None, None) 

1149 

1150 def _commit_impl(self) -> None: 

1151 if self._has_events or self.engine._has_events: 

1152 self.dispatch.commit(self) 

1153 

1154 if self._echo: 

1155 if self._is_autocommit_isolation(): 

1156 self._log_info( 

1157 "COMMIT using DBAPI connection.commit(), " 

1158 "has no effect due to autocommit mode" 

1159 ) 

1160 else: 

1161 self._log_info("COMMIT") 

1162 try: 

1163 self.engine.dialect.do_commit(self.connection) 

1164 except BaseException as e: 

1165 self._handle_dbapi_exception(e, None, None, None, None) 

1166 

1167 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1168 if self._has_events or self.engine._has_events: 

1169 self.dispatch.savepoint(self, name) 

1170 

1171 if name is None: 

1172 self.__savepoint_seq += 1 

1173 name = "sa_savepoint_%s" % self.__savepoint_seq 

1174 self.engine.dialect.do_savepoint(self, name) 

1175 return name 

1176 

1177 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1178 if self._has_events or self.engine._has_events: 

1179 self.dispatch.rollback_savepoint(self, name, None) 

1180 

1181 if self._still_open_and_dbapi_connection_is_valid: 

1182 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1183 

1184 def _release_savepoint_impl(self, name: str) -> None: 

1185 if self._has_events or self.engine._has_events: 

1186 self.dispatch.release_savepoint(self, name, None) 

1187 

1188 self.engine.dialect.do_release_savepoint(self, name) 

1189 

def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
    """Run the "begin" step for a two-phase transaction.

    Logs the implicit BEGIN TWOPHASE when echo is enabled, dispatches the
    ``begin_twophase`` event if listeners are present, then calls the
    dialect's ``do_begin_twophase()`` with the transaction's ``xid``.
    Errors raised by the dialect are routed through
    ``_handle_dbapi_exception()``.

    :param transaction: the two-phase transaction being begun; only its
     ``xid`` attribute is read here.
    """
    if self._echo:
        self._log_info("BEGIN TWOPHASE (implicit)")
    if self._has_events or self.engine._has_events:
        self.dispatch.begin_twophase(self, transaction.xid)

    # the flag is set only around the dialect call (after the event has
    # been dispatched); _autobegin() is a no-op while it is set
    self.__in_begin = True
    try:
        self.engine.dialect.do_begin_twophase(self, transaction.xid)
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)
    finally:
        self.__in_begin = False

1203 

def _prepare_twophase_impl(self, xid: Any) -> None:
    """Run the "prepare" step of a two-phase transaction.

    Dispatches the ``prepare_twophase`` event if listeners are present,
    then calls the dialect's ``do_prepare_twophase()``; errors raised by
    the dialect are routed through ``_handle_dbapi_exception()``.

    :param xid: the two-phase transaction id handed to the event and to
     the dialect.
    """
    notify = self._has_events or self.engine._has_events
    if notify:
        self.dispatch.prepare_twophase(self, xid)

    # the current transaction is expected to be the two-phase one
    assert isinstance(self._transaction, TwoPhaseTransaction)

    try:
        self.engine.dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1213 

1214 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1215 if self._has_events or self.engine._has_events: 

1216 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1217 

1218 if self._still_open_and_dbapi_connection_is_valid: 

1219 assert isinstance(self._transaction, TwoPhaseTransaction) 

1220 try: 

1221 self.engine.dialect.do_rollback_twophase( 

1222 self, xid, is_prepared 

1223 ) 

1224 except BaseException as e: 

1225 self._handle_dbapi_exception(e, None, None, None, None) 

1226 

def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Run the "commit" step of a two-phase transaction.

    Dispatches the ``commit_twophase`` event if listeners are present,
    then calls the dialect's ``do_commit_twophase()``; errors raised by
    the dialect are routed through ``_handle_dbapi_exception()``.

    :param xid: the two-phase transaction id.
    :param is_prepared: forwarded unchanged to the event and the dialect.
    """
    notify = self._has_events or self.engine._has_events
    if notify:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    # the current transaction is expected to be the two-phase one
    assert isinstance(self._transaction, TwoPhaseTransaction)

    try:
        self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1236 

def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    This results in a release of the underlying database
    resources, that is, the DBAPI connection referenced
    internally. The DBAPI connection is typically restored
    back to the connection-holding :class:`_pool.Pool` referenced
    by the :class:`_engine.Engine` that produced this
    :class:`_engine.Connection`. Any transactional state present on
    the DBAPI connection is also unconditionally released via
    the DBAPI connection's ``rollback()`` method, regardless
    of any :class:`.Transaction` object that may be
    outstanding with regards to this :class:`_engine.Connection`.

    This has the effect of also calling :meth:`_engine.Connection.rollback`
    if any transaction is in place.

    After :meth:`_engine.Connection.close` is called, the
    :class:`_engine.Connection` is permanently in a closed state,
    and will allow no further operations.

    """

    # close any outstanding transaction object first, and remember that we
    # did so in order to choose how the pooled connection is closed below
    if self._transaction:
        self._transaction.close()
        skip_reset = True
    else:
        skip_reset = False

    if self._dbapi_connection is not None:
        conn = self._dbapi_connection

        # as we just closed the transaction, close the connection
        # pool connection without doing an additional reset
        if skip_reset:
            cast("_ConnectionFairy", conn)._close_special(
                transaction_reset=True
            )
        else:
            conn.close()

        # There is a slight chance that conn.close() may have
        # triggered an invalidation here in which case
        # _dbapi_connection would already be None, however usually
        # it will be non-None here and in a "closed" state.
        self._dbapi_connection = None
    # NOTE(review): presumably this flag is what stops a closed connection
    # from being revalidated/reconnected later - confirm where it is read
    self.__can_reconnect = False

1284 

# special case to handle mypy issue:
# https://github.com/python/mypy/issues/20651
@overload
def scalar(
    self,
    statement: TypedReturnsRows[Never],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[Any]: ...

# typed statement: the scalar carries the statement's single column type
@overload
def scalar(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

# any other executable: the result type is not known statically
@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Executes a SQL statement construct and returns a scalar object.

    This method is shorthand for invoking the
    :meth:`_engine.Result.scalar` method after invoking the
    :meth:`_engine.Connection.execute` method.  Parameters are equivalent.

    :param statement: the statement to be executed.
    :param parameters: optional parameters for a single execution.
    :param execution_options: optional dictionary of execution options
     associated with this statement execution.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises ObjectNotExecutableError: if ``statement`` does not provide
     the ``_execute_on_scalar`` hook.

    """
    distilled_parameters = _distill_params_20(parameters)
    # the try block covers only the attribute lookup, so an AttributeError
    # raised while the statement actually executes is not misreported
    try:
        meth = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err
    else:
        return meth(
            self,
            distilled_parameters,
            # fall back to NO_OPTIONS when no (or empty) options are given
            execution_options or NO_OPTIONS,
        )

1342 

@overload
def scalars(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Executes and returns a scalar result set, which yields scalar values
    from the first column of each row.

    This method is equivalent to calling :meth:`_engine.Connection.execute`
    to receive a :class:`_result.Result` object, then invoking the
    :meth:`_result.Result.scalars` method to produce a
    :class:`_result.ScalarResult` instance.

    :param statement: the statement to be executed.
    :param parameters: optional parameters, passed through to
     :meth:`_engine.Connection.execute`.
    :param execution_options: optional execution options, passed through
     to :meth:`_engine.Connection.execute`.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """
    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()

1385 

# typed statement: the result rows carry the statement's column types
@overload
def execute(
    self,
    statement: TypedReturnsRows[Unpack[_Ts]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[_Ts]]: ...

# any other executable: the row type is not known statically
@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]: ...

def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Executes a SQL statement construct and returns a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     This may be either a dictionary of parameter names to values,
     or a mutable sequence (e.g. a list) of dictionaries.  When a
     list of dictionaries is passed, the underlying statement execution
     will make use of the DBAPI ``cursor.executemany()`` method.
     When a single dictionary is passed, the DBAPI ``cursor.execute()``
     method will be used.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.Result` object.

    :raises ObjectNotExecutableError: if ``statement`` does not provide
     the ``_execute_on_connection`` hook.

    """
    distilled_parameters = _distill_params_20(parameters)
    # the try block covers only the attribute lookup, so an AttributeError
    # raised while the statement actually executes is not misreported
    try:
        meth = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err
    else:
        return meth(
            self,
            distilled_parameters,
            # fall back to NO_OPTIONS when no (or empty) options are given
            execution_options or NO_OPTIONS,
        )

1453 

1454 def _execute_function( 

1455 self, 

1456 func: FunctionElement[Any], 

1457 distilled_parameters: _CoreMultiExecuteParams, 

1458 execution_options: CoreExecuteOptionsParameter, 

1459 ) -> CursorResult[Unpack[TupleAny]]: 

1460 """Execute a sql.FunctionElement object.""" 

1461 

1462 return self._execute_clauseelement( 

1463 func.select(), distilled_parameters, execution_options 

1464 ) 

1465 

def _execute_default(
    self,
    default: DefaultGenerator,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> Any:
    """Execute a schema.ColumnDefault object.

    The connection-level execution options are merged with the given
    ones, ``before_execute`` listeners (if any) are invoked, an execution
    context is built with the dialect's ``_init_default`` constructor and
    the default is evaluated through that context.  ``after_execute``
    listeners (if any) receive the value that is returned.
    """

    exec_opts = self._execution_options.merge_with(execution_options)

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreAnyExecuteParams]

    # note for event handlers, the "distilled parameters" which is always
    # a list of dicts is broken out into separate "multiparams" and
    # "params" collections, which allows the handler to distinguish
    # between an executemany and execute style set of parameters.
    if self._has_events or self.engine._has_events:
        (
            default,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, exec_opts
        )
    else:
        event_multiparams = event_params = None

    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection is currently held; obtain one
            conn = self._revalidate_connection()

        dialect = self.dialect
        ctx = dialect.execution_ctx_cls._init_default(
            dialect, self, conn, exec_opts
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate as-is rather than going through the
        # DBAPI exception handling below
        raise
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

    ret = ctx._exec_default(None, default, None)

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )

    return ret

1522 

def _execute_ddl(
    self,
    ddl: ExecutableDDLElement,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a schema.DDL object.

    The DDL element's own execution options are merged with the
    connection-level and per-call options, ``before_execute`` listeners
    (if any) are invoked, the element is compiled for this connection's
    dialect and then run through ``_execute_context()``.
    ``after_execute`` listeners (if any) receive the result that is
    returned.
    """

    exec_opts = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreSingleExecuteParams]

    # listeners may replace both the element and the parameters
    if self._has_events or self.engine._has_events:
        (
            ddl,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            ddl, distilled_parameters, exec_opts
        )
    else:
        event_multiparams = event_params = None

    schema_translate_map = exec_opts.get("schema_translate_map", None)

    dialect = self.dialect

    compiled = ddl.compile(
        dialect=dialect, schema_translate_map=schema_translate_map
    )
    # no parameters are passed along (None) for DDL execution
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_ddl,
        compiled,
        None,
        exec_opts,
        compiled,
    )
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            ddl,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1575 

1576 def _invoke_before_exec_event( 

1577 self, 

1578 elem: Any, 

1579 distilled_params: _CoreMultiExecuteParams, 

1580 execution_options: _ExecuteOptions, 

1581 ) -> Tuple[ 

1582 Any, 

1583 _CoreMultiExecuteParams, 

1584 _CoreMultiExecuteParams, 

1585 _CoreSingleExecuteParams, 

1586 ]: 

1587 event_multiparams: _CoreMultiExecuteParams 

1588 event_params: _CoreSingleExecuteParams 

1589 

1590 if len(distilled_params) == 1: 

1591 event_multiparams, event_params = [], distilled_params[0] 

1592 else: 

1593 event_multiparams, event_params = distilled_params, {} 

1594 

1595 for fn in self.dispatch.before_execute: 

1596 elem, event_multiparams, event_params = fn( 

1597 self, 

1598 elem, 

1599 event_multiparams, 

1600 event_params, 

1601 execution_options, 

1602 ) 

1603 

1604 if event_multiparams: 

1605 distilled_params = list(event_multiparams) 

1606 if event_params: 

1607 raise exc.InvalidRequestError( 

1608 "Event handler can't return non-empty multiparams " 

1609 "and params at the same time" 

1610 ) 

1611 elif event_params: 

1612 distilled_params = [event_params] 

1613 else: 

1614 distilled_params = [] 

1615 

1616 return elem, distilled_params, event_multiparams, event_params 

1617 

def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.ClauseElement object.

    The element's own execution options are merged with the
    connection-level and per-call options, ``before_execute`` listeners
    (if any) are invoked, the element is compiled through
    ``_compile_w_cache()`` and the compiled form is run through
    ``_execute_context()``.  ``after_execute`` listeners (if any) receive
    the result that is returned.
    """

    exec_opts = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # evaluated once so that the before/after dispatch decisions agree;
    # event_multiparams / event_params only exist when this is true
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, exec_opts
        )

    if distilled_parameters:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        keys = sorted(distilled_parameters[0])
        # more than one parameter set means an executemany-style call
        for_executemany = len(distilled_parameters) > 1
    else:
        keys = []
        for_executemany = False

    dialect = self.dialect

    schema_translate_map = exec_opts.get("schema_translate_map", None)

    # a per-execution "compiled_cache" option takes precedence over the
    # engine-level cache
    compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, param_dict, cache_hit = (
        elem._compile_w_cache(
            dialect=dialect,
            compiled_cache=compiled_cache,
            column_keys=keys,
            for_executemany=for_executemany,
            schema_translate_map=schema_translate_map,
            linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
        )
    )
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        exec_opts,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
        param_dict=param_dict,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1691 

def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Run a plain SQL string on the DBAPI cursor directly, skipping
    every SQL compilation step.

    Use this to hand any string to the ``cursor.execute()`` method of
    the DBAPI in use.

    :param statement: The statement str to be executed.  Bound parameters
     must use the underlying DBAPI's paramstyle, such as "qmark",
     "pyformat", "format", etc.

    :param parameters: bound parameter values to be used in the
     execution.  The format is one of: a dictionary of named parameters,
     a tuple of positional parameters, or a list containing either
     dictionaries or tuples for multiple-execute support.

    :param execution_options: optional per-call execution options; they
     are merged with the options already set on this connection.

    :return: a :class:`_engine.CursorResult`.

    E.g. multiple dictionaries::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
        )

    Single dictionary::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            dict(id=1, value="v1"),
        )

    Single tuple::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
        )

    .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
        not participate in the
        :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.   To
        intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
        :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute`.

    .. seealso::

        :pep:`249`

    """
    # normalize whatever the caller passed into the DBAPI-level form
    raw_params = _distill_raw_params(parameters)

    # per-call options layered over the connection's own options
    merged_opts = self._execution_options.merge_with(execution_options)

    active_dialect = self.dialect

    # the statement is passed twice: once for error reporting by
    # _execute_context() itself, once as an argument to the constructor
    return self._execute_context(
        active_dialect,
        active_dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        merged_opts,
        statement,
        raw_params,
    )

1766 

def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Unpack[TupleAny]]:
    """Build an :class:`.ExecutionContext` via ``constructor`` and run it,
    returning a :class:`_engine.CursorResult`.

    ``statement`` and ``parameters`` are used here for error reporting
    and are forwarded to the single-context execution path; ``*args`` and
    ``**kw`` are passed through to ``constructor`` unchanged.

    """

    # "yield_per" implies streaming with a row buffer of the same size
    if execution_options:
        yield_per = execution_options.get("yield_per", None)
        if yield_per:
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yield_per}
            )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        context = constructor(
            dialect, self, dbapi_conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate as-is, without going through the
        # DBAPI exception handler
        raise
    except BaseException as err:
        self._handle_dbapi_exception(
            err, str(statement), parameters, None, None
        )

    # an inactive root or nested transaction is reported via
    # _invalid_transaction(); otherwise, check the active transaction
    # context manager if there is one
    root_trans = self._transaction
    if root_trans and not root_trans.is_active:
        self._invalid_transaction()
    elif (
        self._nested_transaction
        and not self._nested_transaction.is_active
    ):
        self._invalid_transaction()
    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)

    return self._exec_single_context(
        dialect, context, statement, parameters
    )

1825 

def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for a single DBAPI
    cursor.execute() or cursor.executemany() call.

    :param dialect: dialect used for the ``bind_typing`` check and for
     ``do_set_input_sizes()``.
    :param context: the execution context; its ``cursor``, ``statement``
     and ``parameters`` are what is actually executed.
    :param statement: statement as received by the caller; only used
     (stringified) when reporting a failure of ``do_set_input_sizes()``.
    :param parameters: parameters as received by the caller; only used
     for that same error report, after which the name is rebound to
     ``context.parameters``.
    :return: the object produced by ``context._setup_result_proxy()``.

    Any exception raised is routed through
    ``_handle_dbapi_exception()``, which is declared ``NoReturn``.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()

        if generic_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, generic_setinputsizes, context
                )
            except BaseException as e:
                # no cursor is passed to the handler on this path
                self._handle_dbapi_exception(
                    e, str(statement), parameters, None, context
                )

    # from here on, work with what the context prepared rather than
    # with the caller-supplied statement / parameters
    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters: Optional[_AnyExecuteParams]

    # a non-executemany call uses only the first parameter set
    if not context.executemany:
        effective_parameters = parameters[0]
    else:
        effective_parameters = parameters

    if self._has_events or self.engine._has_events:
        # each handler returns the (statement, parameters) pair to use,
        # so handlers can rewrite either one; results chain from one
        # handler to the next
        for fn in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = fn(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if not self.engine.hide_parameters:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )
        else:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )

    # set to True when a dialect-level event handler reports (by
    # returning a true value) that it performed the execution itself;
    # the dialect's own do_execute*() method is then skipped
    evt_handled: bool = False
    try:
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_executemany:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            # empty parameters and the context asked for the
            # "no parameters" form of execute
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute_no_params:
                    if fn(cursor, str_statement, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as e:
        # covers the execute call, the after-execute hooks, post_exec()
        # and result setup alike
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

1967 

def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    :param dialect: dialect that delivers the batches (via
     ``_deliver_insertmanyvalues_batches()``) and executes each one.
    :param context: the execution context supplying the cursor,
     statement, parameters and execution options.
    :return: the object produced by ``context._setup_result_proxy()``.

    Any exception raised is routed through
    ``_handle_dbapi_exception()``; failures inside the batch loop are
    reported with ``is_sub_exec=True``.

    """

    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # computed once up front; reused for every batch in the loop below
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # "stats" is only bound when echo is on; it is only read inside the
    # "if self._echo" section of the loop
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        # per-batch statement / parameters; event handlers below may
        # replace either one
        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            # " <batchnum>/<total> (ordered|unordered[; batch not
            # supported])"
            imv_stats = f""" {imv_batch.batchnum}/{
                imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch appends to the cache-stats prefix built
            # before the loop; later batches use a shorter label
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # a handler returning a true value has done the execution;
            # the for/else runs dialect.do_execute() only when no
            # handler did
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )
        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                sub_stmt,
                sub_params,
                context,
                context.executemany,
            )

        # the total is accumulated from the batch sizes, and only when
        # the "preserve_rowcount" execution option is set
        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        # reported against the overall statement, not a single batch
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

2137 

2138 def _cursor_execute( 

2139 self, 

2140 cursor: DBAPICursor, 

2141 statement: str, 

2142 parameters: _DBAPISingleExecuteParams, 

2143 context: Optional[ExecutionContext] = None, 

2144 ) -> None: 

2145 """Execute a statement + params on the given cursor. 

2146 

2147 Adds appropriate logging and exception handling. 

2148 

2149 This method is used by DefaultDialect for special-case 

2150 executions, such as for sequences and column defaults. 

2151 The path of statement execution in the majority of cases 

2152 terminates at _execute_context(). 

2153 

2154 """ 

2155 if self._has_events or self.engine._has_events: 

2156 for fn in self.dispatch.before_cursor_execute: 

2157 statement, parameters = fn( 

2158 self, cursor, statement, parameters, context, False 

2159 ) 

2160 

2161 if self._echo: 

2162 self._log_info(statement) 

2163 self._log_info("[raw sql] %r", parameters) 

2164 try: 

2165 for fn in ( 

2166 () 

2167 if not self.dialect._has_events 

2168 else self.dialect.dispatch.do_execute 

2169 ): 

2170 if fn(cursor, statement, parameters, context): 

2171 break 

2172 else: 

2173 self.dialect.do_execute(cursor, statement, parameters, context) 

2174 except BaseException as e: 

2175 self._handle_dbapi_exception( 

2176 e, statement, parameters, cursor, context 

2177 ) 

2178 

2179 if self._has_events or self.engine._has_events: 

2180 self.dispatch.after_cursor_execute( 

2181 self, cursor, statement, parameters, context, False 

2182 ) 

2183 

2184 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2185 """Close the given cursor, catching exceptions 

2186 and turning into log warnings. 

2187 

2188 """ 

2189 try: 

2190 cursor.close() 

2191 except Exception: 

2192 # log the error through the connection pool's logger. 

2193 self.engine.pool.logger.error( 

2194 "Error closing cursor", exc_info=True 

2195 ) 

2196 

# Class-level defaults for the state used by _handle_dbapi_exception().
# That method sets these names on the instance while it runs and removes
# the instance attributes again with ``del`` in its ``finally`` block
# (``_is_disconnect`` only when it ended up true), so that attribute
# lookup falls back to the values below.
_reentrant_error = False
_is_disconnect = False

2199 

def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised during execution and re-raise it.

    This method always raises.  What is raised is, in order of
    precedence: an exception returned or raised by a ``handle_error``
    event handler; a wrapped exception built by
    ``exc.DBAPIError.instance()``; or the exception currently being
    handled, unchanged.

    :param e: the exception that was caught.
    :param statement: statement string for the error report, if any.
    :param parameters: parameters for the error report, if any.
    :param cursor: cursor in use, if any; it is closed here unless the
     error is treated as a disconnect.
    :param context: execution context, if one was already created.
    :param is_sub_exec: when True, the wrapped error is never flagged as
     a multi-parameter ("ismulti") execution.

    On the way out, if the error was classified as a disconnect, the
    connection is invalidated (and the pool entry too, unless that was
    disabled).

    """
    # must be called while the exception is still being handled
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    if not self._is_disconnect:
        # a disconnect is either a DBAPI error that the dialect
        # recognizes as one, or an "exit" exception; in both cases only
        # while this connection is not closed
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    # exit exceptions do not invalidate the pool by default
    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    if self._reentrant_error:
        # this method is already running for this connection: wrap and
        # raise right away, skipping event handlers and cleanup
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        # replacement exception supplied by an event handler, if any
        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # a handler may have changed the disconnect classification
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # drop the instance attribute so that lookup falls back to the
        # class-level default again
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)

2344 

2345 @classmethod 

2346 def _handle_dbapi_exception_noconnection( 

2347 cls, 

2348 e: BaseException, 

2349 dialect: Dialect, 

2350 engine: Optional[Engine] = None, 

2351 is_disconnect: Optional[bool] = None, 

2352 invalidate_pool_on_disconnect: bool = True, 

2353 is_pre_ping: bool = False, 

2354 ) -> NoReturn: 

2355 exc_info = sys.exc_info() 

2356 

2357 if is_disconnect is None: 

2358 is_disconnect = isinstance( 

2359 e, dialect.loaded_dbapi.Error 

2360 ) and dialect.is_disconnect(e, None, None) 

2361 

2362 should_wrap = isinstance(e, dialect.loaded_dbapi.Error) 

2363 

2364 if should_wrap: 

2365 sqlalchemy_exception = exc.DBAPIError.instance( 

2366 None, 

2367 None, 

2368 cast(Exception, e), 

2369 dialect.loaded_dbapi.Error, 

2370 hide_parameters=( 

2371 engine.hide_parameters if engine is not None else False 

2372 ), 

2373 connection_invalidated=is_disconnect, 

2374 dialect=dialect, 

2375 ) 

2376 else: 

2377 sqlalchemy_exception = None 

2378 

2379 newraise = None 

2380 

2381 if dialect._has_events: 

2382 ctx = ExceptionContextImpl( 

2383 e, 

2384 sqlalchemy_exception, 

2385 engine, 

2386 dialect, 

2387 None, 

2388 None, 

2389 None, 

2390 None, 

2391 None, 

2392 is_disconnect, 

2393 invalidate_pool_on_disconnect, 

2394 is_pre_ping, 

2395 ) 

2396 for fn in dialect.dispatch.handle_error: 

2397 try: 

2398 # handler returns an exception; 

2399 # call next handler in a chain 

2400 per_fn = fn(ctx) 

2401 if per_fn is not None: 

2402 ctx.chained_exception = newraise = per_fn 

2403 except Exception as _raised: 

2404 # handler raises an exception - stop processing 

2405 newraise = _raised 

2406 break 

2407 

2408 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: 

2409 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect 

2410 

2411 if newraise: 

2412 raise newraise.with_traceback(exc_info[2]) from e 

2413 elif should_wrap: 

2414 assert sqlalchemy_exception is not None 

2415 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2416 else: 

2417 assert exc_info[1] is not None 

2418 raise exc_info[1].with_traceback(exc_info[2]) 

2419 

2420 def _run_ddl_visitor( 

2421 self, 

2422 visitorcallable: Type[InvokeDDLBase], 

2423 element: SchemaVisitable, 

2424 **kwargs: Any, 

2425 ) -> None: 

2426 """run a DDL visitor. 

2427 

2428 This method is only here so that the MockConnection can change the 

2429 options given to the visitor so that "checkfirst" is skipped. 

2430 

2431 """ 

2432 visitorcallable( 

2433 dialect=self.dialect, connection=self, **kwargs 

2434 ).traverse_single(element) 

2435 

2436 

class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    Instances are plain value holders handed to ``handle_error`` event
    handlers.  Every name listed in ``__slots__`` is assigned in
    ``__init__`` so that handlers can read any of them without hitting
    an unset slot.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the details of the error being handled.

        :param exception: the exception that was caught; stored as
         ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, or ``None``
         when the error is not being wrapped.
        :param engine: engine in use, if any.
        :param dialect: dialect in use.
        :param connection: connection in use, if any.
        :param cursor: DBAPI cursor in use, if any.
        :param statement: statement string, if any.
        :param parameters: statement parameters, if any.
        :param context: execution context, if any; stored as
         ``execution_context``.
        :param is_disconnect: whether the error is classified as a
         disconnect.
        :param invalidate_pool_on_disconnect: whether the pool should be
         invalidated when the error is a disconnect.
        :param is_pre_ping: whether the error occurred during a pre-ping.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously the ``cursor`` argument was dropped, leaving the slot
        # unset so that reading ``ctx.cursor`` raised AttributeError
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # starts out empty; callers assign it when a handler returns a
        # replacement exception.  Initialized here so that the first
        # handler in a chain can read it safely.
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping

2482 

2483 

class Transaction(TransactionalContext):
    """Represent a database transaction in progress.

    A :class:`.Transaction` is obtained by
    calling the :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled through the
    :meth:`.rollback` and :meth:`.commit` methods.  The object is
    also a context manager, so the Python ``with`` statement can be
    combined with the :meth:`_engine.Connection.begin` method::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # this base class cannot be instantiated; subclasses provide
        # the constructor
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True if this transaction is totally deactivated from the connection
        and therefore can no longer affect its state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        # subclass hook invoked by close()
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        # subclass hook invoked by rollback()
        raise NotImplementedError()

    def _do_commit(self) -> None:
        # subclass hook invoked by commit()
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        # valid means: active, and the connection is not invalidated
        active = self.is_active
        if not active:
            return active
        return not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this transaction is the base transaction in a begin/commit
        nesting, the transaction will rollback().  Otherwise, the
        method returns.

        Use this to cancel a Transaction without affecting the scope of
        an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the type of transaction in use:

        * For a simple database transaction (e.g. :class:`.RootTransaction`),
          it corresponds to a ROLLBACK.

        * For a :class:`.NestedTransaction`, it corresponds to a
          "ROLLBACK TO SAVEPOINT" operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
          phase transactions may be used.


        """
        try:
            self._do_rollback()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the type of transaction in use:

        * For a simple database transaction (e.g. :class:`.RootTransaction`),
          it corresponds to a COMMIT.

        * For a :class:`.NestedTransaction`, it corresponds to a
          "RELEASE SAVEPOINT" operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
          phase transactions may be used.

        """
        try:
            self._do_commit()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def _get_subject(self) -> Connection:
        # the connection this transaction belongs to
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        # NOTE(review): this is the negation of
        # _deactivated_from_connection - confirm against the caller in
        # TransactionalContext that this is the intended sense
        return not self._deactivated_from_connection

    def _rollback_can_be_called(self) -> bool:
        # for RootTransaction / NestedTransaction, it's safe to call
        # rollback() even if the transaction is deactive and no warnings
        # will be emitted.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True

2626 

2627 

class RootTransaction(Transaction):
    """Represent the "root" transaction on a :class:`_engine.Connection`.

    This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring
    for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction`
    is created by calling upon the :meth:`_engine.Connection.begin` method, and
    remains associated with the :class:`_engine.Connection` throughout its
    active span. The current :class:`_engine.RootTransaction` in use is
    accessible via the :meth:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also employs
    "autobegin" behavior that will create a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI connection.
    The scope of the :class:`_engine.RootTransaction` in 2.0 style
    use can be controlled using the :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        """Begin a root transaction on ``connection`` and register it
        there.

        The connection must not already have a root transaction.

        """
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # the transaction is registered on the connection only after the
        # begin call has returned
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        """Mark this transaction inactive; warn if it is already inactive
        and no longer the connection's transaction.

        """
        if self.is_active:
            assert self.connection._transaction is self
            self.is_active = False

        elif self.connection._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self) -> bool:
        """True once the connection no longer refers to this transaction."""
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        """Roll back if still active, cancel nested transactions, and
        detach from the connection.

        :param try_deactivate: when True, the deactivation step runs even
         if this transaction is already inactive (which may emit the
         "already deassociated" warning).

        """
        try:
            if self.is_active:
                self._connection_rollback_impl()

            if self.connection._nested_transaction:
                self.connection._nested_transaction._cancel()
        finally:
            # runs even when the rollback or the cancel raised, so the
            # connection is never left pointing at this transaction
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        """Commit if active; otherwise report the invalid state.

        :raises sqlalchemy.exc.InvalidRequestError: if this transaction
         is inactive and is no longer the connection's transaction.

        """
        if self.is_active:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # whether or not commit succeeds, cancel any
                # nested transactions, make this transaction "inactive"
                # and remove it as a reset agent
                if self.connection._nested_transaction:
                    self.connection._nested_transaction._cancel()

                self._deactivate_from_connection()

            # ...however only remove as the connection's current transaction
            # if commit succeeded.  otherwise it stays on so that a rollback
            # needs to occur.
            self.connection._transaction = None
        else:
            # inactive but still registered on the connection: defer to
            # the connection's handling of that state
            if self.connection._transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError("This transaction is inactive")

        assert not self.is_active
        assert self.connection._transaction is not self

2734 

2735 

class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    The :class:`.NestedTransaction` object is created by calling the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    When using :class:`.NestedTransaction`, the semantics of "begin" /
    "commit" / "rollback" are as follows:

    * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command,
      where the savepoint is given an explicit name that is part of the
      state of this object.

    * The :meth:`.NestedTransaction.commit` method corresponds to a
      "RELEASE SAVEPOINT" operation, using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    * The :meth:`.NestedTransaction.rollback` method corresponds to a
      "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    The rationale for mimicking the semantics of an outer transaction in
    terms of savepoints is so that code may deal with a "savepoint"
    transaction and an "outer" transaction in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    # savepoint identifier returned by Connection._savepoint_impl()
    _savepoint: str

    def __init__(self, connection: Connection):
        """Establish a savepoint on ``connection`` and register this
        object as the connection's current nested transaction.

        The nested transaction that was current beforehand (possibly
        ``None``) is remembered in ``_previous_nested`` so that it can be
        restored when this one is deactivated.

        :param connection: the :class:`_engine.Connection`; it must
         already have a transaction in progress.

        """
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)

        self.connection = connection
        self._savepoint = connection._savepoint_impl()
        self.is_active = True

        # push ourselves on top of whatever nested transaction was current
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        """Restore the previous nested transaction on the connection.

        :param warn: when this object is not the connection's current
         nested transaction, emit a warning if true; otherwise do nothing.

        """
        conn = self.connection
        if conn._nested_transaction is not self:
            if warn:
                util.warn(
                    "nested transaction already deassociated from connection"
                )
            return

        conn._nested_transaction = self._previous_nested

    @property
    def _deactivated_from_connection(self) -> bool:
        """True if this object is not the connection's current nested
        transaction."""
        current = self.connection._nested_transaction
        return current is not self

    def _cancel(self) -> None:
        """Mark this savepoint, and every one stacked beneath it, inactive.

        Called by RootTransaction when the outer transaction is
        committed, rolled back, or closed, to cancel all savepoints
        without any action being taken.

        """
        self.is_active = False
        self._deactivate_from_connection()

        # walk down the chain of savepoints that preceded this one
        previous = self._previous_nested
        if previous:
            previous._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        """Roll back to the savepoint if still possible, then go inactive.

        The rollback is emitted only when this object is active and the
        connection's transaction exists and is itself active.  This
        object is marked inactive whether or not the rollback raises.

        :param deactivate_from_connection: if true, also de-associate
         this object from the connection.
        :param warn_already_deactive: passed as ``warn`` to
         ``_deactivate_from_connection()``.

        """
        conn = self.connection
        try:
            if self.is_active:
                outer = conn._transaction
                if outer and outer.is_active:
                    conn._rollback_to_savepoint_impl(self._savepoint)
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        assert (
            not deactivate_from_connection
            or conn._nested_transaction is not self
        )

    def _do_close(self) -> None:
        """Close: roll back and de-associate, without the
        already-deassociated warning."""
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=False
        )

    def _do_rollback(self) -> None:
        """Roll back: roll back and de-associate, warning if this object
        was already de-associated."""
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=True
        )

    def _do_commit(self) -> None:
        """Release the savepoint and de-associate from the connection.

        For an inactive nested transaction that is still the connection's
        current one, handling is delegated to
        ``connection._invalid_transaction()`` (defined outside this class;
        presumably raises).

        :raises sqlalchemy.exc.InvalidRequestError: if this nested
         transaction is inactive and is not the connection's current
         nested transaction.

        """
        conn = self.connection

        if self.is_active:
            try:
                conn._release_savepoint_impl(self._savepoint)
            finally:
                # nested trans becomes inactive on failed release
                # unconditionally.  this prevents it from trying to
                # emit SQL when it rolls back.
                self.is_active = False

            # but only de-associate from connection if it succeeded
            self._deactivate_from_connection()
        elif conn._nested_transaction is self:
            conn._invalid_transaction()
        else:
            raise exc.InvalidRequestError(
                "This nested transaction is inactive"
            )

2848 

2849 

class TwoPhaseTransaction(RootTransaction):
    """Represent a two-phase transaction.

    A new :class:`.TwoPhaseTransaction` object may be procured
    using the :meth:`_engine.Connection.begin_twophase` method.

    The interface is the same as that of :class:`.Transaction`
    with the addition of the :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    # transaction identifier handed to the connection's twophase hooks
    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        """Record the ``xid`` and begin the transaction on ``connection``.

        The two-phase state is assigned before the superclass constructor
        runs.

        """
        self._is_prepared = False
        self.xid = xid
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        After a PREPARE, the transaction can be committed.

        :raises sqlalchemy.exc.InvalidRequestError: if the transaction
         is not active.

        """
        if not self.is_active:
            raise exc.InvalidRequestError("This transaction is inactive")

        conn = self.connection
        conn._prepare_twophase_impl(self.xid)
        # flag is only set once the prepare call returned without raising
        self._is_prepared = True

    def _connection_begin_impl(self) -> None:
        """Begin via the connection's two-phase hook, passing this
        transaction object."""
        conn = self.connection
        conn._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        """Roll back via the connection's two-phase hook, passing the xid
        and whether :meth:`prepare` has completed."""
        conn = self.connection
        conn._rollback_twophase_impl(self.xid, self._is_prepared)

    def _connection_commit_impl(self) -> None:
        """Commit via the connection's two-phase hook, passing the xid
        and whether :meth:`prepare` has completed."""
        conn = self.connection
        conn._commit_twophase_impl(self.xid, self._is_prepared)

2889 

2890 

class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    # built in __init__; ``None`` when ``query_cache_size`` is 0
    _compiled_cache: Optional[CompiledCacheType]

    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    # class instantiated by connect()
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    # class instantiated by execution_options(); assigned at module level
    # once OptionEngine has been defined
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Construct a new :class:`_engine.Engine`.

        :param pool: connection pool used by :meth:`raw_connection` and
         replaced by :meth:`dispose`.
        :param dialect: the dialect in use.
        :param url: the URL; shown by ``repr()``.
        :param logging_name: assigned to ``self.logging_name`` only when
         truthy.
        :param echo: assigned to :attr:`echo` and passed to
         ``log.instance_logger()``.
        :param query_cache_size: capacity of the compiled cache; ``0``
         disables the cache (``_compiled_cache`` is set to ``None``).
        :param execution_options: if non-empty, applied through
         :meth:`update_execution_options`.
        :param hide_parameters: stored as ``self.hide_parameters``.

        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        """``size_alert`` callback given to the compiled cache.

        Logs an informational message with the cache's current length and
        its capacity, only when info-level logging is enabled.

        """
        if self._should_log_info():
            self.logger.info(
                "Compiled cache size pruning from %d items to %d.  "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self) -> Engine:
        """Returns this :class:`.Engine`.

        Used for legacy schemes that accept :class:`.Connection` /
        :class:`.Engine` objects within the same variable.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Clear the compiled cache associated with this
        :class:`_engine.Engine`.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter.

        .. versionadded:: 1.4

        """
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections.  The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        # event listeners see the new options first; then the options are
        # merged into this engine's defaults, then the dialect is notified
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the
          same instance.  The :meth:`_engine.Engine.dispose`
          method will replace
          the connection pool instance for the parent engine as well
          as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine`
          inherits the events of the parent, and new events can be associated
          with the new :class:`_engine.Engine` individually.
        * The logging configuration and logging_name is copied from the parent
          :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that affect some execution-level behavior for each
        engine.    One such example is breaking into separate "reader" and
        "writer" :class:`_engine.Engine` instances, where one
        :class:`_engine.Engine`
        has a lower :term:`isolation level` setting configured or is even
        transaction-disabled using "autocommit".  An example of this
        configuration is at :ref:`dbapi_autocommit_multiple`.

        Another example is one that
        uses a custom option ``shard_id`` which is consumed by an event
        to change the current schema on a database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        The above recipe illustrates two :class:`_engine.Engine` objects that
        will each serve as factories for :class:`_engine.Connection` objects
        that have pre-established "shard_id" execution options present. A
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        then interprets this execution option to emit a MySQL ``use`` statement
        to switch databases before a statement execution, while at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.driver

    echo = log.echo_property()

    def __repr__(self) -> str:
        return f"Engine({self.url!r})"

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        Event listeners associated with the old pool via :class:`.PoolEvents`
        are **transferred to the new pool**; this is to support the pattern
        by which :class:`.PoolEvents` are set up in terms of the owning
        :class:`.Engine` without the need to refer to the :class:`.Pool`
        directly.

        :param close: if left at its default of ``True``, has the
          effect of fully closing all **currently checked in**
          database connections.  Connections that are still checked out
          will **not** be closed, however they will no longer be associated
          with this :class:`_engine.Engine`,
          so when they are closed individually, eventually the
          :class:`_pool.Pool` which they are associated with will
          be garbage collected and they will be closed out fully, if
          not already closed on checkin.

          If set to ``False``, the previous connection pool is de-referenced,
          and otherwise not touched in any way.

        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

            :meth:`.ConnectionEvents.engine_disposed`

        """
        if close:
            self.pool.dispose()
        # the replacement pool is installed regardless of ``close``
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Context manager yielding a connection to work with.

        When ``connection`` is ``None``, a new connection is obtained from
        :meth:`connect` and used as a context manager around the ``yield``.
        Otherwise the given connection is yielded as-is and nothing is
        done to it on exit.

        """
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            yield connection

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed.  If an error is raised, the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        with self.connect() as conn:
            with conn.begin():
                yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        """Run the DDL visitor on a connection obtained from :meth:`begin`.

        All arguments are forwarded unchanged to the connection's own
        ``_run_ddl_visitor()``.

        """
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` acts as a Python context manager, so
        the typical use of this method looks like::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Where above, after the block is completed, the connection is "closed"
        and its underlying DBAPI resources are returned to the connection pool.
        This also has the effect of rolling back any transaction that
        was explicitly begun or was begun via autobegin, and will
        emit the :meth:`_events.ConnectionEvents.rollback` event if one was
        started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed.   When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()

3283 

3284 

class OptionEngineMixin(log.Identified):
    """Mixin for an engine that proxies a parent :class:`_engine.Engine`.

    The URL, dialect, logging name, echo flag, compiled cache and
    ``hide_parameters`` setting are copied from the proxied engine at
    construction time.  The connection pool is not copied: the ``pool``
    property reads from and writes to the proxied engine.

    Concrete classes must supply ``update_execution_options()``.

    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Set up this engine as a proxy of ``proxied``.

        :param proxied: the parent :class:`_engine.Engine`.
        :param execution_options: options applied on top of the parent's
         execution options via ``update_execution_options()``.

        """
        parent = proxied
        self._proxied = parent

        # state copied from the parent engine
        self.url = parent.url
        self.dialect = parent.dialect
        self.logging_name = parent.logging_name
        self.echo = parent.echo
        self._compiled_cache = parent._compiled_cache
        self.hide_parameters = parent.hide_parameters
        log.instance_logger(self, echoflag=self.echo)

        # note: this will propagate events that are assigned to the parent
        # engine after this OptionEngine is created.   Since we share
        # the events of the parent we also disallow class-level events
        # to apply to the OptionEngine class directly.
        #
        # the other way this can work would be to transfer existing
        # events only, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that might be more appropriate however it would be a behavioral
        # change for logic that assigns events to the parent engine and
        # would like it to take effect for the already-created sub-engine.
        self.dispatch = self.dispatch._join(parent.dispatch)

        # start from the parent's options, then layer the given ones on top
        self._execution_options = parent._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Not implemented by the mixin; concrete classes override this."""
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The proxied engine's pool."""
            parent = self._proxied
            return parent.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assignment is forwarded to the proxied engine
            parent = self._proxied
            parent.pool = pool

        @property
        def _has_events(self) -> bool:
            """The proxied engine's flag if truthy, otherwise the value
            stored in this instance's ``__dict__`` (default ``False``)."""
            parent_flag = self._proxied._has_events
            if parent_flag:
                return parent_flag
            return self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # stored per-instance; the proxied engine is left untouched
            self.__dict__["_has_events"] = value

3348 

3349 

class OptionEngine(OptionEngineMixin, Engine):
    """An :class:`_engine.Engine` that proxies a parent engine, combining
    :class:`.OptionEngineMixin` with :class:`_engine.Engine`."""

    def update_execution_options(self, **opt: Any) -> None:
        # call Engine's implementation explicitly: OptionEngineMixin comes
        # first in the bases and its version raises NotImplementedError
        Engine.update_execution_options(self, **opt)

3353 

3354 

# assigned here rather than in the Engine class body because OptionEngine
# subclasses Engine and so can only be defined after it
Engine._option_cls = OptionEngine