Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1004 statements  

1# engine/base.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`. 

8 

9""" 

10from __future__ import annotations 

11 

12import contextlib 

13import sys 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Iterable 

19from typing import Iterator 

20from typing import List 

21from typing import Mapping 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Tuple 

26from typing import Type 

27from typing import TypeVar 

28from typing import Union 

29 

30from .interfaces import BindTyping 

31from .interfaces import ConnectionEventsTarget 

32from .interfaces import DBAPICursor 

33from .interfaces import ExceptionContext 

34from .interfaces import ExecuteStyle 

35from .interfaces import ExecutionContext 

36from .interfaces import IsolationLevel 

37from .util import _distill_params_20 

38from .util import _distill_raw_params 

39from .util import TransactionalContext 

40from .. import exc 

41from .. import inspection 

42from .. import log 

43from .. import util 

44from ..sql import compiler 

45from ..sql import util as sql_util 

46from ..util.typing import TupleAny 

47from ..util.typing import TypeVarTuple 

48from ..util.typing import Unpack 

49 

50if typing.TYPE_CHECKING: 

51 from . import CursorResult 

52 from . import ScalarResult 

53 from .interfaces import _AnyExecuteParams 

54 from .interfaces import _AnyMultiExecuteParams 

55 from .interfaces import _CoreAnyExecuteParams 

56 from .interfaces import _CoreMultiExecuteParams 

57 from .interfaces import _CoreSingleExecuteParams 

58 from .interfaces import _DBAPIAnyExecuteParams 

59 from .interfaces import _DBAPISingleExecuteParams 

60 from .interfaces import _ExecuteOptions 

61 from .interfaces import CompiledCacheType 

62 from .interfaces import CoreExecuteOptionsParameter 

63 from .interfaces import Dialect 

64 from .interfaces import SchemaTranslateMapType 

65 from .reflection import Inspector # noqa 

66 from .url import URL 

67 from ..event import dispatcher 

68 from ..log import _EchoFlagType 

69 from ..pool import _ConnectionFairy 

70 from ..pool import Pool 

71 from ..pool import PoolProxiedConnection 

72 from ..sql import Executable 

73 from ..sql._typing import _InfoType 

74 from ..sql.compiler import Compiled 

75 from ..sql.ddl import ExecutableDDLElement 

76 from ..sql.ddl import SchemaDropper 

77 from ..sql.ddl import SchemaGenerator 

78 from ..sql.functions import FunctionElement 

79 from ..sql.schema import DefaultGenerator 

80 from ..sql.schema import HasSchemaAttr 

81 from ..sql.schema import SchemaItem 

82 from ..sql.selectable import TypedReturnsRows 

83 

84 

85_T = TypeVar("_T", bound=Any) 

86_Ts = TypeVarTuple("_Ts") 

87_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT 

88NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT 

89 

90 

91class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

92 """Provides high-level functionality for a wrapped DB-API connection. 

93 

94 The :class:`_engine.Connection` object is procured by calling the 

95 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

96 object, and provides services for execution of SQL statements as well 

97 as transaction control. 

98 

99 The Connection object is **not** thread-safe. While a Connection can be 

100 shared among threads using properly synchronized access, it is still 

101 possible that the underlying DBAPI connection may not support shared 

102 access between threads. Check the DBAPI documentation for details. 

103 

104 The Connection object represents a single DBAPI connection checked out 

105 from the connection pool. In this state, the connection pool has no 

106 affect upon the connection, including its expiration or timeout state. 

107 For the connection pool to properly manage connections, connections 

108 should be returned to the connection pool (i.e. ``connection.close()``) 

109 whenever the connection is not in use. 

110 

111 .. index:: 

112 single: thread safety; Connection 

113 

114 """ 

115 

116 dialect: Dialect 

117 dispatch: dispatcher[ConnectionEventsTarget] 

118 

119 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

120 

121 # used by sqlalchemy.engine.util.TransactionalContext 

122 _trans_context_manager: Optional[TransactionalContext] = None 

123 

124 # legacy as of 2.0, should be eventually deprecated and 

125 # removed. was used in the "pre_ping" recipe that's been in the docs 

126 # a long time 

127 should_close_with_result = False 

128 

129 _dbapi_connection: Optional[PoolProxiedConnection] 

130 

131 _execution_options: _ExecuteOptions 

132 

133 _transaction: Optional[RootTransaction] 

134 _nested_transaction: Optional[NestedTransaction] 

135 

136 def __init__( 

137 self, 

138 engine: Engine, 

139 connection: Optional[PoolProxiedConnection] = None, 

140 _has_events: Optional[bool] = None, 

141 _allow_revalidate: bool = True, 

142 _allow_autobegin: bool = True, 

143 ): 

144 """Construct a new Connection.""" 

145 self.engine = engine 

146 self.dialect = dialect = engine.dialect 

147 

148 if connection is None: 

149 try: 

150 self._dbapi_connection = engine.raw_connection() 

151 except dialect.loaded_dbapi.Error as err: 

152 Connection._handle_dbapi_exception_noconnection( 

153 err, dialect, engine 

154 ) 

155 raise 

156 else: 

157 self._dbapi_connection = connection 

158 

159 self._transaction = self._nested_transaction = None 

160 self.__savepoint_seq = 0 

161 self.__in_begin = False 

162 

163 self.__can_reconnect = _allow_revalidate 

164 self._allow_autobegin = _allow_autobegin 

165 self._echo = self.engine._should_log_info() 

166 

167 if _has_events is None: 

168 # if _has_events is sent explicitly as False, 

169 # then don't join the dispatch of the engine; we don't 

170 # want to handle any of the engine's events in that case. 

171 self.dispatch = self.dispatch._join(engine.dispatch) 

172 self._has_events = _has_events or ( 

173 _has_events is None and engine._has_events 

174 ) 

175 

176 self._execution_options = engine._execution_options 

177 

178 if self._has_events or self.engine._has_events: 

179 self.dispatch.engine_connect(self) 

180 

181 # this can be assigned differently via 

182 # characteristics.LoggingTokenCharacteristic 

183 _message_formatter: Any = None 

184 

185 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None: 

186 fmt = self._message_formatter 

187 

188 if fmt: 

189 message = fmt(message) 

190 

191 if log.STACKLEVEL: 

192 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

193 

194 self.engine.logger.info(message, *arg, **kw) 

195 

196 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None: 

197 fmt = self._message_formatter 

198 

199 if fmt: 

200 message = fmt(message) 

201 

202 if log.STACKLEVEL: 

203 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

204 

205 self.engine.logger.debug(message, *arg, **kw) 

206 

207 @property 

208 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: 

209 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

210 self._execution_options.get("schema_translate_map", None) 

211 ) 

212 

213 return schema_translate_map 

214 

215 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]: 

216 """Return the schema name for the given schema item taking into 

217 account current schema translate map. 

218 

219 """ 

220 

221 name = obj.schema 

222 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

223 self._execution_options.get("schema_translate_map", None) 

224 ) 

225 

226 if ( 

227 schema_translate_map 

228 and name in schema_translate_map 

229 and obj._use_schema_map 

230 ): 

231 return schema_translate_map[name] 

232 else: 

233 return name 

234 

235 def __enter__(self) -> Connection: 

236 return self 

237 

238 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 

239 self.close() 

240 

241 @overload 

242 def execution_options( 

243 self, 

244 *, 

245 compiled_cache: Optional[CompiledCacheType] = ..., 

246 logging_token: str = ..., 

247 isolation_level: IsolationLevel = ..., 

248 no_parameters: bool = False, 

249 stream_results: bool = False, 

250 max_row_buffer: int = ..., 

251 yield_per: int = ..., 

252 insertmanyvalues_page_size: int = ..., 

253 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

254 preserve_rowcount: bool = False, 

255 driver_column_names: bool = False, 

256 **opt: Any, 

257 ) -> Connection: ... 

258 

259 @overload 

260 def execution_options(self, **opt: Any) -> Connection: ... 

261 

262 def execution_options(self, **opt: Any) -> Connection: 

263 r"""Set non-SQL options for the connection which take effect 

264 during execution. 

265 

266 This method modifies this :class:`_engine.Connection` **in-place**; 

267 the return value is the same :class:`_engine.Connection` object 

268 upon which the method is called. Note that this is in contrast 

269 to the behavior of the ``execution_options`` methods on other 

270 objects such as :meth:`_engine.Engine.execution_options` and 

271 :meth:`_sql.Executable.execution_options`. The rationale is that many 

272 such execution options necessarily modify the state of the base 

273 DBAPI connection in any case so there is no feasible means of 

274 keeping the effect of such an option localized to a "sub" connection. 

275 

276 .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options` 

277 method, in contrast to other objects with this method, modifies 

278 the connection in-place without creating copy of it. 

279 

280 As discussed elsewhere, the :meth:`_engine.Connection.execution_options` 

281 method accepts any arbitrary parameters including user defined names. 

282 All parameters given are consumable in a number of ways including 

283 by using the :meth:`_engine.Connection.get_execution_options` method. 

284 See the examples at :meth:`_sql.Executable.execution_options` 

285 and :meth:`_engine.Engine.execution_options`. 

286 

287 The keywords that are currently recognized by SQLAlchemy itself 

288 include all those listed under :meth:`.Executable.execution_options`, 

289 as well as others that are specific to :class:`_engine.Connection`. 

290 

291 :param compiled_cache: Available on: :class:`_engine.Connection`, 

292 :class:`_engine.Engine`. 

293 

294 A dictionary where :class:`.Compiled` objects 

295 will be cached when the :class:`_engine.Connection` 

296 compiles a clause 

297 expression into a :class:`.Compiled` object. This dictionary will 

298 supersede the statement cache that may be configured on the 

299 :class:`_engine.Engine` itself. If set to None, caching 

300 is disabled, even if the engine has a configured cache size. 

301 

302 Note that the ORM makes use of its own "compiled" caches for 

303 some operations, including flush operations. The caching 

304 used by the ORM internally supersedes a cache dictionary 

305 specified here. 

306 

307 :param logging_token: Available on: :class:`_engine.Connection`, 

308 :class:`_engine.Engine`, :class:`_sql.Executable`. 

309 

310 Adds the specified string token surrounded by brackets in log 

311 messages logged by the connection, i.e. the logging that's enabled 

312 either via the :paramref:`_sa.create_engine.echo` flag or via the 

313 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a 

314 per-connection or per-sub-engine token to be available which is 

315 useful for debugging concurrent connection scenarios. 

316 

317 .. versionadded:: 1.4.0b2 

318 

319 .. seealso:: 

320 

321 :ref:`dbengine_logging_tokens` - usage example 

322 

323 :paramref:`_sa.create_engine.logging_name` - adds a name to the 

324 name used by the Python logger object itself. 

325 

326 :param isolation_level: Available on: :class:`_engine.Connection`, 

327 :class:`_engine.Engine`. 

328 

329 Set the transaction isolation level for the lifespan of this 

330 :class:`_engine.Connection` object. 

331 Valid values include those string 

332 values accepted by the :paramref:`_sa.create_engine.isolation_level` 

333 parameter passed to :func:`_sa.create_engine`. These levels are 

334 semi-database specific; see individual dialect documentation for 

335 valid levels. 

336 

337 The isolation level option applies the isolation level by emitting 

338 statements on the DBAPI connection, and **necessarily affects the 

339 original Connection object overall**. The isolation level will remain 

340 at the given setting until explicitly changed, or when the DBAPI 

341 connection itself is :term:`released` to the connection pool, i.e. the 

342 :meth:`_engine.Connection.close` method is called, at which time an 

343 event handler will emit additional statements on the DBAPI connection 

344 in order to revert the isolation level change. 

345 

346 .. note:: The ``isolation_level`` execution option may only be 

347 established before the :meth:`_engine.Connection.begin` method is 

348 called, as well as before any SQL statements are emitted which 

349 would otherwise trigger "autobegin", or directly after a call to 

350 :meth:`_engine.Connection.commit` or 

351 :meth:`_engine.Connection.rollback`. A database cannot change the 

352 isolation level on a transaction in progress. 

353 

354 .. note:: The ``isolation_level`` execution option is implicitly 

355 reset if the :class:`_engine.Connection` is invalidated, e.g. via 

356 the :meth:`_engine.Connection.invalidate` method, or if a 

357 disconnection error occurs. The new connection produced after the 

358 invalidation will **not** have the selected isolation level 

359 re-applied to it automatically. 

360 

361 .. seealso:: 

362 

363 :ref:`dbapi_autocommit` 

364 

365 :meth:`_engine.Connection.get_isolation_level` 

366 - view current actual level 

367 

368 :param no_parameters: Available on: :class:`_engine.Connection`, 

369 :class:`_sql.Executable`. 

370 

371 When ``True``, if the final parameter 

372 list or dictionary is totally empty, will invoke the 

373 statement on the cursor as ``cursor.execute(statement)``, 

374 not passing the parameter collection at all. 

375 Some DBAPIs such as psycopg2 and mysql-python consider 

376 percent signs as significant only when parameters are 

377 present; this option allows code to generate SQL 

378 containing percent signs (and possibly other characters) 

379 that is neutral regarding whether it's executed by the DBAPI 

380 or piped into a script that's later invoked by 

381 command line tools. 

382 

383 :param stream_results: Available on: :class:`_engine.Connection`, 

384 :class:`_sql.Executable`. 

385 

386 Indicate to the dialect that results should be 

387 "streamed" and not pre-buffered, if possible. For backends 

388 such as PostgreSQL, MySQL and MariaDB, this indicates the use of 

389 a "server side cursor" as opposed to a client side cursor. 

390 Other backends such as that of Oracle may already use server 

391 side cursors by default. 

392 

393 The usage of 

394 :paramref:`_engine.Connection.execution_options.stream_results` is 

395 usually combined with setting a fixed number of rows to to be fetched 

396 in batches, to allow for efficient iteration of database rows while 

397 at the same time not loading all result rows into memory at once; 

398 this can be configured on a :class:`_engine.Result` object using the 

399 :meth:`_engine.Result.yield_per` method, after execution has 

400 returned a new :class:`_engine.Result`. If 

401 :meth:`_engine.Result.yield_per` is not used, 

402 the :paramref:`_engine.Connection.execution_options.stream_results` 

403 mode of operation will instead use a dynamically sized buffer 

404 which buffers sets of rows at a time, growing on each batch 

405 based on a fixed growth size up until a limit which may 

406 be configured using the 

407 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

408 parameter. 

409 

410 When using the ORM to fetch ORM mapped objects from a result, 

411 :meth:`_engine.Result.yield_per` should always be used with 

412 :paramref:`_engine.Connection.execution_options.stream_results`, 

413 so that the ORM does not fetch all rows into new ORM objects at once. 

414 

415 For typical use, the 

416 :paramref:`_engine.Connection.execution_options.yield_per` execution 

417 option should be preferred, which sets up both 

418 :paramref:`_engine.Connection.execution_options.stream_results` and 

419 :meth:`_engine.Result.yield_per` at once. This option is supported 

420 both at a core level by :class:`_engine.Connection` as well as by the 

421 ORM :class:`_engine.Session`; the latter is described at 

422 :ref:`orm_queryguide_yield_per`. 

423 

424 .. seealso:: 

425 

426 :ref:`engine_stream_results` - background on 

427 :paramref:`_engine.Connection.execution_options.stream_results` 

428 

429 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

430 

431 :paramref:`_engine.Connection.execution_options.yield_per` 

432 

433 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

434 describing the ORM version of ``yield_per`` 

435 

436 :param max_row_buffer: Available on: :class:`_engine.Connection`, 

437 :class:`_sql.Executable`. Sets a maximum 

438 buffer size to use when the 

439 :paramref:`_engine.Connection.execution_options.stream_results` 

440 execution option is used on a backend that supports server side 

441 cursors. The default value if not specified is 1000. 

442 

443 .. seealso:: 

444 

445 :paramref:`_engine.Connection.execution_options.stream_results` 

446 

447 :ref:`engine_stream_results` 

448 

449 

450 :param yield_per: Available on: :class:`_engine.Connection`, 

451 :class:`_sql.Executable`. Integer value applied which will 

452 set the :paramref:`_engine.Connection.execution_options.stream_results` 

453 execution option and invoke :meth:`_engine.Result.yield_per` 

454 automatically at once. Allows equivalent functionality as 

455 is present when using this parameter with the ORM. 

456 

457 .. versionadded:: 1.4.40 

458 

459 .. seealso:: 

460 

461 :ref:`engine_stream_results` - background and examples 

462 on using server side cursors with Core. 

463 

464 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

465 describing the ORM version of ``yield_per`` 

466 

467 :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`, 

468 :class:`_engine.Engine`. Number of rows to format into an 

469 INSERT statement when the statement uses "insertmanyvalues" mode, 

470 which is a paged form of bulk insert that is used for many backends 

471 when using :term:`executemany` execution typically in conjunction 

472 with RETURNING. Defaults to 1000. May also be modified on a 

473 per-engine basis using the 

474 :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. 

475 

476 .. versionadded:: 2.0 

477 

478 .. seealso:: 

479 

480 :ref:`engine_insertmanyvalues` 

481 

482 :param schema_translate_map: Available on: :class:`_engine.Connection`, 

483 :class:`_engine.Engine`, :class:`_sql.Executable`. 

484 

485 A dictionary mapping schema names to schema names, that will be 

486 applied to the :paramref:`_schema.Table.schema` element of each 

487 :class:`_schema.Table` 

488 encountered when SQL or DDL expression elements 

489 are compiled into strings; the resulting schema name will be 

490 converted based on presence in the map of the original name. 

491 

492 .. seealso:: 

493 

494 :ref:`schema_translating` 

495 

496 :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount`` 

497 attribute will be unconditionally memoized within the result and 

498 made available via the :attr:`.CursorResult.rowcount` attribute. 

499 Normally, this attribute is only preserved for UPDATE and DELETE 

500 statements. Using this option, the DBAPIs rowcount value can 

501 be accessed for other kinds of statements such as INSERT and SELECT, 

502 to the degree that the DBAPI supports these statements. See 

503 :attr:`.CursorResult.rowcount` for notes regarding the behavior 

504 of this attribute. 

505 

506 .. versionadded:: 2.0.28 

507 

508 .. seealso:: 

509 

510 :meth:`_engine.Engine.execution_options` 

511 

512 :meth:`.Executable.execution_options` 

513 

514 :meth:`_engine.Connection.get_execution_options` 

515 

516 :ref:`orm_queryguide_execution_options` - documentation on all 

517 ORM-specific execution options 

518 

519 :param driver_column_names: When True, the returned 

520 :class:`_engine.CursorResult` will use the column names as written in 

521 ``cursor.description`` to set up the keys for the result set, 

522 including the names of columns for the :class:`_engine.Row` object as 

523 well as the dictionary keys when using :attr:`_engine.Row._mapping`. 

524 On backends that use "name normalization" such as Oracle to correct 

525 for lower case names being converted to all uppercase, this behavior 

526 is turned off and the raw UPPERCASE names in cursor.description will 

527 be present. 

528 

529 .. versionadded:: 2.1 

530 

531 """ # noqa 

532 if self._has_events or self.engine._has_events: 

533 self.dispatch.set_connection_execution_options(self, opt) 

534 self._execution_options = self._execution_options.union(opt) 

535 self.dialect.set_connection_execution_options(self, opt) 

536 return self 

537 

538 def get_execution_options(self) -> _ExecuteOptions: 

539 """Get the non-SQL options which will take effect during execution. 

540 

541 .. versionadded:: 1.3 

542 

543 .. seealso:: 

544 

545 :meth:`_engine.Connection.execution_options` 

546 """ 

547 return self._execution_options 

548 

549 @property 

550 def _still_open_and_dbapi_connection_is_valid(self) -> bool: 

551 pool_proxied_connection = self._dbapi_connection 

552 return ( 

553 pool_proxied_connection is not None 

554 and pool_proxied_connection.is_valid 

555 ) 

556 

557 @property 

558 def closed(self) -> bool: 

559 """Return True if this connection is closed.""" 

560 

561 return self._dbapi_connection is None and not self.__can_reconnect 

562 

563 @property 

564 def invalidated(self) -> bool: 

565 """Return True if this connection was invalidated. 

566 

567 This does not indicate whether or not the connection was 

568 invalidated at the pool level, however 

569 

570 """ 

571 

572 # prior to 1.4, "invalid" was stored as a state independent of 

573 # "closed", meaning an invalidated connection could be "closed", 

574 # the _dbapi_connection would be None and closed=True, yet the 

575 # "invalid" flag would stay True. This meant that there were 

576 # three separate states (open/valid, closed/valid, closed/invalid) 

577 # when there is really no reason for that; a connection that's 

578 # "closed" does not need to be "invalid". So the state is now 

579 # represented by the two facts alone. 

580 

581 pool_proxied_connection = self._dbapi_connection 

582 return pool_proxied_connection is None and self.__can_reconnect 

583 

584 @property 

585 def connection(self) -> PoolProxiedConnection: 

586 """The underlying DB-API connection managed by this Connection. 

587 

588 This is a SQLAlchemy connection-pool proxied connection 

589 which then has the attribute 

590 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the 

591 actual driver connection. 

592 

593 .. seealso:: 

594 

595 

596 :ref:`dbapi_connections` 

597 

598 """ 

599 

600 if self._dbapi_connection is None: 

601 try: 

602 return self._revalidate_connection() 

603 except (exc.PendingRollbackError, exc.ResourceClosedError): 

604 raise 

605 except BaseException as e: 

606 self._handle_dbapi_exception(e, None, None, None, None) 

607 else: 

608 return self._dbapi_connection 

609 

610 def get_isolation_level(self) -> IsolationLevel: 

611 """Return the current **actual** isolation level that's present on 

612 the database within the scope of this connection. 

613 

614 This attribute will perform a live SQL operation against the database 

615 in order to procure the current isolation level, so the value returned 

616 is the actual level on the underlying DBAPI connection regardless of 

617 how this state was set. This will be one of the four actual isolation 

618 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

619 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation 

620 level setting. Third party dialects may also feature additional 

621 isolation level settings. 

622 

623 .. note:: This method **will not report** on the ``AUTOCOMMIT`` 

624 isolation level, which is a separate :term:`dbapi` setting that's 

625 independent of **actual** isolation level. When ``AUTOCOMMIT`` is 

626 in use, the database connection still has a "traditional" isolation 

627 mode in effect, that is typically one of the four values 

628 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

629 ``SERIALIZABLE``. 

630 

631 Compare to the :attr:`_engine.Connection.default_isolation_level` 

632 accessor which returns the isolation level that is present on the 

633 database at initial connection time. 

634 

635 .. seealso:: 

636 

637 :attr:`_engine.Connection.default_isolation_level` 

638 - view default level 

639 

640 :paramref:`_sa.create_engine.isolation_level` 

641 - set per :class:`_engine.Engine` isolation level 

642 

643 :paramref:`.Connection.execution_options.isolation_level` 

644 - set per :class:`_engine.Connection` isolation level 

645 

646 """ 

647 dbapi_connection = self.connection.dbapi_connection 

648 assert dbapi_connection is not None 

649 try: 

650 return self.dialect.get_isolation_level(dbapi_connection) 

651 except BaseException as e: 

652 self._handle_dbapi_exception(e, None, None, None, None) 

653 

654 @property 

655 def default_isolation_level(self) -> Optional[IsolationLevel]: 

656 """The initial-connection time isolation level associated with the 

657 :class:`_engine.Dialect` in use. 

658 

659 This value is independent of the 

660 :paramref:`.Connection.execution_options.isolation_level` and 

661 :paramref:`.Engine.execution_options.isolation_level` execution 

662 options, and is determined by the :class:`_engine.Dialect` when the 

663 first connection is created, by performing a SQL query against the 

664 database for the current isolation level before any additional commands 

665 have been emitted. 

666 

667 Calling this accessor does not invoke any new SQL queries. 

668 

669 .. seealso:: 

670 

671 :meth:`_engine.Connection.get_isolation_level` 

672 - view current actual isolation level 

673 

674 :paramref:`_sa.create_engine.isolation_level` 

675 - set per :class:`_engine.Engine` isolation level 

676 

677 :paramref:`.Connection.execution_options.isolation_level` 

678 - set per :class:`_engine.Connection` isolation level 

679 

680 """ 

681 return self.dialect.default_isolation_level 

682 

683 def _invalid_transaction(self) -> NoReturn: 

684 raise exc.PendingRollbackError( 

685 "Can't reconnect until invalid %stransaction is rolled " 

686 "back. Please rollback() fully before proceeding" 

687 % ("savepoint " if self._nested_transaction is not None else ""), 

688 code="8s2b", 

689 ) 

690 

691 def _revalidate_connection(self) -> PoolProxiedConnection: 

692 if self.__can_reconnect and self.invalidated: 

693 if self._transaction is not None: 

694 self._invalid_transaction() 

695 self._dbapi_connection = self.engine.raw_connection() 

696 return self._dbapi_connection 

697 raise exc.ResourceClosedError("This Connection is closed") 

698 

699 @property 

700 def info(self) -> _InfoType: 

701 """Info dictionary associated with the underlying DBAPI connection 

702 referred to by this :class:`_engine.Connection`, allowing user-defined 

703 data to be associated with the connection. 

704 

705 The data here will follow along with the DBAPI connection including 

706 after it is returned to the connection pool and used again 

707 in subsequent instances of :class:`_engine.Connection`. 

708 

709 """ 

710 

711 return self.connection.info 

712 

713 def invalidate(self, exception: Optional[BaseException] = None) -> None: 

714 """Invalidate the underlying DBAPI connection associated with 

715 this :class:`_engine.Connection`. 

716 

717 An attempt will be made to close the underlying DBAPI connection 

718 immediately; however if this operation fails, the error is logged 

719 but not raised. The connection is then discarded whether or not 

720 close() succeeded. 

721 

722 Upon the next use (where "use" typically means using the 

723 :meth:`_engine.Connection.execute` method or similar), 

724 this :class:`_engine.Connection` will attempt to 

725 procure a new DBAPI connection using the services of the 

726 :class:`_pool.Pool` as a source of connectivity (e.g. 

727 a "reconnection"). 

728 

729 If a transaction was in progress (e.g. the 

730 :meth:`_engine.Connection.begin` method has been called) when 

731 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI 

732 level all state associated with this transaction is lost, as 

733 the DBAPI connection is closed. The :class:`_engine.Connection` 

734 will not allow a reconnection to proceed until the 

735 :class:`.Transaction` object is ended, by calling the 

736 :meth:`.Transaction.rollback` method; until that point, any attempt at 

737 continuing to use the :class:`_engine.Connection` will raise an 

738 :class:`~sqlalchemy.exc.InvalidRequestError`. 

739 This is to prevent applications from accidentally 

740 continuing an ongoing transactional operations despite the 

741 fact that the transaction has been lost due to an 

742 invalidation. 

743 

744 The :meth:`_engine.Connection.invalidate` method, 

745 just like auto-invalidation, 

746 will at the connection pool level invoke the 

747 :meth:`_events.PoolEvents.invalidate` event. 

748 

749 :param exception: an optional ``Exception`` instance that's the 

750 reason for the invalidation. is passed along to event handlers 

751 and logging functions. 

752 

753 .. seealso:: 

754 

755 :ref:`pool_connection_invalidation` 

756 

757 """ 

758 

759 if self.invalidated: 

760 return 

761 

762 if self.closed: 

763 raise exc.ResourceClosedError("This Connection is closed") 

764 

765 if self._still_open_and_dbapi_connection_is_valid: 

766 pool_proxied_connection = self._dbapi_connection 

767 assert pool_proxied_connection is not None 

768 pool_proxied_connection.invalidate(exception) 

769 

770 self._dbapi_connection = None 

771 

772 def detach(self) -> None: 

773 """Detach the underlying DB-API connection from its connection pool. 

774 

775 E.g.:: 

776 

777 with engine.connect() as conn: 

778 conn.detach() 

779 conn.execute(text("SET search_path TO schema1, schema2")) 

780 

781 # work with connection 

782 

783 # connection is fully closed (since we used "with:", can 

784 # also call .close()) 

785 

786 This :class:`_engine.Connection` instance will remain usable. 

787 When closed 

788 (or exited from a context manager context as above), 

789 the DB-API connection will be literally closed and not 

790 returned to its originating pool. 

791 

792 This method can be used to insulate the rest of an application 

793 from a modified state on a connection (such as a transaction 

794 isolation level or similar). 

795 

796 """ 

797 

798 if self.closed: 

799 raise exc.ResourceClosedError("This Connection is closed") 

800 

801 pool_proxied_connection = self._dbapi_connection 

802 if pool_proxied_connection is None: 

803 raise exc.InvalidRequestError( 

804 "Can't detach an invalidated Connection" 

805 ) 

806 pool_proxied_connection.detach() 

807 

808 def _autobegin(self) -> None: 

809 if self._allow_autobegin and not self.__in_begin: 

810 self.begin() 

811 

812 def begin(self) -> RootTransaction: 

813 """Begin a transaction prior to autobegin occurring. 

814 

815 E.g.:: 

816 

817 with engine.connect() as conn: 

818 with conn.begin() as trans: 

819 conn.execute(table.insert(), {"username": "sandy"}) 

820 

821 

822 The returned object is an instance of :class:`_engine.RootTransaction`. 

823 This object represents the "scope" of the transaction, 

824 which completes when either the :meth:`_engine.Transaction.rollback` 

825 or :meth:`_engine.Transaction.commit` method is called; the object 

826 also works as a context manager as illustrated above. 

827 

828 The :meth:`_engine.Connection.begin` method begins a 

829 transaction that normally will be begun in any case when the connection 

830 is first used to execute a statement. The reason this method might be 

831 used would be to invoke the :meth:`_events.ConnectionEvents.begin` 

832 event at a specific time, or to organize code within the scope of a 

833 connection checkout in terms of context managed blocks, such as:: 

834 

835 with engine.connect() as conn: 

836 with conn.begin(): 

837 conn.execute(...) 

838 conn.execute(...) 

839 

840 with conn.begin(): 

841 conn.execute(...) 

842 conn.execute(...) 

843 

844 The above code is not fundamentally any different in its behavior than 

845 the following code which does not use 

846 :meth:`_engine.Connection.begin`; the below style is known 

847 as "commit as you go" style:: 

848 

849 with engine.connect() as conn: 

850 conn.execute(...) 

851 conn.execute(...) 

852 conn.commit() 

853 

854 conn.execute(...) 

855 conn.execute(...) 

856 conn.commit() 

857 

858 From a database point of view, the :meth:`_engine.Connection.begin` 

859 method does not emit any SQL or change the state of the underlying 

860 DBAPI connection in any way; the Python DBAPI does not have any 

861 concept of explicit transaction begin. 

862 

863 .. seealso:: 

864 

865 :ref:`tutorial_working_with_transactions` - in the 

866 :ref:`unified_tutorial` 

867 

868 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT 

869 

870 :meth:`_engine.Connection.begin_twophase` - 

871 use a two phase /XID transaction 

872 

873 :meth:`_engine.Engine.begin` - context manager available from 

874 :class:`_engine.Engine` 

875 

876 """ 

877 if self._transaction is None: 

878 self._transaction = RootTransaction(self) 

879 return self._transaction 

880 else: 

881 raise exc.InvalidRequestError( 

882 "This connection has already initialized a SQLAlchemy " 

883 "Transaction() object via begin() or autobegin; can't " 

884 "call begin() here unless rollback() or commit() " 

885 "is called first." 

886 ) 

887 

888 def begin_nested(self) -> NestedTransaction: 

889 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction 

890 handle that controls the scope of the SAVEPOINT. 

891 

892 E.g.:: 

893 

894 with engine.begin() as connection: 

895 with connection.begin_nested(): 

896 connection.execute(table.insert(), {"username": "sandy"}) 

897 

898 The returned object is an instance of 

899 :class:`_engine.NestedTransaction`, which includes transactional 

900 methods :meth:`_engine.NestedTransaction.commit` and 

901 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction, 

902 these methods correspond to the operations "RELEASE SAVEPOINT <name>" 

903 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local 

904 to the :class:`_engine.NestedTransaction` object and is generated 

905 automatically. Like any other :class:`_engine.Transaction`, the 

906 :class:`_engine.NestedTransaction` may be used as a context manager as 

907 illustrated above which will "release" or "rollback" corresponding to 

908 if the operation within the block were successful or raised an 

909 exception. 

910 

911 Nested transactions require SAVEPOINT support in the underlying 

912 database, else the behavior is undefined. SAVEPOINT is commonly used to 

913 run operations within a transaction that may fail, while continuing the 

914 outer transaction. E.g.:: 

915 

916 from sqlalchemy import exc 

917 

918 with engine.begin() as connection: 

919 trans = connection.begin_nested() 

920 try: 

921 connection.execute(table.insert(), {"username": "sandy"}) 

922 trans.commit() 

923 except exc.IntegrityError: # catch for duplicate username 

924 trans.rollback() # rollback to savepoint 

925 

926 # outer transaction continues 

927 connection.execute( ... ) 

928 

929 If :meth:`_engine.Connection.begin_nested` is called without first 

930 calling :meth:`_engine.Connection.begin` or 

931 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object 

932 will "autobegin" the outer transaction first. This outer transaction 

933 may be committed using "commit-as-you-go" style, e.g.:: 

934 

935 with engine.connect() as connection: # begin() wasn't called 

936 

937 with connection.begin_nested(): will auto-"begin()" first 

938 connection.execute( ... ) 

939 # savepoint is released 

940 

941 connection.execute( ... ) 

942 

943 # explicitly commit outer transaction 

944 connection.commit() 

945 

946 # can continue working with connection here 

947 

948 .. versionchanged:: 2.0 

949 

950 :meth:`_engine.Connection.begin_nested` will now participate 

951 in the connection "autobegin" behavior that is new as of 

952 2.0 / "future" style connections in 1.4. 

953 

954 .. seealso:: 

955 

956 :meth:`_engine.Connection.begin` 

957 

958 :ref:`session_begin_nested` - ORM support for SAVEPOINT 

959 

960 """ 

961 if self._transaction is None: 

962 self._autobegin() 

963 

964 return NestedTransaction(self) 

965 

966 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction: 

967 """Begin a two-phase or XA transaction and return a transaction 

968 handle. 

969 

970 The returned object is an instance of :class:`.TwoPhaseTransaction`, 

971 which in addition to the methods provided by 

972 :class:`.Transaction`, also provides a 

973 :meth:`~.TwoPhaseTransaction.prepare` method. 

974 

975 :param xid: the two phase transaction id. If not supplied, a 

976 random id will be generated. 

977 

978 .. seealso:: 

979 

980 :meth:`_engine.Connection.begin` 

981 

982 :meth:`_engine.Connection.begin_twophase` 

983 

984 """ 

985 

986 if self._transaction is not None: 

987 raise exc.InvalidRequestError( 

988 "Cannot start a two phase transaction when a transaction " 

989 "is already in progress." 

990 ) 

991 if xid is None: 

992 xid = self.engine.dialect.create_xid() 

993 return TwoPhaseTransaction(self, xid) 

994 

995 def commit(self) -> None: 

996 """Commit the transaction that is currently in progress. 

997 

998 This method commits the current transaction if one has been started. 

999 If no transaction was started, the method has no effect, assuming 

1000 the connection is in a non-invalidated state. 

1001 

1002 A transaction is begun on a :class:`_engine.Connection` automatically 

1003 whenever a statement is first executed, or when the 

1004 :meth:`_engine.Connection.begin` method is called. 

1005 

1006 .. note:: The :meth:`_engine.Connection.commit` method only acts upon 

1007 the primary database transaction that is linked to the 

1008 :class:`_engine.Connection` object. It does not operate upon a 

1009 SAVEPOINT that would have been invoked from the 

1010 :meth:`_engine.Connection.begin_nested` method; for control of a 

1011 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the 

1012 :class:`_engine.NestedTransaction` that is returned by the 

1013 :meth:`_engine.Connection.begin_nested` method itself. 

1014 

1015 

1016 """ 

1017 if self._transaction: 

1018 self._transaction.commit() 

1019 

1020 def rollback(self) -> None: 

1021 """Roll back the transaction that is currently in progress. 

1022 

1023 This method rolls back the current transaction if one has been started. 

1024 If no transaction was started, the method has no effect. If a 

1025 transaction was started and the connection is in an invalidated state, 

1026 the transaction is cleared using this method. 

1027 

1028 A transaction is begun on a :class:`_engine.Connection` automatically 

1029 whenever a statement is first executed, or when the 

1030 :meth:`_engine.Connection.begin` method is called. 

1031 

1032 .. note:: The :meth:`_engine.Connection.rollback` method only acts 

1033 upon the primary database transaction that is linked to the 

1034 :class:`_engine.Connection` object. It does not operate upon a 

1035 SAVEPOINT that would have been invoked from the 

1036 :meth:`_engine.Connection.begin_nested` method; for control of a 

1037 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the 

1038 :class:`_engine.NestedTransaction` that is returned by the 

1039 :meth:`_engine.Connection.begin_nested` method itself. 

1040 

1041 

1042 """ 

1043 if self._transaction: 

1044 self._transaction.rollback() 

1045 

1046 def recover_twophase(self) -> List[Any]: 

1047 return self.engine.dialect.do_recover_twophase(self) 

1048 

1049 def rollback_prepared(self, xid: Any, recover: bool = False) -> None: 

1050 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) 

1051 

1052 def commit_prepared(self, xid: Any, recover: bool = False) -> None: 

1053 self.engine.dialect.do_commit_twophase(self, xid, recover=recover) 

1054 

1055 def in_transaction(self) -> bool: 

1056 """Return True if a transaction is in progress.""" 

1057 return self._transaction is not None and self._transaction.is_active 

1058 

1059 def in_nested_transaction(self) -> bool: 

1060 """Return True if a transaction is in progress.""" 

1061 return ( 

1062 self._nested_transaction is not None 

1063 and self._nested_transaction.is_active 

1064 ) 

1065 

1066 def _is_autocommit_isolation(self) -> bool: 

1067 opt_iso = self._execution_options.get("isolation_level", None) 

1068 return bool( 

1069 opt_iso == "AUTOCOMMIT" 

1070 or ( 

1071 opt_iso is None 

1072 and self.engine.dialect._on_connect_isolation_level 

1073 == "AUTOCOMMIT" 

1074 ) 

1075 ) 

1076 

1077 def _get_required_transaction(self) -> RootTransaction: 

1078 trans = self._transaction 

1079 if trans is None: 

1080 raise exc.InvalidRequestError("connection is not in a transaction") 

1081 return trans 

1082 

1083 def _get_required_nested_transaction(self) -> NestedTransaction: 

1084 trans = self._nested_transaction 

1085 if trans is None: 

1086 raise exc.InvalidRequestError( 

1087 "connection is not in a nested transaction" 

1088 ) 

1089 return trans 

1090 

1091 def get_transaction(self) -> Optional[RootTransaction]: 

1092 """Return the current root transaction in progress, if any. 

1093 

1094 .. versionadded:: 1.4 

1095 

1096 """ 

1097 

1098 return self._transaction 

1099 

1100 def get_nested_transaction(self) -> Optional[NestedTransaction]: 

1101 """Return the current nested transaction in progress, if any. 

1102 

1103 .. versionadded:: 1.4 

1104 

1105 """ 

1106 return self._nested_transaction 

1107 

1108 def _begin_impl(self, transaction: RootTransaction) -> None: 

1109 if self._echo: 

1110 if self._is_autocommit_isolation(): 

1111 self._log_info( 

1112 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1113 "autocommit mode)" 

1114 ) 

1115 else: 

1116 self._log_info("BEGIN (implicit)") 

1117 

1118 self.__in_begin = True 

1119 

1120 if self._has_events or self.engine._has_events: 

1121 self.dispatch.begin(self) 

1122 

1123 try: 

1124 self.engine.dialect.do_begin(self.connection) 

1125 except BaseException as e: 

1126 self._handle_dbapi_exception(e, None, None, None, None) 

1127 finally: 

1128 self.__in_begin = False 

1129 

1130 def _rollback_impl(self) -> None: 

1131 if self._has_events or self.engine._has_events: 

1132 self.dispatch.rollback(self) 

1133 

1134 if self._still_open_and_dbapi_connection_is_valid: 

1135 if self._echo: 

1136 if self._is_autocommit_isolation(): 

1137 self._log_info( 

1138 "ROLLBACK using DBAPI connection.rollback(), " 

1139 "DBAPI should ignore due to autocommit mode" 

1140 ) 

1141 else: 

1142 self._log_info("ROLLBACK") 

1143 try: 

1144 self.engine.dialect.do_rollback(self.connection) 

1145 except BaseException as e: 

1146 self._handle_dbapi_exception(e, None, None, None, None) 

1147 

1148 def _commit_impl(self) -> None: 

1149 if self._has_events or self.engine._has_events: 

1150 self.dispatch.commit(self) 

1151 

1152 if self._echo: 

1153 if self._is_autocommit_isolation(): 

1154 self._log_info( 

1155 "COMMIT using DBAPI connection.commit(), " 

1156 "DBAPI should ignore due to autocommit mode" 

1157 ) 

1158 else: 

1159 self._log_info("COMMIT") 

1160 try: 

1161 self.engine.dialect.do_commit(self.connection) 

1162 except BaseException as e: 

1163 self._handle_dbapi_exception(e, None, None, None, None) 

1164 

1165 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1166 if self._has_events or self.engine._has_events: 

1167 self.dispatch.savepoint(self, name) 

1168 

1169 if name is None: 

1170 self.__savepoint_seq += 1 

1171 name = "sa_savepoint_%s" % self.__savepoint_seq 

1172 self.engine.dialect.do_savepoint(self, name) 

1173 return name 

1174 

1175 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1176 if self._has_events or self.engine._has_events: 

1177 self.dispatch.rollback_savepoint(self, name, None) 

1178 

1179 if self._still_open_and_dbapi_connection_is_valid: 

1180 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1181 

1182 def _release_savepoint_impl(self, name: str) -> None: 

1183 if self._has_events or self.engine._has_events: 

1184 self.dispatch.release_savepoint(self, name, None) 

1185 

1186 self.engine.dialect.do_release_savepoint(self, name) 

1187 

1188 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 

1189 if self._echo: 

1190 self._log_info("BEGIN TWOPHASE (implicit)") 

1191 if self._has_events or self.engine._has_events: 

1192 self.dispatch.begin_twophase(self, transaction.xid) 

1193 

1194 self.__in_begin = True 

1195 try: 

1196 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1197 except BaseException as e: 

1198 self._handle_dbapi_exception(e, None, None, None, None) 

1199 finally: 

1200 self.__in_begin = False 

1201 

1202 def _prepare_twophase_impl(self, xid: Any) -> None: 

1203 if self._has_events or self.engine._has_events: 

1204 self.dispatch.prepare_twophase(self, xid) 

1205 

1206 assert isinstance(self._transaction, TwoPhaseTransaction) 

1207 try: 

1208 self.engine.dialect.do_prepare_twophase(self, xid) 

1209 except BaseException as e: 

1210 self._handle_dbapi_exception(e, None, None, None, None) 

1211 

1212 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1213 if self._has_events or self.engine._has_events: 

1214 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1215 

1216 if self._still_open_and_dbapi_connection_is_valid: 

1217 assert isinstance(self._transaction, TwoPhaseTransaction) 

1218 try: 

1219 self.engine.dialect.do_rollback_twophase( 

1220 self, xid, is_prepared 

1221 ) 

1222 except BaseException as e: 

1223 self._handle_dbapi_exception(e, None, None, None, None) 

1224 

1225 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1226 if self._has_events or self.engine._has_events: 

1227 self.dispatch.commit_twophase(self, xid, is_prepared) 

1228 

1229 assert isinstance(self._transaction, TwoPhaseTransaction) 

1230 try: 

1231 self.engine.dialect.do_commit_twophase(self, xid, is_prepared) 

1232 except BaseException as e: 

1233 self._handle_dbapi_exception(e, None, None, None, None) 

1234 

1235 def close(self) -> None: 

1236 """Close this :class:`_engine.Connection`. 

1237 

1238 This results in a release of the underlying database 

1239 resources, that is, the DBAPI connection referenced 

1240 internally. The DBAPI connection is typically restored 

1241 back to the connection-holding :class:`_pool.Pool` referenced 

1242 by the :class:`_engine.Engine` that produced this 

1243 :class:`_engine.Connection`. Any transactional state present on 

1244 the DBAPI connection is also unconditionally released via 

1245 the DBAPI connection's ``rollback()`` method, regardless 

1246 of any :class:`.Transaction` object that may be 

1247 outstanding with regards to this :class:`_engine.Connection`. 

1248 

1249 This has the effect of also calling :meth:`_engine.Connection.rollback` 

1250 if any transaction is in place. 

1251 

1252 After :meth:`_engine.Connection.close` is called, the 

1253 :class:`_engine.Connection` is permanently in a closed state, 

1254 and will allow no further operations. 

1255 

1256 """ 

1257 

1258 if self._transaction: 

1259 self._transaction.close() 

1260 skip_reset = True 

1261 else: 

1262 skip_reset = False 

1263 

1264 if self._dbapi_connection is not None: 

1265 conn = self._dbapi_connection 

1266 

1267 # as we just closed the transaction, close the connection 

1268 # pool connection without doing an additional reset 

1269 if skip_reset: 

1270 cast("_ConnectionFairy", conn)._close_special( 

1271 transaction_reset=True 

1272 ) 

1273 else: 

1274 conn.close() 

1275 

1276 # There is a slight chance that conn.close() may have 

1277 # triggered an invalidation here in which case 

1278 # _dbapi_connection would already be None, however usually 

1279 # it will be non-None here and in a "closed" state. 

1280 self._dbapi_connection = None 

1281 self.__can_reconnect = False 

1282 

1283 @overload 

1284 def scalar( 

1285 self, 

1286 statement: TypedReturnsRows[_T], 

1287 parameters: Optional[_CoreSingleExecuteParams] = None, 

1288 *, 

1289 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1290 ) -> Optional[_T]: ... 

1291 

1292 @overload 

1293 def scalar( 

1294 self, 

1295 statement: Executable, 

1296 parameters: Optional[_CoreSingleExecuteParams] = None, 

1297 *, 

1298 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1299 ) -> Any: ... 

1300 

1301 def scalar( 

1302 self, 

1303 statement: Executable, 

1304 parameters: Optional[_CoreSingleExecuteParams] = None, 

1305 *, 

1306 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1307 ) -> Any: 

1308 r"""Executes a SQL statement construct and returns a scalar object. 

1309 

1310 This method is shorthand for invoking the 

1311 :meth:`_engine.Result.scalar` method after invoking the 

1312 :meth:`_engine.Connection.execute` method. Parameters are equivalent. 

1313 

1314 :return: a scalar Python value representing the first column of the 

1315 first row returned. 

1316 

1317 """ 

1318 distilled_parameters = _distill_params_20(parameters) 

1319 try: 

1320 meth = statement._execute_on_scalar 

1321 except AttributeError as err: 

1322 raise exc.ObjectNotExecutableError(statement) from err 

1323 else: 

1324 return meth( 

1325 self, 

1326 distilled_parameters, 

1327 execution_options or NO_OPTIONS, 

1328 ) 

1329 

1330 @overload 

1331 def scalars( 

1332 self, 

1333 statement: TypedReturnsRows[_T], 

1334 parameters: Optional[_CoreAnyExecuteParams] = None, 

1335 *, 

1336 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1337 ) -> ScalarResult[_T]: ... 

1338 

1339 @overload 

1340 def scalars( 

1341 self, 

1342 statement: Executable, 

1343 parameters: Optional[_CoreAnyExecuteParams] = None, 

1344 *, 

1345 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1346 ) -> ScalarResult[Any]: ... 

1347 

1348 def scalars( 

1349 self, 

1350 statement: Executable, 

1351 parameters: Optional[_CoreAnyExecuteParams] = None, 

1352 *, 

1353 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1354 ) -> ScalarResult[Any]: 

1355 """Executes and returns a scalar result set, which yields scalar values 

1356 from the first column of each row. 

1357 

1358 This method is equivalent to calling :meth:`_engine.Connection.execute` 

1359 to receive a :class:`_result.Result` object, then invoking the 

1360 :meth:`_result.Result.scalars` method to produce a 

1361 :class:`_result.ScalarResult` instance. 

1362 

1363 :return: a :class:`_result.ScalarResult` 

1364 

1365 .. versionadded:: 1.4.24 

1366 

1367 """ 

1368 

1369 return self.execute( 

1370 statement, parameters, execution_options=execution_options 

1371 ).scalars() 

1372 

1373 @overload 

1374 def execute( 

1375 self, 

1376 statement: TypedReturnsRows[Unpack[_Ts]], 

1377 parameters: Optional[_CoreAnyExecuteParams] = None, 

1378 *, 

1379 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1380 ) -> CursorResult[Unpack[_Ts]]: ... 

1381 

1382 @overload 

1383 def execute( 

1384 self, 

1385 statement: Executable, 

1386 parameters: Optional[_CoreAnyExecuteParams] = None, 

1387 *, 

1388 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1389 ) -> CursorResult[Unpack[TupleAny]]: ... 

1390 

1391 def execute( 

1392 self, 

1393 statement: Executable, 

1394 parameters: Optional[_CoreAnyExecuteParams] = None, 

1395 *, 

1396 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1397 ) -> CursorResult[Unpack[TupleAny]]: 

1398 r"""Executes a SQL statement construct and returns a 

1399 :class:`_engine.CursorResult`. 

1400 

1401 :param statement: The statement to be executed. This is always 

1402 an object that is in both the :class:`_expression.ClauseElement` and 

1403 :class:`_expression.Executable` hierarchies, including: 

1404 

1405 * :class:`_expression.Select` 

1406 * :class:`_expression.Insert`, :class:`_expression.Update`, 

1407 :class:`_expression.Delete` 

1408 * :class:`_expression.TextClause` and 

1409 :class:`_expression.TextualSelect` 

1410 * :class:`_schema.DDL` and objects which inherit from 

1411 :class:`_schema.ExecutableDDLElement` 

1412 

1413 :param parameters: parameters which will be bound into the statement. 

1414 This may be either a dictionary of parameter names to values, 

1415 or a mutable sequence (e.g. a list) of dictionaries. When a 

1416 list of dictionaries is passed, the underlying statement execution 

1417 will make use of the DBAPI ``cursor.executemany()`` method. 

1418 When a single dictionary is passed, the DBAPI ``cursor.execute()`` 

1419 method will be used. 

1420 

1421 :param execution_options: optional dictionary of execution options, 

1422 which will be associated with the statement execution. This 

1423 dictionary can provide a subset of the options that are accepted 

1424 by :meth:`_engine.Connection.execution_options`. 

1425 

1426 :return: a :class:`_engine.Result` object. 

1427 

1428 """ 

1429 distilled_parameters = _distill_params_20(parameters) 

1430 try: 

1431 meth = statement._execute_on_connection 

1432 except AttributeError as err: 

1433 raise exc.ObjectNotExecutableError(statement) from err 

1434 else: 

1435 return meth( 

1436 self, 

1437 distilled_parameters, 

1438 execution_options or NO_OPTIONS, 

1439 ) 

1440 

1441 def _execute_function( 

1442 self, 

1443 func: FunctionElement[Any], 

1444 distilled_parameters: _CoreMultiExecuteParams, 

1445 execution_options: CoreExecuteOptionsParameter, 

1446 ) -> CursorResult[Unpack[TupleAny]]: 

1447 """Execute a sql.FunctionElement object.""" 

1448 

1449 return self._execute_clauseelement( 

1450 func.select(), distilled_parameters, execution_options 

1451 ) 

1452 

1453 def _execute_default( 

1454 self, 

1455 default: DefaultGenerator, 

1456 distilled_parameters: _CoreMultiExecuteParams, 

1457 execution_options: CoreExecuteOptionsParameter, 

1458 ) -> Any: 

1459 """Execute a schema.ColumnDefault object.""" 

1460 

1461 exec_opts = self._execution_options.merge_with(execution_options) 

1462 

1463 event_multiparams: Optional[_CoreMultiExecuteParams] 

1464 event_params: Optional[_CoreAnyExecuteParams] 

1465 

1466 # note for event handlers, the "distilled parameters" which is always 

1467 # a list of dicts is broken out into separate "multiparams" and 

1468 # "params" collections, which allows the handler to distinguish 

1469 # between an executemany and execute style set of parameters. 

1470 if self._has_events or self.engine._has_events: 

1471 ( 

1472 default, 

1473 distilled_parameters, 

1474 event_multiparams, 

1475 event_params, 

1476 ) = self._invoke_before_exec_event( 

1477 default, distilled_parameters, exec_opts 

1478 ) 

1479 else: 

1480 event_multiparams = event_params = None 

1481 

1482 try: 

1483 conn = self._dbapi_connection 

1484 if conn is None: 

1485 conn = self._revalidate_connection() 

1486 

1487 dialect = self.dialect 

1488 ctx = dialect.execution_ctx_cls._init_default( 

1489 dialect, self, conn, exec_opts 

1490 ) 

1491 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1492 raise 

1493 except BaseException as e: 

1494 self._handle_dbapi_exception(e, None, None, None, None) 

1495 

1496 ret = ctx._exec_default(None, default, None) 

1497 

1498 if self._has_events or self.engine._has_events: 

1499 self.dispatch.after_execute( 

1500 self, 

1501 default, 

1502 event_multiparams, 

1503 event_params, 

1504 exec_opts, 

1505 ret, 

1506 ) 

1507 

1508 return ret 

1509 

1510 def _execute_ddl( 

1511 self, 

1512 ddl: ExecutableDDLElement, 

1513 distilled_parameters: _CoreMultiExecuteParams, 

1514 execution_options: CoreExecuteOptionsParameter, 

1515 ) -> CursorResult[Unpack[TupleAny]]: 

1516 """Execute a schema.DDL object.""" 

1517 

1518 exec_opts = ddl._execution_options.merge_with( 

1519 self._execution_options, execution_options 

1520 ) 

1521 

1522 event_multiparams: Optional[_CoreMultiExecuteParams] 

1523 event_params: Optional[_CoreSingleExecuteParams] 

1524 

1525 if self._has_events or self.engine._has_events: 

1526 ( 

1527 ddl, 

1528 distilled_parameters, 

1529 event_multiparams, 

1530 event_params, 

1531 ) = self._invoke_before_exec_event( 

1532 ddl, distilled_parameters, exec_opts 

1533 ) 

1534 else: 

1535 event_multiparams = event_params = None 

1536 

1537 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1538 

1539 dialect = self.dialect 

1540 

1541 compiled = ddl.compile( 

1542 dialect=dialect, schema_translate_map=schema_translate_map 

1543 ) 

1544 ret = self._execute_context( 

1545 dialect, 

1546 dialect.execution_ctx_cls._init_ddl, 

1547 compiled, 

1548 None, 

1549 exec_opts, 

1550 compiled, 

1551 ) 

1552 if self._has_events or self.engine._has_events: 

1553 self.dispatch.after_execute( 

1554 self, 

1555 ddl, 

1556 event_multiparams, 

1557 event_params, 

1558 exec_opts, 

1559 ret, 

1560 ) 

1561 return ret 

1562 

1563 def _invoke_before_exec_event( 

1564 self, 

1565 elem: Any, 

1566 distilled_params: _CoreMultiExecuteParams, 

1567 execution_options: _ExecuteOptions, 

1568 ) -> Tuple[ 

1569 Any, 

1570 _CoreMultiExecuteParams, 

1571 _CoreMultiExecuteParams, 

1572 _CoreSingleExecuteParams, 

1573 ]: 

1574 event_multiparams: _CoreMultiExecuteParams 

1575 event_params: _CoreSingleExecuteParams 

1576 

1577 if len(distilled_params) == 1: 

1578 event_multiparams, event_params = [], distilled_params[0] 

1579 else: 

1580 event_multiparams, event_params = distilled_params, {} 

1581 

1582 for fn in self.dispatch.before_execute: 

1583 elem, event_multiparams, event_params = fn( 

1584 self, 

1585 elem, 

1586 event_multiparams, 

1587 event_params, 

1588 execution_options, 

1589 ) 

1590 

1591 if event_multiparams: 

1592 distilled_params = list(event_multiparams) 

1593 if event_params: 

1594 raise exc.InvalidRequestError( 

1595 "Event handler can't return non-empty multiparams " 

1596 "and params at the same time" 

1597 ) 

1598 elif event_params: 

1599 distilled_params = [event_params] 

1600 else: 

1601 distilled_params = [] 

1602 

1603 return elem, distilled_params, event_multiparams, event_params 

1604 

1605 def _execute_clauseelement( 

1606 self, 

1607 elem: Executable, 

1608 distilled_parameters: _CoreMultiExecuteParams, 

1609 execution_options: CoreExecuteOptionsParameter, 

1610 ) -> CursorResult[Unpack[TupleAny]]: 

1611 """Execute a sql.ClauseElement object.""" 

1612 

1613 exec_opts = elem._execution_options.merge_with( 

1614 self._execution_options, execution_options 

1615 ) 

1616 

1617 has_events = self._has_events or self.engine._has_events 

1618 if has_events: 

1619 ( 

1620 elem, 

1621 distilled_parameters, 

1622 event_multiparams, 

1623 event_params, 

1624 ) = self._invoke_before_exec_event( 

1625 elem, distilled_parameters, exec_opts 

1626 ) 

1627 

1628 if distilled_parameters: 

1629 # ensure we don't retain a link to the view object for keys() 

1630 # which links to the values, which we don't want to cache 

1631 keys = sorted(distilled_parameters[0]) 

1632 for_executemany = len(distilled_parameters) > 1 

1633 else: 

1634 keys = [] 

1635 for_executemany = False 

1636 

1637 dialect = self.dialect 

1638 

1639 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1640 

1641 compiled_cache: Optional[CompiledCacheType] = exec_opts.get( 

1642 "compiled_cache", self.engine._compiled_cache 

1643 ) 

1644 

1645 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( 

1646 dialect=dialect, 

1647 compiled_cache=compiled_cache, 

1648 column_keys=keys, 

1649 for_executemany=for_executemany, 

1650 schema_translate_map=schema_translate_map, 

1651 linting=self.dialect.compiler_linting | compiler.WARN_LINTING, 

1652 ) 

1653 ret = self._execute_context( 

1654 dialect, 

1655 dialect.execution_ctx_cls._init_compiled, 

1656 compiled_sql, 

1657 distilled_parameters, 

1658 exec_opts, 

1659 compiled_sql, 

1660 distilled_parameters, 

1661 elem, 

1662 extracted_params, 

1663 cache_hit=cache_hit, 

1664 ) 

1665 if has_events: 

1666 self.dispatch.after_execute( 

1667 self, 

1668 elem, 

1669 event_multiparams, 

1670 event_params, 

1671 exec_opts, 

1672 ret, 

1673 ) 

1674 return ret 

1675 

1676 def _execute_compiled( 

1677 self, 

1678 compiled: Compiled, 

1679 distilled_parameters: _CoreMultiExecuteParams, 

1680 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS, 

1681 ) -> CursorResult[Unpack[TupleAny]]: 

1682 """Execute a sql.Compiled object. 

1683 

1684 TODO: why do we have this? likely deprecate or remove 

1685 

1686 """ 

1687 

1688 exec_opts = compiled.execution_options.merge_with( 

1689 self._execution_options, execution_options 

1690 ) 

1691 

1692 if self._has_events or self.engine._has_events: 

1693 ( 

1694 compiled, 

1695 distilled_parameters, 

1696 event_multiparams, 

1697 event_params, 

1698 ) = self._invoke_before_exec_event( 

1699 compiled, distilled_parameters, exec_opts 

1700 ) 

1701 

1702 dialect = self.dialect 

1703 

1704 ret = self._execute_context( 

1705 dialect, 

1706 dialect.execution_ctx_cls._init_compiled, 

1707 compiled, 

1708 distilled_parameters, 

1709 exec_opts, 

1710 compiled, 

1711 distilled_parameters, 

1712 None, 

1713 None, 

1714 ) 

1715 if self._has_events or self.engine._has_events: 

1716 self.dispatch.after_execute( 

1717 self, 

1718 compiled, 

1719 event_multiparams, 

1720 event_params, 

1721 exec_opts, 

1722 ret, 

1723 ) 

1724 return ret 

1725 

1726 def exec_driver_sql( 

1727 self, 

1728 statement: str, 

1729 parameters: Optional[_DBAPIAnyExecuteParams] = None, 

1730 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1731 ) -> CursorResult[Unpack[TupleAny]]: 

1732 r"""Executes a string SQL statement on the DBAPI cursor directly, 

1733 without any SQL compilation steps. 

1734 

1735 This can be used to pass any string directly to the 

1736 ``cursor.execute()`` method of the DBAPI in use. 

1737 

1738 :param statement: The statement str to be executed. Bound parameters 

1739 must use the underlying DBAPI's paramstyle, such as "qmark", 

1740 "pyformat", "format", etc. 

1741 

1742 :param parameters: represent bound parameter values to be used in the 

1743 execution. The format is one of: a dictionary of named parameters, 

1744 a tuple of positional parameters, or a list containing either 

1745 dictionaries or tuples for multiple-execute support. 

1746 

1747 :return: a :class:`_engine.CursorResult`. 

1748 

1749 E.g. multiple dictionaries:: 

1750 

1751 

1752 conn.exec_driver_sql( 

1753 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1754 [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}] 

1755 ) 

1756 

1757 Single dictionary:: 

1758 

1759 conn.exec_driver_sql( 

1760 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1761 dict(id=1, value="v1") 

1762 ) 

1763 

1764 Single tuple:: 

1765 

1766 conn.exec_driver_sql( 

1767 "INSERT INTO table (id, value) VALUES (?, ?)", 

1768 (1, 'v1') 

1769 ) 

1770 

1771 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does 

1772 not participate in the 

1773 :meth:`_events.ConnectionEvents.before_execute` and 

1774 :meth:`_events.ConnectionEvents.after_execute` events. To 

1775 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use 

1776 :meth:`_events.ConnectionEvents.before_cursor_execute` and 

1777 :meth:`_events.ConnectionEvents.after_cursor_execute`. 

1778 

1779 .. seealso:: 

1780 

1781 :pep:`249` 

1782 

1783 """ 

1784 

1785 distilled_parameters = _distill_raw_params(parameters) 

1786 

1787 exec_opts = self._execution_options.merge_with(execution_options) 

1788 

1789 dialect = self.dialect 

1790 ret = self._execute_context( 

1791 dialect, 

1792 dialect.execution_ctx_cls._init_statement, 

1793 statement, 

1794 None, 

1795 exec_opts, 

1796 statement, 

1797 distilled_parameters, 

1798 ) 

1799 

1800 return ret 

1801 

1802 def _execute_context( 

1803 self, 

1804 dialect: Dialect, 

1805 constructor: Callable[..., ExecutionContext], 

1806 statement: Union[str, Compiled], 

1807 parameters: Optional[_AnyMultiExecuteParams], 

1808 execution_options: _ExecuteOptions, 

1809 *args: Any, 

1810 **kw: Any, 

1811 ) -> CursorResult[Unpack[TupleAny]]: 

1812 """Create an :class:`.ExecutionContext` and execute, returning 

1813 a :class:`_engine.CursorResult`.""" 

1814 

1815 if execution_options: 

1816 yp = execution_options.get("yield_per", None) 

1817 if yp: 

1818 execution_options = execution_options.union( 

1819 {"stream_results": True, "max_row_buffer": yp} 

1820 ) 

1821 try: 

1822 conn = self._dbapi_connection 

1823 if conn is None: 

1824 conn = self._revalidate_connection() 

1825 

1826 context = constructor( 

1827 dialect, self, conn, execution_options, *args, **kw 

1828 ) 

1829 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1830 raise 

1831 except BaseException as e: 

1832 self._handle_dbapi_exception( 

1833 e, str(statement), parameters, None, None 

1834 ) 

1835 

1836 if ( 

1837 self._transaction 

1838 and not self._transaction.is_active 

1839 or ( 

1840 self._nested_transaction 

1841 and not self._nested_transaction.is_active 

1842 ) 

1843 ): 

1844 self._invalid_transaction() 

1845 

1846 elif self._trans_context_manager: 

1847 TransactionalContext._trans_ctx_check(self) 

1848 

1849 if self._transaction is None: 

1850 self._autobegin() 

1851 

1852 context.pre_exec() 

1853 

1854 if context.execute_style is ExecuteStyle.INSERTMANYVALUES: 

1855 return self._exec_insertmany_context(dialect, context) 

1856 else: 

1857 return self._exec_single_context( 

1858 dialect, context, statement, parameters 

1859 ) 

1860 

1861 def _exec_single_context( 

1862 self, 

1863 dialect: Dialect, 

1864 context: ExecutionContext, 

1865 statement: Union[str, Compiled], 

1866 parameters: Optional[_AnyMultiExecuteParams], 

1867 ) -> CursorResult[Unpack[TupleAny]]: 

1868 """continue the _execute_context() method for a single DBAPI 

1869 cursor.execute() or cursor.executemany() call. 

1870 

1871 """ 

1872 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

1873 generic_setinputsizes = context._prepare_set_input_sizes() 

1874 

1875 if generic_setinputsizes: 

1876 try: 

1877 dialect.do_set_input_sizes( 

1878 context.cursor, generic_setinputsizes, context 

1879 ) 

1880 except BaseException as e: 

1881 self._handle_dbapi_exception( 

1882 e, str(statement), parameters, None, context 

1883 ) 

1884 

1885 cursor, str_statement, parameters = ( 

1886 context.cursor, 

1887 context.statement, 

1888 context.parameters, 

1889 ) 

1890 

1891 effective_parameters: Optional[_AnyExecuteParams] 

1892 

1893 if not context.executemany: 

1894 effective_parameters = parameters[0] 

1895 else: 

1896 effective_parameters = parameters 

1897 

1898 if self._has_events or self.engine._has_events: 

1899 for fn in self.dispatch.before_cursor_execute: 

1900 str_statement, effective_parameters = fn( 

1901 self, 

1902 cursor, 

1903 str_statement, 

1904 effective_parameters, 

1905 context, 

1906 context.executemany, 

1907 ) 

1908 

1909 if self._echo: 

1910 self._log_info(str_statement) 

1911 

1912 stats = context._get_cache_stats() 

1913 

1914 if not self.engine.hide_parameters: 

1915 self._log_info( 

1916 "[%s] %r", 

1917 stats, 

1918 sql_util._repr_params( 

1919 effective_parameters, 

1920 batches=10, 

1921 ismulti=context.executemany, 

1922 ), 

1923 ) 

1924 else: 

1925 self._log_info( 

1926 "[%s] [SQL parameters hidden due to hide_parameters=True]", 

1927 stats, 

1928 ) 

1929 

1930 evt_handled: bool = False 

1931 try: 

1932 if context.execute_style is ExecuteStyle.EXECUTEMANY: 

1933 effective_parameters = cast( 

1934 "_CoreMultiExecuteParams", effective_parameters 

1935 ) 

1936 if self.dialect._has_events: 

1937 for fn in self.dialect.dispatch.do_executemany: 

1938 if fn( 

1939 cursor, 

1940 str_statement, 

1941 effective_parameters, 

1942 context, 

1943 ): 

1944 evt_handled = True 

1945 break 

1946 if not evt_handled: 

1947 self.dialect.do_executemany( 

1948 cursor, 

1949 str_statement, 

1950 effective_parameters, 

1951 context, 

1952 ) 

1953 elif not effective_parameters and context.no_parameters: 

1954 if self.dialect._has_events: 

1955 for fn in self.dialect.dispatch.do_execute_no_params: 

1956 if fn(cursor, str_statement, context): 

1957 evt_handled = True 

1958 break 

1959 if not evt_handled: 

1960 self.dialect.do_execute_no_params( 

1961 cursor, str_statement, context 

1962 ) 

1963 else: 

1964 effective_parameters = cast( 

1965 "_CoreSingleExecuteParams", effective_parameters 

1966 ) 

1967 if self.dialect._has_events: 

1968 for fn in self.dialect.dispatch.do_execute: 

1969 if fn( 

1970 cursor, 

1971 str_statement, 

1972 effective_parameters, 

1973 context, 

1974 ): 

1975 evt_handled = True 

1976 break 

1977 if not evt_handled: 

1978 self.dialect.do_execute( 

1979 cursor, str_statement, effective_parameters, context 

1980 ) 

1981 

1982 if self._has_events or self.engine._has_events: 

1983 self.dispatch.after_cursor_execute( 

1984 self, 

1985 cursor, 

1986 str_statement, 

1987 effective_parameters, 

1988 context, 

1989 context.executemany, 

1990 ) 

1991 

1992 context.post_exec() 

1993 

1994 result = context._setup_result_proxy() 

1995 

1996 except BaseException as e: 

1997 self._handle_dbapi_exception( 

1998 e, str_statement, effective_parameters, cursor, context 

1999 ) 

2000 

2001 return result 

2002 

2003 def _exec_insertmany_context( 

2004 self, 

2005 dialect: Dialect, 

2006 context: ExecutionContext, 

2007 ) -> CursorResult[Unpack[TupleAny]]: 

2008 """continue the _execute_context() method for an "insertmanyvalues" 

2009 operation, which will invoke DBAPI 

2010 cursor.execute() one or more times with individual log and 

2011 event hook calls. 

2012 

2013 """ 

2014 

2015 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

2016 generic_setinputsizes = context._prepare_set_input_sizes() 

2017 else: 

2018 generic_setinputsizes = None 

2019 

2020 cursor, str_statement, parameters = ( 

2021 context.cursor, 

2022 context.statement, 

2023 context.parameters, 

2024 ) 

2025 

2026 effective_parameters = parameters 

2027 

2028 engine_events = self._has_events or self.engine._has_events 

2029 if self.dialect._has_events: 

2030 do_execute_dispatch: Iterable[Any] = ( 

2031 self.dialect.dispatch.do_execute 

2032 ) 

2033 else: 

2034 do_execute_dispatch = () 

2035 

2036 if self._echo: 

2037 stats = context._get_cache_stats() + " (insertmanyvalues)" 

2038 

2039 preserve_rowcount = context.execution_options.get( 

2040 "preserve_rowcount", False 

2041 ) 

2042 rowcount = 0 

2043 

2044 for imv_batch in dialect._deliver_insertmanyvalues_batches( 

2045 self, 

2046 cursor, 

2047 str_statement, 

2048 effective_parameters, 

2049 generic_setinputsizes, 

2050 context, 

2051 ): 

2052 if imv_batch.processed_setinputsizes: 

2053 try: 

2054 dialect.do_set_input_sizes( 

2055 context.cursor, 

2056 imv_batch.processed_setinputsizes, 

2057 context, 

2058 ) 

2059 except BaseException as e: 

2060 self._handle_dbapi_exception( 

2061 e, 

2062 sql_util._long_statement(imv_batch.replaced_statement), 

2063 imv_batch.replaced_parameters, 

2064 None, 

2065 context, 

2066 is_sub_exec=True, 

2067 ) 

2068 

2069 sub_stmt = imv_batch.replaced_statement 

2070 sub_params = imv_batch.replaced_parameters 

2071 

2072 if engine_events: 

2073 for fn in self.dispatch.before_cursor_execute: 

2074 sub_stmt, sub_params = fn( 

2075 self, 

2076 cursor, 

2077 sub_stmt, 

2078 sub_params, 

2079 context, 

2080 True, 

2081 ) 

2082 

2083 if self._echo: 

2084 self._log_info(sql_util._long_statement(sub_stmt)) 

2085 

2086 imv_stats = f""" {imv_batch.batchnum}/{ 

2087 imv_batch.total_batches 

2088 } ({ 

2089 'ordered' 

2090 if imv_batch.rows_sorted else 'unordered' 

2091 }{ 

2092 '; batch not supported' 

2093 if imv_batch.is_downgraded 

2094 else '' 

2095 })""" 

2096 

2097 if imv_batch.batchnum == 1: 

2098 stats += imv_stats 

2099 else: 

2100 stats = f"insertmanyvalues{imv_stats}" 

2101 

2102 if not self.engine.hide_parameters: 

2103 self._log_info( 

2104 "[%s] %r", 

2105 stats, 

2106 sql_util._repr_params( 

2107 sub_params, 

2108 batches=10, 

2109 ismulti=False, 

2110 ), 

2111 ) 

2112 else: 

2113 self._log_info( 

2114 "[%s] [SQL parameters hidden due to " 

2115 "hide_parameters=True]", 

2116 stats, 

2117 ) 

2118 

2119 try: 

2120 for fn in do_execute_dispatch: 

2121 if fn( 

2122 cursor, 

2123 sub_stmt, 

2124 sub_params, 

2125 context, 

2126 ): 

2127 break 

2128 else: 

2129 dialect.do_execute( 

2130 cursor, 

2131 sub_stmt, 

2132 sub_params, 

2133 context, 

2134 ) 

2135 

2136 except BaseException as e: 

2137 self._handle_dbapi_exception( 

2138 e, 

2139 sql_util._long_statement(sub_stmt), 

2140 sub_params, 

2141 cursor, 

2142 context, 

2143 is_sub_exec=True, 

2144 ) 

2145 

2146 if engine_events: 

2147 self.dispatch.after_cursor_execute( 

2148 self, 

2149 cursor, 

2150 str_statement, 

2151 effective_parameters, 

2152 context, 

2153 context.executemany, 

2154 ) 

2155 

2156 if preserve_rowcount: 

2157 rowcount += imv_batch.current_batch_size 

2158 

2159 try: 

2160 context.post_exec() 

2161 

2162 if preserve_rowcount: 

2163 context._rowcount = rowcount # type: ignore[attr-defined] 

2164 

2165 result = context._setup_result_proxy() 

2166 

2167 except BaseException as e: 

2168 self._handle_dbapi_exception( 

2169 e, str_statement, effective_parameters, cursor, context 

2170 ) 

2171 

2172 return result 

2173 

2174 def _cursor_execute( 

2175 self, 

2176 cursor: DBAPICursor, 

2177 statement: str, 

2178 parameters: _DBAPISingleExecuteParams, 

2179 context: Optional[ExecutionContext] = None, 

2180 ) -> None: 

2181 """Execute a statement + params on the given cursor. 

2182 

2183 Adds appropriate logging and exception handling. 

2184 

2185 This method is used by DefaultDialect for special-case 

2186 executions, such as for sequences and column defaults. 

2187 The path of statement execution in the majority of cases 

2188 terminates at _execute_context(). 

2189 

2190 """ 

2191 if self._has_events or self.engine._has_events: 

2192 for fn in self.dispatch.before_cursor_execute: 

2193 statement, parameters = fn( 

2194 self, cursor, statement, parameters, context, False 

2195 ) 

2196 

2197 if self._echo: 

2198 self._log_info(statement) 

2199 self._log_info("[raw sql] %r", parameters) 

2200 try: 

2201 for fn in ( 

2202 () 

2203 if not self.dialect._has_events 

2204 else self.dialect.dispatch.do_execute 

2205 ): 

2206 if fn(cursor, statement, parameters, context): 

2207 break 

2208 else: 

2209 self.dialect.do_execute(cursor, statement, parameters, context) 

2210 except BaseException as e: 

2211 self._handle_dbapi_exception( 

2212 e, statement, parameters, cursor, context 

2213 ) 

2214 

2215 if self._has_events or self.engine._has_events: 

2216 self.dispatch.after_cursor_execute( 

2217 self, cursor, statement, parameters, context, False 

2218 ) 

2219 

2220 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2221 """Close the given cursor, catching exceptions 

2222 and turning into log warnings. 

2223 

2224 """ 

2225 try: 

2226 cursor.close() 

2227 except Exception: 

2228 # log the error through the connection pool's logger. 

2229 self.engine.pool.logger.error( 

2230 "Error closing cursor", exc_info=True 

2231 ) 

2232 

2233 _reentrant_error = False 

2234 _is_disconnect = False 

2235 

2236 def _handle_dbapi_exception( 

2237 self, 

2238 e: BaseException, 

2239 statement: Optional[str], 

2240 parameters: Optional[_AnyExecuteParams], 

2241 cursor: Optional[DBAPICursor], 

2242 context: Optional[ExecutionContext], 

2243 is_sub_exec: bool = False, 

2244 ) -> NoReturn: 

2245 exc_info = sys.exc_info() 

2246 

2247 is_exit_exception = util.is_exit_exception(e) 

2248 

2249 if not self._is_disconnect: 

2250 self._is_disconnect = ( 

2251 isinstance(e, self.dialect.loaded_dbapi.Error) 

2252 and not self.closed 

2253 and self.dialect.is_disconnect( 

2254 e, 

2255 self._dbapi_connection if not self.invalidated else None, 

2256 cursor, 

2257 ) 

2258 ) or (is_exit_exception and not self.closed) 

2259 

2260 invalidate_pool_on_disconnect = not is_exit_exception 

2261 

2262 ismulti: bool = ( 

2263 not is_sub_exec and context.executemany 

2264 if context is not None 

2265 else False 

2266 ) 

2267 if self._reentrant_error: 

2268 raise exc.DBAPIError.instance( 

2269 statement, 

2270 parameters, 

2271 e, 

2272 self.dialect.loaded_dbapi.Error, 

2273 hide_parameters=self.engine.hide_parameters, 

2274 dialect=self.dialect, 

2275 ismulti=ismulti, 

2276 ).with_traceback(exc_info[2]) from e 

2277 self._reentrant_error = True 

2278 try: 

2279 # non-DBAPI error - if we already got a context, 

2280 # or there's no string statement, don't wrap it 

2281 should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or ( 

2282 statement is not None 

2283 and context is None 

2284 and not is_exit_exception 

2285 ) 

2286 

2287 if should_wrap: 

2288 sqlalchemy_exception = exc.DBAPIError.instance( 

2289 statement, 

2290 parameters, 

2291 cast(Exception, e), 

2292 self.dialect.loaded_dbapi.Error, 

2293 hide_parameters=self.engine.hide_parameters, 

2294 connection_invalidated=self._is_disconnect, 

2295 dialect=self.dialect, 

2296 ismulti=ismulti, 

2297 ) 

2298 else: 

2299 sqlalchemy_exception = None 

2300 

2301 newraise = None 

2302 

2303 if (self.dialect._has_events) and not self._execution_options.get( 

2304 "skip_user_error_events", False 

2305 ): 

2306 ctx = ExceptionContextImpl( 

2307 e, 

2308 sqlalchemy_exception, 

2309 self.engine, 

2310 self.dialect, 

2311 self, 

2312 cursor, 

2313 statement, 

2314 parameters, 

2315 context, 

2316 self._is_disconnect, 

2317 invalidate_pool_on_disconnect, 

2318 False, 

2319 ) 

2320 

2321 for fn in self.dialect.dispatch.handle_error: 

2322 try: 

2323 # handler returns an exception; 

2324 # call next handler in a chain 

2325 per_fn = fn(ctx) 

2326 if per_fn is not None: 

2327 ctx.chained_exception = newraise = per_fn 

2328 except Exception as _raised: 

2329 # handler raises an exception - stop processing 

2330 newraise = _raised 

2331 break 

2332 

2333 if self._is_disconnect != ctx.is_disconnect: 

2334 self._is_disconnect = ctx.is_disconnect 

2335 if sqlalchemy_exception: 

2336 sqlalchemy_exception.connection_invalidated = ( 

2337 ctx.is_disconnect 

2338 ) 

2339 

2340 # set up potentially user-defined value for 

2341 # invalidate pool. 

2342 invalidate_pool_on_disconnect = ( 

2343 ctx.invalidate_pool_on_disconnect 

2344 ) 

2345 

2346 if should_wrap and context: 

2347 context.handle_dbapi_exception(e) 

2348 

2349 if not self._is_disconnect: 

2350 if cursor: 

2351 self._safe_close_cursor(cursor) 

2352 # "autorollback" was mostly relevant in 1.x series. 

2353 # It's very unlikely to reach here, as the connection 

2354 # does autobegin so when we are here, we are usually 

2355 # in an explicit / semi-explicit transaction. 

2356 # however we have a test which manufactures this 

2357 # scenario in any case using an event handler. 

2358 # test/engine/test_execute.py-> test_actual_autorollback 

2359 if not self.in_transaction(): 

2360 self._rollback_impl() 

2361 

2362 if newraise: 

2363 raise newraise.with_traceback(exc_info[2]) from e 

2364 elif should_wrap: 

2365 assert sqlalchemy_exception is not None 

2366 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2367 else: 

2368 assert exc_info[1] is not None 

2369 raise exc_info[1].with_traceback(exc_info[2]) 

2370 finally: 

2371 del self._reentrant_error 

2372 if self._is_disconnect: 

2373 del self._is_disconnect 

2374 if not self.invalidated: 

2375 dbapi_conn_wrapper = self._dbapi_connection 

2376 assert dbapi_conn_wrapper is not None 

2377 if invalidate_pool_on_disconnect: 

2378 self.engine.pool._invalidate(dbapi_conn_wrapper, e) 

2379 self.invalidate(e) 

2380 

2381 @classmethod 

2382 def _handle_dbapi_exception_noconnection( 

2383 cls, 

2384 e: BaseException, 

2385 dialect: Dialect, 

2386 engine: Optional[Engine] = None, 

2387 is_disconnect: Optional[bool] = None, 

2388 invalidate_pool_on_disconnect: bool = True, 

2389 is_pre_ping: bool = False, 

2390 ) -> NoReturn: 

2391 exc_info = sys.exc_info() 

2392 

2393 if is_disconnect is None: 

2394 is_disconnect = isinstance( 

2395 e, dialect.loaded_dbapi.Error 

2396 ) and dialect.is_disconnect(e, None, None) 

2397 

2398 should_wrap = isinstance(e, dialect.loaded_dbapi.Error) 

2399 

2400 if should_wrap: 

2401 sqlalchemy_exception = exc.DBAPIError.instance( 

2402 None, 

2403 None, 

2404 cast(Exception, e), 

2405 dialect.loaded_dbapi.Error, 

2406 hide_parameters=( 

2407 engine.hide_parameters if engine is not None else False 

2408 ), 

2409 connection_invalidated=is_disconnect, 

2410 dialect=dialect, 

2411 ) 

2412 else: 

2413 sqlalchemy_exception = None 

2414 

2415 newraise = None 

2416 

2417 if dialect._has_events: 

2418 ctx = ExceptionContextImpl( 

2419 e, 

2420 sqlalchemy_exception, 

2421 engine, 

2422 dialect, 

2423 None, 

2424 None, 

2425 None, 

2426 None, 

2427 None, 

2428 is_disconnect, 

2429 invalidate_pool_on_disconnect, 

2430 is_pre_ping, 

2431 ) 

2432 for fn in dialect.dispatch.handle_error: 

2433 try: 

2434 # handler returns an exception; 

2435 # call next handler in a chain 

2436 per_fn = fn(ctx) 

2437 if per_fn is not None: 

2438 ctx.chained_exception = newraise = per_fn 

2439 except Exception as _raised: 

2440 # handler raises an exception - stop processing 

2441 newraise = _raised 

2442 break 

2443 

2444 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: 

2445 sqlalchemy_exception.connection_invalidated = is_disconnect = ( 

2446 ctx.is_disconnect 

2447 ) 

2448 

2449 if newraise: 

2450 raise newraise.with_traceback(exc_info[2]) from e 

2451 elif should_wrap: 

2452 assert sqlalchemy_exception is not None 

2453 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2454 else: 

2455 assert exc_info[1] is not None 

2456 raise exc_info[1].with_traceback(exc_info[2]) 

2457 

2458 def _run_ddl_visitor( 

2459 self, 

2460 visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]], 

2461 element: SchemaItem, 

2462 **kwargs: Any, 

2463 ) -> None: 

2464 """run a DDL visitor. 

2465 

2466 This method is only here so that the MockConnection can change the 

2467 options given to the visitor so that "checkfirst" is skipped. 

2468 

2469 """ 

2470 visitorcallable(self.dialect, self, **kwargs).traverse_single(element) 

2471 

2472 

2473class ExceptionContextImpl(ExceptionContext): 

2474 """Implement the :class:`.ExceptionContext` interface.""" 

2475 

2476 __slots__ = ( 

2477 "connection", 

2478 "engine", 

2479 "dialect", 

2480 "cursor", 

2481 "statement", 

2482 "parameters", 

2483 "original_exception", 

2484 "sqlalchemy_exception", 

2485 "chained_exception", 

2486 "execution_context", 

2487 "is_disconnect", 

2488 "invalidate_pool_on_disconnect", 

2489 "is_pre_ping", 

2490 ) 

2491 

2492 def __init__( 

2493 self, 

2494 exception: BaseException, 

2495 sqlalchemy_exception: Optional[exc.StatementError], 

2496 engine: Optional[Engine], 

2497 dialect: Dialect, 

2498 connection: Optional[Connection], 

2499 cursor: Optional[DBAPICursor], 

2500 statement: Optional[str], 

2501 parameters: Optional[_DBAPIAnyExecuteParams], 

2502 context: Optional[ExecutionContext], 

2503 is_disconnect: bool, 

2504 invalidate_pool_on_disconnect: bool, 

2505 is_pre_ping: bool, 

2506 ): 

2507 self.engine = engine 

2508 self.dialect = dialect 

2509 self.connection = connection 

2510 self.sqlalchemy_exception = sqlalchemy_exception 

2511 self.original_exception = exception 

2512 self.execution_context = context 

2513 self.statement = statement 

2514 self.parameters = parameters 

2515 self.is_disconnect = is_disconnect 

2516 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect 

2517 self.is_pre_ping = is_pre_ping 

2518 

2519 

2520class Transaction(TransactionalContext): 

2521 """Represent a database transaction in progress. 

2522 

2523 The :class:`.Transaction` object is procured by 

2524 calling the :meth:`_engine.Connection.begin` method of 

2525 :class:`_engine.Connection`:: 

2526 

2527 from sqlalchemy import create_engine 

2528 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

2529 connection = engine.connect() 

2530 trans = connection.begin() 

2531 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2532 trans.commit() 

2533 

2534 The object provides :meth:`.rollback` and :meth:`.commit` 

2535 methods in order to control transaction boundaries. It 

2536 also implements a context manager interface so that 

2537 the Python ``with`` statement can be used with the 

2538 :meth:`_engine.Connection.begin` method:: 

2539 

2540 with connection.begin(): 

2541 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2542 

2543 The Transaction object is **not** threadsafe. 

2544 

2545 .. seealso:: 

2546 

2547 :meth:`_engine.Connection.begin` 

2548 

2549 :meth:`_engine.Connection.begin_twophase` 

2550 

2551 :meth:`_engine.Connection.begin_nested` 

2552 

2553 .. index:: 

2554 single: thread safety; Transaction 

2555 """ # noqa 

2556 

2557 __slots__ = () 

2558 

2559 _is_root: bool = False 

2560 is_active: bool 

2561 connection: Connection 

2562 

2563 def __init__(self, connection: Connection): 

2564 raise NotImplementedError() 

2565 

2566 @property 

2567 def _deactivated_from_connection(self) -> bool: 

2568 """True if this transaction is totally deactivated from the connection 

2569 and therefore can no longer affect its state. 

2570 

2571 """ 

2572 raise NotImplementedError() 

2573 

2574 def _do_close(self) -> None: 

2575 raise NotImplementedError() 

2576 

2577 def _do_rollback(self) -> None: 

2578 raise NotImplementedError() 

2579 

2580 def _do_commit(self) -> None: 

2581 raise NotImplementedError() 

2582 

2583 @property 

2584 def is_valid(self) -> bool: 

2585 return self.is_active and not self.connection.invalidated 

2586 

2587 def close(self) -> None: 

2588 """Close this :class:`.Transaction`. 

2589 

2590 If this transaction is the base transaction in a begin/commit 

2591 nesting, the transaction will rollback(). Otherwise, the 

2592 method returns. 

2593 

2594 This is used to cancel a Transaction without affecting the scope of 

2595 an enclosing transaction. 

2596 

2597 """ 

2598 try: 

2599 self._do_close() 

2600 finally: 

2601 assert not self.is_active 

2602 

2603 def rollback(self) -> None: 

2604 """Roll back this :class:`.Transaction`. 

2605 

2606 The implementation of this may vary based on the type of transaction in 

2607 use: 

2608 

2609 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2610 it corresponds to a ROLLBACK. 

2611 

2612 * For a :class:`.NestedTransaction`, it corresponds to a 

2613 "ROLLBACK TO SAVEPOINT" operation. 

2614 

2615 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2616 phase transactions may be used. 

2617 

2618 

2619 """ 

2620 try: 

2621 self._do_rollback() 

2622 finally: 

2623 assert not self.is_active 

2624 

2625 def commit(self) -> None: 

2626 """Commit this :class:`.Transaction`. 

2627 

2628 The implementation of this may vary based on the type of transaction in 

2629 use: 

2630 

2631 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2632 it corresponds to a COMMIT. 

2633 

2634 * For a :class:`.NestedTransaction`, it corresponds to a 

2635 "RELEASE SAVEPOINT" operation. 

2636 

2637 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2638 phase transactions may be used. 

2639 

2640 """ 

2641 try: 

2642 self._do_commit() 

2643 finally: 

2644 assert not self.is_active 

2645 

2646 def _get_subject(self) -> Connection: 

2647 return self.connection 

2648 

2649 def _transaction_is_active(self) -> bool: 

2650 return self.is_active 

2651 

2652 def _transaction_is_closed(self) -> bool: 

2653 return not self._deactivated_from_connection 

2654 

2655 def _rollback_can_be_called(self) -> bool: 

2656 # for RootTransaction / NestedTransaction, it's safe to call 

2657 # rollback() even if the transaction is deactive and no warnings 

2658 # will be emitted. tested in 

2659 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)? 

2660 return True 

2661 

2662 

2663class RootTransaction(Transaction): 

2664 """Represent the "root" transaction on a :class:`_engine.Connection`. 

2665 

2666 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring 

2667 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction` 

2668 is created by calling upon the :meth:`_engine.Connection.begin` method, and 

2669 remains associated with the :class:`_engine.Connection` throughout its 

2670 active span. The current :class:`_engine.RootTransaction` in use is 

2671 accessible via the :attr:`_engine.Connection.get_transaction` method of 

2672 :class:`_engine.Connection`. 

2673 

2674 In :term:`2.0 style` use, the :class:`_engine.Connection` also employs 

2675 "autobegin" behavior that will create a new 

2676 :class:`_engine.RootTransaction` whenever a connection in a 

2677 non-transactional state is used to emit commands on the DBAPI connection. 

2678 The scope of the :class:`_engine.RootTransaction` in 2.0 style 

2679 use can be controlled using the :meth:`_engine.Connection.commit` and 

2680 :meth:`_engine.Connection.rollback` methods. 

2681 

2682 

2683 """ 

2684 

2685 _is_root = True 

2686 

2687 __slots__ = ("connection", "is_active") 

2688 

2689 def __init__(self, connection: Connection): 

2690 assert connection._transaction is None 

2691 if connection._trans_context_manager: 

2692 TransactionalContext._trans_ctx_check(connection) 

2693 self.connection = connection 

2694 self._connection_begin_impl() 

2695 connection._transaction = self 

2696 

2697 self.is_active = True 

2698 

2699 def _deactivate_from_connection(self) -> None: 

2700 if self.is_active: 

2701 assert self.connection._transaction is self 

2702 self.is_active = False 

2703 

2704 elif self.connection._transaction is not self: 

2705 util.warn("transaction already deassociated from connection") 

2706 

2707 @property 

2708 def _deactivated_from_connection(self) -> bool: 

2709 return self.connection._transaction is not self 

2710 

2711 def _connection_begin_impl(self) -> None: 

2712 self.connection._begin_impl(self) 

2713 

2714 def _connection_rollback_impl(self) -> None: 

2715 self.connection._rollback_impl() 

2716 

2717 def _connection_commit_impl(self) -> None: 

2718 self.connection._commit_impl() 

2719 

2720 def _close_impl(self, try_deactivate: bool = False) -> None: 

2721 try: 

2722 if self.is_active: 

2723 self._connection_rollback_impl() 

2724 

2725 if self.connection._nested_transaction: 

2726 self.connection._nested_transaction._cancel() 

2727 finally: 

2728 if self.is_active or try_deactivate: 

2729 self._deactivate_from_connection() 

2730 if self.connection._transaction is self: 

2731 self.connection._transaction = None 

2732 

2733 assert not self.is_active 

2734 assert self.connection._transaction is not self 

2735 

2736 def _do_close(self) -> None: 

2737 self._close_impl() 

2738 

2739 def _do_rollback(self) -> None: 

2740 self._close_impl(try_deactivate=True) 

2741 

2742 def _do_commit(self) -> None: 

2743 if self.is_active: 

2744 assert self.connection._transaction is self 

2745 

2746 try: 

2747 self._connection_commit_impl() 

2748 finally: 

2749 # whether or not commit succeeds, cancel any 

2750 # nested transactions, make this transaction "inactive" 

2751 # and remove it as a reset agent 

2752 if self.connection._nested_transaction: 

2753 self.connection._nested_transaction._cancel() 

2754 

2755 self._deactivate_from_connection() 

2756 

2757 # ...however only remove as the connection's current transaction 

2758 # if commit succeeded. otherwise it stays on so that a rollback 

2759 # needs to occur. 

2760 self.connection._transaction = None 

2761 else: 

2762 if self.connection._transaction is self: 

2763 self.connection._invalid_transaction() 

2764 else: 

2765 raise exc.InvalidRequestError("This transaction is inactive") 

2766 

2767 assert not self.is_active 

2768 assert self.connection._transaction is not self 

2769 

2770 

2771class NestedTransaction(Transaction): 

2772 """Represent a 'nested', or SAVEPOINT transaction. 

2773 

2774 The :class:`.NestedTransaction` object is created by calling the 

2775 :meth:`_engine.Connection.begin_nested` method of 

2776 :class:`_engine.Connection`. 

2777 

2778 When using :class:`.NestedTransaction`, the semantics of "begin" / 

2779 "commit" / "rollback" are as follows: 

2780 

2781 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where 

2782 the savepoint is given an explicit name that is part of the state 

2783 of this object. 

2784 

2785 * The :meth:`.NestedTransaction.commit` method corresponds to a 

2786 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated 

2787 with this :class:`.NestedTransaction`. 

2788 

2789 * The :meth:`.NestedTransaction.rollback` method corresponds to a 

2790 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier 

2791 associated with this :class:`.NestedTransaction`. 

2792 

2793 The rationale for mimicking the semantics of an outer transaction in 

2794 terms of savepoints so that code may deal with a "savepoint" transaction 

2795 and an "outer" transaction in an agnostic way. 

2796 

2797 .. seealso:: 

2798 

2799 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API. 

2800 

2801 """ 

2802 

2803 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") 

2804 

2805 _savepoint: str 

2806 

2807 def __init__(self, connection: Connection): 

2808 assert connection._transaction is not None 

2809 if connection._trans_context_manager: 

2810 TransactionalContext._trans_ctx_check(connection) 

2811 self.connection = connection 

2812 self._savepoint = self.connection._savepoint_impl() 

2813 self.is_active = True 

2814 self._previous_nested = connection._nested_transaction 

2815 connection._nested_transaction = self 

2816 

2817 def _deactivate_from_connection(self, warn: bool = True) -> None: 

2818 if self.connection._nested_transaction is self: 

2819 self.connection._nested_transaction = self._previous_nested 

2820 elif warn: 

2821 util.warn( 

2822 "nested transaction already deassociated from connection" 

2823 ) 

2824 

2825 @property 

2826 def _deactivated_from_connection(self) -> bool: 

2827 return self.connection._nested_transaction is not self 

2828 

2829 def _cancel(self) -> None: 

2830 # called by RootTransaction when the outer transaction is 

2831 # committed, rolled back, or closed to cancel all savepoints 

2832 # without any action being taken 

2833 self.is_active = False 

2834 self._deactivate_from_connection() 

2835 if self._previous_nested: 

2836 self._previous_nested._cancel() 

2837 

2838 def _close_impl( 

2839 self, deactivate_from_connection: bool, warn_already_deactive: bool 

2840 ) -> None: 

2841 try: 

2842 if ( 

2843 self.is_active 

2844 and self.connection._transaction 

2845 and self.connection._transaction.is_active 

2846 ): 

2847 self.connection._rollback_to_savepoint_impl(self._savepoint) 

2848 finally: 

2849 self.is_active = False 

2850 

2851 if deactivate_from_connection: 

2852 self._deactivate_from_connection(warn=warn_already_deactive) 

2853 

2854 assert not self.is_active 

2855 if deactivate_from_connection: 

2856 assert self.connection._nested_transaction is not self 

2857 

2858 def _do_close(self) -> None: 

2859 self._close_impl(True, False) 

2860 

2861 def _do_rollback(self) -> None: 

2862 self._close_impl(True, True) 

2863 

2864 def _do_commit(self) -> None: 

2865 if self.is_active: 

2866 try: 

2867 self.connection._release_savepoint_impl(self._savepoint) 

2868 finally: 

2869 # nested trans becomes inactive on failed release 

2870 # unconditionally. this prevents it from trying to 

2871 # emit SQL when it rolls back. 

2872 self.is_active = False 

2873 

2874 # but only de-associate from connection if it succeeded 

2875 self._deactivate_from_connection() 

2876 else: 

2877 if self.connection._nested_transaction is self: 

2878 self.connection._invalid_transaction() 

2879 else: 

2880 raise exc.InvalidRequestError( 

2881 "This nested transaction is inactive" 

2882 ) 

2883 

2884 

2885class TwoPhaseTransaction(RootTransaction): 

2886 """Represent a two-phase transaction. 

2887 

2888 A new :class:`.TwoPhaseTransaction` object may be procured 

2889 using the :meth:`_engine.Connection.begin_twophase` method. 

2890 

2891 The interface is the same as that of :class:`.Transaction` 

2892 with the addition of the :meth:`prepare` method. 

2893 

2894 """ 

2895 

2896 __slots__ = ("xid", "_is_prepared") 

2897 

2898 xid: Any 

2899 

2900 def __init__(self, connection: Connection, xid: Any): 

2901 self._is_prepared = False 

2902 self.xid = xid 

2903 super().__init__(connection) 

2904 

2905 def prepare(self) -> None: 

2906 """Prepare this :class:`.TwoPhaseTransaction`. 

2907 

2908 After a PREPARE, the transaction can be committed. 

2909 

2910 """ 

2911 if not self.is_active: 

2912 raise exc.InvalidRequestError("This transaction is inactive") 

2913 self.connection._prepare_twophase_impl(self.xid) 

2914 self._is_prepared = True 

2915 

2916 def _connection_begin_impl(self) -> None: 

2917 self.connection._begin_twophase_impl(self) 

2918 

2919 def _connection_rollback_impl(self) -> None: 

2920 self.connection._rollback_twophase_impl(self.xid, self._is_prepared) 

2921 

2922 def _connection_commit_impl(self) -> None: 

2923 self.connection._commit_twophase_impl(self.xid, self._is_prepared) 

2924 

2925 

2926class Engine( 

2927 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"] 

2928): 

2929 """ 

2930 Connects a :class:`~sqlalchemy.pool.Pool` and 

2931 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a 

2932 source of database connectivity and behavior. 

2933 

2934 An :class:`_engine.Engine` object is instantiated publicly using the 

2935 :func:`~sqlalchemy.create_engine` function. 

2936 

2937 .. seealso:: 

2938 

2939 :doc:`/core/engines` 

2940 

2941 :ref:`connections_toplevel` 

2942 

2943 """ 

2944 

2945 dispatch: dispatcher[ConnectionEventsTarget] 

2946 

2947 _compiled_cache: Optional[CompiledCacheType] 

2948 

2949 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS 

2950 _has_events: bool = False 

2951 _connection_cls: Type[Connection] = Connection 

2952 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine" 

2953 _is_future: bool = False 

2954 

2955 _schema_translate_map: Optional[SchemaTranslateMapType] = None 

2956 _option_cls: Type[OptionEngine] 

2957 

2958 dialect: Dialect 

2959 pool: Pool 

2960 url: URL 

2961 hide_parameters: bool 

2962 

2963 def __init__( 

2964 self, 

2965 pool: Pool, 

2966 dialect: Dialect, 

2967 url: URL, 

2968 logging_name: Optional[str] = None, 

2969 echo: Optional[_EchoFlagType] = None, 

2970 query_cache_size: int = 500, 

2971 execution_options: Optional[Mapping[str, Any]] = None, 

2972 hide_parameters: bool = False, 

2973 ): 

2974 self.pool = pool 

2975 self.url = url 

2976 self.dialect = dialect 

2977 if logging_name: 

2978 self.logging_name = logging_name 

2979 self.echo = echo 

2980 self.hide_parameters = hide_parameters 

2981 if query_cache_size != 0: 

2982 self._compiled_cache = util.LRUCache( 

2983 query_cache_size, size_alert=self._lru_size_alert 

2984 ) 

2985 else: 

2986 self._compiled_cache = None 

2987 log.instance_logger(self, echoflag=echo) 

2988 if execution_options: 

2989 self.update_execution_options(**execution_options) 

2990 

2991 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None: 

2992 if self._should_log_info(): 

2993 self.logger.info( 

2994 "Compiled cache size pruning from %d items to %d. " 

2995 "Increase cache size to reduce the frequency of pruning.", 

2996 len(cache), 

2997 cache.capacity, 

2998 ) 

2999 

3000 @property 

3001 def engine(self) -> Engine: 

3002 """Returns this :class:`.Engine`. 

3003 

3004 Used for legacy schemes that accept :class:`.Connection` / 

3005 :class:`.Engine` objects within the same variable. 

3006 

3007 """ 

3008 return self 

3009 

3010 def clear_compiled_cache(self) -> None: 

3011 """Clear the compiled cache associated with the dialect. 

3012 

3013 This applies **only** to the built-in cache that is established 

3014 via the :paramref:`_engine.create_engine.query_cache_size` parameter. 

3015 It will not impact any dictionary caches that were passed via the 

3016 :paramref:`.Connection.execution_options.compiled_cache` parameter. 

3017 

3018 .. versionadded:: 1.4 

3019 

3020 """ 

3021 if self._compiled_cache: 

3022 self._compiled_cache.clear() 

3023 

3024 def update_execution_options(self, **opt: Any) -> None: 

3025 r"""Update the default execution_options dictionary 

3026 of this :class:`_engine.Engine`. 

3027 

3028 The given keys/values in \**opt are added to the 

3029 default execution options that will be used for 

3030 all connections. The initial contents of this dictionary 

3031 can be sent via the ``execution_options`` parameter 

3032 to :func:`_sa.create_engine`. 

3033 

3034 .. seealso:: 

3035 

3036 :meth:`_engine.Connection.execution_options` 

3037 

3038 :meth:`_engine.Engine.execution_options` 

3039 

3040 """ 

3041 self.dispatch.set_engine_execution_options(self, opt) 

3042 self._execution_options = self._execution_options.union(opt) 

3043 self.dialect.set_engine_execution_options(self, opt) 

3044 

3045 @overload 

3046 def execution_options( 

3047 self, 

3048 *, 

3049 compiled_cache: Optional[CompiledCacheType] = ..., 

3050 logging_token: str = ..., 

3051 isolation_level: IsolationLevel = ..., 

3052 insertmanyvalues_page_size: int = ..., 

3053 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

3054 **opt: Any, 

3055 ) -> OptionEngine: ... 

3056 

3057 @overload 

3058 def execution_options(self, **opt: Any) -> OptionEngine: ... 

3059 

3060 def execution_options(self, **opt: Any) -> OptionEngine: 

3061 """Return a new :class:`_engine.Engine` that will provide 

3062 :class:`_engine.Connection` objects with the given execution options. 

3063 

3064 The returned :class:`_engine.Engine` remains related to the original 

3065 :class:`_engine.Engine` in that it shares the same connection pool and 

3066 other state: 

3067 

3068 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` 

3069 is the 

3070 same instance. The :meth:`_engine.Engine.dispose` 

3071 method will replace 

3072 the connection pool instance for the parent engine as well 

3073 as this one. 

3074 * Event listeners are "cascaded" - meaning, the new 

3075 :class:`_engine.Engine` 

3076 inherits the events of the parent, and new events can be associated 

3077 with the new :class:`_engine.Engine` individually. 

3078 * The logging configuration and logging_name is copied from the parent 

3079 :class:`_engine.Engine`. 

3080 

3081 The intent of the :meth:`_engine.Engine.execution_options` method is 

3082 to implement schemes where multiple :class:`_engine.Engine` 

3083 objects refer to the same connection pool, but are differentiated 

3084 by options that affect some execution-level behavior for each 

3085 engine. One such example is breaking into separate "reader" and 

3086 "writer" :class:`_engine.Engine` instances, where one 

3087 :class:`_engine.Engine` 

3088 has a lower :term:`isolation level` setting configured or is even 

3089 transaction-disabled using "autocommit". An example of this 

3090 configuration is at :ref:`dbapi_autocommit_multiple`. 

3091 

3092 Another example is one that 

3093 uses a custom option ``shard_id`` which is consumed by an event 

3094 to change the current schema on a database connection:: 

3095 

3096 from sqlalchemy import event 

3097 from sqlalchemy.engine import Engine 

3098 

3099 primary_engine = create_engine("mysql+mysqldb://") 

3100 shard1 = primary_engine.execution_options(shard_id="shard1") 

3101 shard2 = primary_engine.execution_options(shard_id="shard2") 

3102 

3103 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"} 

3104 

3105 @event.listens_for(Engine, "before_cursor_execute") 

3106 def _switch_shard(conn, cursor, stmt, 

3107 params, context, executemany): 

3108 shard_id = conn.get_execution_options().get('shard_id', "default") 

3109 current_shard = conn.info.get("current_shard", None) 

3110 

3111 if current_shard != shard_id: 

3112 cursor.execute("use %s" % shards[shard_id]) 

3113 conn.info["current_shard"] = shard_id 

3114 

3115 The above recipe illustrates two :class:`_engine.Engine` objects that 

3116 will each serve as factories for :class:`_engine.Connection` objects 

3117 that have pre-established "shard_id" execution options present. A 

3118 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler 

3119 then interprets this execution option to emit a MySQL ``use`` statement 

3120 to switch databases before a statement execution, while at the same 

3121 time keeping track of which database we've established using the 

3122 :attr:`_engine.Connection.info` dictionary. 

3123 

3124 .. seealso:: 

3125 

3126 :meth:`_engine.Connection.execution_options` 

3127 - update execution options 

3128 on a :class:`_engine.Connection` object. 

3129 

3130 :meth:`_engine.Engine.update_execution_options` 

3131 - update the execution 

3132 options for a given :class:`_engine.Engine` in place. 

3133 

3134 :meth:`_engine.Engine.get_execution_options` 

3135 

3136 

3137 """ # noqa: E501 

3138 return self._option_cls(self, opt) 

3139 

3140 def get_execution_options(self) -> _ExecuteOptions: 

3141 """Get the non-SQL options which will take effect during execution. 

3142 

3143 .. versionadded: 1.3 

3144 

3145 .. seealso:: 

3146 

3147 :meth:`_engine.Engine.execution_options` 

3148 """ 

3149 return self._execution_options 

3150 

3151 @property 

3152 def name(self) -> str: 

3153 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3154 in use by this :class:`Engine`. 

3155 

3156 """ 

3157 

3158 return self.dialect.name 

3159 

3160 @property 

3161 def driver(self) -> str: 

3162 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3163 in use by this :class:`Engine`. 

3164 

3165 """ 

3166 

3167 return self.dialect.driver 

3168 

3169 echo = log.echo_property() 

3170 

3171 def __repr__(self) -> str: 

3172 return "Engine(%r)" % (self.url,) 

3173 

3174 def dispose(self, close: bool = True) -> None: 

3175 """Dispose of the connection pool used by this 

3176 :class:`_engine.Engine`. 

3177 

3178 A new connection pool is created immediately after the old one has been 

3179 disposed. The previous connection pool is disposed either actively, by 

3180 closing out all currently checked-in connections in that pool, or 

3181 passively, by losing references to it but otherwise not closing any 

3182 connections. The latter strategy is more appropriate for an initializer 

3183 in a forked Python process. 

3184 

3185 :param close: if left at its default of ``True``, has the 

3186 effect of fully closing all **currently checked in** 

3187 database connections. Connections that are still checked out 

3188 will **not** be closed, however they will no longer be associated 

3189 with this :class:`_engine.Engine`, 

3190 so when they are closed individually, eventually the 

3191 :class:`_pool.Pool` which they are associated with will 

3192 be garbage collected and they will be closed out fully, if 

3193 not already closed on checkin. 

3194 

3195 If set to ``False``, the previous connection pool is de-referenced, 

3196 and otherwise not touched in any way. 

3197 

3198 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` 

3199 parameter to allow the replacement of a connection pool in a child 

3200 process without interfering with the connections used by the parent 

3201 process. 

3202 

3203 

3204 .. seealso:: 

3205 

3206 :ref:`engine_disposal` 

3207 

3208 :ref:`pooling_multiprocessing` 

3209 

3210 """ 

3211 if close: 

3212 self.pool.dispose() 

3213 self.pool = self.pool.recreate() 

3214 self.dispatch.engine_disposed(self) 

3215 

3216 @contextlib.contextmanager 

3217 def _optional_conn_ctx_manager( 

3218 self, connection: Optional[Connection] = None 

3219 ) -> Iterator[Connection]: 

3220 if connection is None: 

3221 with self.connect() as conn: 

3222 yield conn 

3223 else: 

3224 yield connection 

3225 

3226 @contextlib.contextmanager 

3227 def begin(self) -> Iterator[Connection]: 

3228 """Return a context manager delivering a :class:`_engine.Connection` 

3229 with a :class:`.Transaction` established. 

3230 

3231 E.g.:: 

3232 

3233 with engine.begin() as conn: 

3234 conn.execute( 

3235 text("insert into table (x, y, z) values (1, 2, 3)") 

3236 ) 

3237 conn.execute(text("my_special_procedure(5)")) 

3238 

3239 Upon successful operation, the :class:`.Transaction` 

3240 is committed. If an error is raised, the :class:`.Transaction` 

3241 is rolled back. 

3242 

3243 .. seealso:: 

3244 

3245 :meth:`_engine.Engine.connect` - procure a 

3246 :class:`_engine.Connection` from 

3247 an :class:`_engine.Engine`. 

3248 

3249 :meth:`_engine.Connection.begin` - start a :class:`.Transaction` 

3250 for a particular :class:`_engine.Connection`. 

3251 

3252 """ 

3253 with self.connect() as conn: 

3254 with conn.begin(): 

3255 yield conn 

3256 

3257 def _run_ddl_visitor( 

3258 self, 

3259 visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]], 

3260 element: SchemaItem, 

3261 **kwargs: Any, 

3262 ) -> None: 

3263 with self.begin() as conn: 

3264 conn._run_ddl_visitor(visitorcallable, element, **kwargs) 

3265 

3266 def connect(self) -> Connection: 

3267 """Return a new :class:`_engine.Connection` object. 

3268 

3269 The :class:`_engine.Connection` acts as a Python context manager, so 

3270 the typical use of this method looks like:: 

3271 

3272 with engine.connect() as connection: 

3273 connection.execute(text("insert into table values ('foo')")) 

3274 connection.commit() 

3275 

3276 Where above, after the block is completed, the connection is "closed" 

3277 and its underlying DBAPI resources are returned to the connection pool. 

3278 This also has the effect of rolling back any transaction that 

3279 was explicitly begun or was begun via autobegin, and will 

3280 emit the :meth:`_events.ConnectionEvents.rollback` event if one was 

3281 started and is still in progress. 

3282 

3283 .. seealso:: 

3284 

3285 :meth:`_engine.Engine.begin` 

3286 

3287 """ 

3288 

3289 return self._connection_cls(self) 

3290 

3291 def raw_connection(self) -> PoolProxiedConnection: 

3292 """Return a "raw" DBAPI connection from the connection pool. 

3293 

3294 The returned object is a proxied version of the DBAPI 

3295 connection object used by the underlying driver in use. 

3296 The object will have all the same behavior as the real DBAPI 

3297 connection, except that its ``close()`` method will result in the 

3298 connection being returned to the pool, rather than being closed 

3299 for real. 

3300 

3301 This method provides direct DBAPI connection access for 

3302 special situations when the API provided by 

3303 :class:`_engine.Connection` 

3304 is not needed. When a :class:`_engine.Connection` object is already 

3305 present, the DBAPI connection is available using 

3306 the :attr:`_engine.Connection.connection` accessor. 

3307 

3308 .. seealso:: 

3309 

3310 :ref:`dbapi_connections` 

3311 

3312 """ 

3313 return self.pool.connect() 

3314 

3315 

3316class OptionEngineMixin(log.Identified): 

3317 _sa_propagate_class_events = False 

3318 

3319 dispatch: dispatcher[ConnectionEventsTarget] 

3320 _compiled_cache: Optional[CompiledCacheType] 

3321 dialect: Dialect 

3322 pool: Pool 

3323 url: URL 

3324 hide_parameters: bool 

3325 echo: log.echo_property 

3326 

3327 def __init__( 

3328 self, proxied: Engine, execution_options: CoreExecuteOptionsParameter 

3329 ): 

3330 self._proxied = proxied 

3331 self.url = proxied.url 

3332 self.dialect = proxied.dialect 

3333 self.logging_name = proxied.logging_name 

3334 self.echo = proxied.echo 

3335 self._compiled_cache = proxied._compiled_cache 

3336 self.hide_parameters = proxied.hide_parameters 

3337 log.instance_logger(self, echoflag=self.echo) 

3338 

3339 # note: this will propagate events that are assigned to the parent 

3340 # engine after this OptionEngine is created. Since we share 

3341 # the events of the parent we also disallow class-level events 

3342 # to apply to the OptionEngine class directly. 

3343 # 

3344 # the other way this can work would be to transfer existing 

3345 # events only, using: 

3346 # self.dispatch._update(proxied.dispatch) 

3347 # 

3348 # that might be more appropriate however it would be a behavioral 

3349 # change for logic that assigns events to the parent engine and 

3350 # would like it to take effect for the already-created sub-engine. 

3351 self.dispatch = self.dispatch._join(proxied.dispatch) 

3352 

3353 self._execution_options = proxied._execution_options 

3354 self.update_execution_options(**execution_options) 

3355 

3356 def update_execution_options(self, **opt: Any) -> None: 

3357 raise NotImplementedError() 

3358 

3359 if not typing.TYPE_CHECKING: 

3360 # https://github.com/python/typing/discussions/1095 

3361 

3362 @property 

3363 def pool(self) -> Pool: 

3364 return self._proxied.pool 

3365 

3366 @pool.setter 

3367 def pool(self, pool: Pool) -> None: 

3368 self._proxied.pool = pool 

3369 

3370 @property 

3371 def _has_events(self) -> bool: 

3372 return self._proxied._has_events or self.__dict__.get( 

3373 "_has_events", False 

3374 ) 

3375 

3376 @_has_events.setter 

3377 def _has_events(self, value: bool) -> None: 

3378 self.__dict__["_has_events"] = value 

3379 

3380 

3381class OptionEngine(OptionEngineMixin, Engine): 

3382 def update_execution_options(self, **opt: Any) -> None: 

3383 Engine.update_execution_options(self, **opt) 

3384 

3385 

3386Engine._option_cls = OptionEngine