Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1000 statements  

1# engine/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`. 

8 

9""" 

10from __future__ import annotations 

11 

12import contextlib 

13import sys 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Iterable 

19from typing import Iterator 

20from typing import List 

21from typing import Mapping 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Tuple 

26from typing import Type 

27from typing import TypeVar 

28from typing import Union 

29 

30from .interfaces import BindTyping 

31from .interfaces import ConnectionEventsTarget 

32from .interfaces import DBAPICursor 

33from .interfaces import ExceptionContext 

34from .interfaces import ExecuteStyle 

35from .interfaces import ExecutionContext 

36from .interfaces import IsolationLevel 

37from .util import _distill_params_20 

38from .util import _distill_raw_params 

39from .util import TransactionalContext 

40from .. import exc 

41from .. import inspection 

42from .. import log 

43from .. import util 

44from ..sql import compiler 

45from ..sql import util as sql_util 

46 

47if typing.TYPE_CHECKING: 

48 from . import CursorResult 

49 from . import ScalarResult 

50 from .interfaces import _AnyExecuteParams 

51 from .interfaces import _AnyMultiExecuteParams 

52 from .interfaces import _CoreAnyExecuteParams 

53 from .interfaces import _CoreMultiExecuteParams 

54 from .interfaces import _CoreSingleExecuteParams 

55 from .interfaces import _DBAPIAnyExecuteParams 

56 from .interfaces import _DBAPISingleExecuteParams 

57 from .interfaces import _ExecuteOptions 

58 from .interfaces import CompiledCacheType 

59 from .interfaces import CoreExecuteOptionsParameter 

60 from .interfaces import Dialect 

61 from .interfaces import SchemaTranslateMapType 

62 from .reflection import Inspector # noqa 

63 from .url import URL 

64 from ..event import dispatcher 

65 from ..log import _EchoFlagType 

66 from ..pool import _ConnectionFairy 

67 from ..pool import Pool 

68 from ..pool import PoolProxiedConnection 

69 from ..sql import Executable 

70 from ..sql._typing import _InfoType 

71 from ..sql.compiler import Compiled 

72 from ..sql.ddl import ExecutableDDLElement 

73 from ..sql.ddl import InvokeDDLBase 

74 from ..sql.functions import FunctionElement 

75 from ..sql.schema import DefaultGenerator 

76 from ..sql.schema import HasSchemaAttr 

77 from ..sql.schema import SchemaVisitable 

78 from ..sql.selectable import TypedReturnsRows 

79 

80 

81_T = TypeVar("_T", bound=Any) 

82_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT 

83NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT 

84 

85 

86class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

87 """Provides high-level functionality for a wrapped DB-API connection. 

88 

89 The :class:`_engine.Connection` object is procured by calling the 

90 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

91 object, and provides services for execution of SQL statements as well 

92 as transaction control. 

93 

94 The Connection object is **not** thread-safe. While a Connection can be 

95 shared among threads using properly synchronized access, it is still 

96 possible that the underlying DBAPI connection may not support shared 

97 access between threads. Check the DBAPI documentation for details. 

98 

99 The Connection object represents a single DBAPI connection checked out 

100 from the connection pool. In this state, the connection pool has no 

101 affect upon the connection, including its expiration or timeout state. 

102 For the connection pool to properly manage connections, connections 

103 should be returned to the connection pool (i.e. ``connection.close()``) 

104 whenever the connection is not in use. 

105 

106 .. index:: 

107 single: thread safety; Connection 

108 

109 """ 

110 

111 dialect: Dialect 

112 dispatch: dispatcher[ConnectionEventsTarget] 

113 

114 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

115 

116 # used by sqlalchemy.engine.util.TransactionalContext 

117 _trans_context_manager: Optional[TransactionalContext] = None 

118 

119 # legacy as of 2.0, should be eventually deprecated and 

120 # removed. was used in the "pre_ping" recipe that's been in the docs 

121 # a long time 

122 should_close_with_result = False 

123 

124 _dbapi_connection: Optional[PoolProxiedConnection] 

125 

126 _execution_options: _ExecuteOptions 

127 

128 _transaction: Optional[RootTransaction] 

129 _nested_transaction: Optional[NestedTransaction] 

130 

131 def __init__( 

132 self, 

133 engine: Engine, 

134 connection: Optional[PoolProxiedConnection] = None, 

135 _has_events: Optional[bool] = None, 

136 _allow_revalidate: bool = True, 

137 _allow_autobegin: bool = True, 

138 ): 

139 """Construct a new Connection.""" 

140 self.engine = engine 

141 self.dialect = dialect = engine.dialect 

142 

143 if connection is None: 

144 try: 

145 self._dbapi_connection = engine.raw_connection() 

146 except dialect.loaded_dbapi.Error as err: 

147 Connection._handle_dbapi_exception_noconnection( 

148 err, dialect, engine 

149 ) 

150 raise 

151 else: 

152 self._dbapi_connection = connection 

153 

154 self._transaction = self._nested_transaction = None 

155 self.__savepoint_seq = 0 

156 self.__in_begin = False 

157 

158 self.__can_reconnect = _allow_revalidate 

159 self._allow_autobegin = _allow_autobegin 

160 self._echo = self.engine._should_log_info() 

161 

162 if _has_events is None: 

163 # if _has_events is sent explicitly as False, 

164 # then don't join the dispatch of the engine; we don't 

165 # want to handle any of the engine's events in that case. 

166 self.dispatch = self.dispatch._join(engine.dispatch) 

167 self._has_events = _has_events or ( 

168 _has_events is None and engine._has_events 

169 ) 

170 

171 self._execution_options = engine._execution_options 

172 

173 if self._has_events or self.engine._has_events: 

174 self.dispatch.engine_connect(self) 

175 

176 # this can be assigned differently via 

177 # characteristics.LoggingTokenCharacteristic 

178 _message_formatter: Any = None 

179 

180 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None: 

181 fmt = self._message_formatter 

182 

183 if fmt: 

184 message = fmt(message) 

185 

186 if log.STACKLEVEL: 

187 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

188 

189 self.engine.logger.info(message, *arg, **kw) 

190 

191 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None: 

192 fmt = self._message_formatter 

193 

194 if fmt: 

195 message = fmt(message) 

196 

197 if log.STACKLEVEL: 

198 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

199 

200 self.engine.logger.debug(message, *arg, **kw) 

201 

202 @property 

203 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: 

204 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

205 self._execution_options.get("schema_translate_map", None) 

206 ) 

207 

208 return schema_translate_map 

209 

210 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]: 

211 """Return the schema name for the given schema item taking into 

212 account current schema translate map. 

213 

214 """ 

215 

216 name = obj.schema 

217 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

218 self._execution_options.get("schema_translate_map", None) 

219 ) 

220 

221 if ( 

222 schema_translate_map 

223 and name in schema_translate_map 

224 and obj._use_schema_map 

225 ): 

226 return schema_translate_map[name] 

227 else: 

228 return name 

229 

230 def __enter__(self) -> Connection: 

231 return self 

232 

233 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 

234 self.close() 

235 

236 @overload 

237 def execution_options( 

238 self, 

239 *, 

240 compiled_cache: Optional[CompiledCacheType] = ..., 

241 logging_token: str = ..., 

242 isolation_level: IsolationLevel = ..., 

243 no_parameters: bool = False, 

244 stream_results: bool = False, 

245 max_row_buffer: int = ..., 

246 yield_per: int = ..., 

247 insertmanyvalues_page_size: int = ..., 

248 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

249 preserve_rowcount: bool = False, 

250 **opt: Any, 

251 ) -> Connection: ... 

252 

253 @overload 

254 def execution_options(self, **opt: Any) -> Connection: ... 

255 

256 def execution_options(self, **opt: Any) -> Connection: 

257 r"""Set non-SQL options for the connection which take effect 

258 during execution. 

259 

260 This method modifies this :class:`_engine.Connection` **in-place**; 

261 the return value is the same :class:`_engine.Connection` object 

262 upon which the method is called. Note that this is in contrast 

263 to the behavior of the ``execution_options`` methods on other 

264 objects such as :meth:`_engine.Engine.execution_options` and 

265 :meth:`_sql.Executable.execution_options`. The rationale is that many 

266 such execution options necessarily modify the state of the base 

267 DBAPI connection in any case so there is no feasible means of 

268 keeping the effect of such an option localized to a "sub" connection. 

269 

270 .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options` 

271 method, in contrast to other objects with this method, modifies 

272 the connection in-place without creating copy of it. 

273 

274 As discussed elsewhere, the :meth:`_engine.Connection.execution_options` 

275 method accepts any arbitrary parameters including user defined names. 

276 All parameters given are consumable in a number of ways including 

277 by using the :meth:`_engine.Connection.get_execution_options` method. 

278 See the examples at :meth:`_sql.Executable.execution_options` 

279 and :meth:`_engine.Engine.execution_options`. 

280 

281 The keywords that are currently recognized by SQLAlchemy itself 

282 include all those listed under :meth:`.Executable.execution_options`, 

283 as well as others that are specific to :class:`_engine.Connection`. 

284 

285 :param compiled_cache: Available on: :class:`_engine.Connection`, 

286 :class:`_engine.Engine`. 

287 

288 A dictionary where :class:`.Compiled` objects 

289 will be cached when the :class:`_engine.Connection` 

290 compiles a clause 

291 expression into a :class:`.Compiled` object. This dictionary will 

292 supersede the statement cache that may be configured on the 

293 :class:`_engine.Engine` itself. If set to None, caching 

294 is disabled, even if the engine has a configured cache size. 

295 

296 Note that the ORM makes use of its own "compiled" caches for 

297 some operations, including flush operations. The caching 

298 used by the ORM internally supersedes a cache dictionary 

299 specified here. 

300 

301 :param logging_token: Available on: :class:`_engine.Connection`, 

302 :class:`_engine.Engine`, :class:`_sql.Executable`. 

303 

304 Adds the specified string token surrounded by brackets in log 

305 messages logged by the connection, i.e. the logging that's enabled 

306 either via the :paramref:`_sa.create_engine.echo` flag or via the 

307 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a 

308 per-connection or per-sub-engine token to be available which is 

309 useful for debugging concurrent connection scenarios. 

310 

311 .. versionadded:: 1.4.0b2 

312 

313 .. seealso:: 

314 

315 :ref:`dbengine_logging_tokens` - usage example 

316 

317 :paramref:`_sa.create_engine.logging_name` - adds a name to the 

318 name used by the Python logger object itself. 

319 

320 :param isolation_level: Available on: :class:`_engine.Connection`, 

321 :class:`_engine.Engine`. 

322 

323 Set the transaction isolation level for the lifespan of this 

324 :class:`_engine.Connection` object. 

325 Valid values include those string 

326 values accepted by the :paramref:`_sa.create_engine.isolation_level` 

327 parameter passed to :func:`_sa.create_engine`. These levels are 

328 semi-database specific; see individual dialect documentation for 

329 valid levels. 

330 

331 The isolation level option applies the isolation level by emitting 

332 statements on the DBAPI connection, and **necessarily affects the 

333 original Connection object overall**. The isolation level will remain 

334 at the given setting until explicitly changed, or when the DBAPI 

335 connection itself is :term:`released` to the connection pool, i.e. the 

336 :meth:`_engine.Connection.close` method is called, at which time an 

337 event handler will emit additional statements on the DBAPI connection 

338 in order to revert the isolation level change. 

339 

340 .. note:: The ``isolation_level`` execution option may only be 

341 established before the :meth:`_engine.Connection.begin` method is 

342 called, as well as before any SQL statements are emitted which 

343 would otherwise trigger "autobegin", or directly after a call to 

344 :meth:`_engine.Connection.commit` or 

345 :meth:`_engine.Connection.rollback`. A database cannot change the 

346 isolation level on a transaction in progress. 

347 

348 .. note:: The ``isolation_level`` execution option is implicitly 

349 reset if the :class:`_engine.Connection` is invalidated, e.g. via 

350 the :meth:`_engine.Connection.invalidate` method, or if a 

351 disconnection error occurs. The new connection produced after the 

352 invalidation will **not** have the selected isolation level 

353 re-applied to it automatically. 

354 

355 .. seealso:: 

356 

357 :ref:`dbapi_autocommit` 

358 

359 :meth:`_engine.Connection.get_isolation_level` 

360 - view current actual level 

361 

362 :param no_parameters: Available on: :class:`_engine.Connection`, 

363 :class:`_sql.Executable`. 

364 

365 When ``True``, if the final parameter 

366 list or dictionary is totally empty, will invoke the 

367 statement on the cursor as ``cursor.execute(statement)``, 

368 not passing the parameter collection at all. 

369 Some DBAPIs such as psycopg2 and mysql-python consider 

370 percent signs as significant only when parameters are 

371 present; this option allows code to generate SQL 

372 containing percent signs (and possibly other characters) 

373 that is neutral regarding whether it's executed by the DBAPI 

374 or piped into a script that's later invoked by 

375 command line tools. 

376 

377 :param stream_results: Available on: :class:`_engine.Connection`, 

378 :class:`_sql.Executable`. 

379 

380 Indicate to the dialect that results should be "streamed" and not 

381 pre-buffered, if possible. For backends such as PostgreSQL, MySQL 

382 and MariaDB, this indicates the use of a "server side cursor" as 

383 opposed to a client side cursor. Other backends such as that of 

384 Oracle Database may already use server side cursors by default. 

385 

386 The usage of 

387 :paramref:`_engine.Connection.execution_options.stream_results` is 

388 usually combined with setting a fixed number of rows to to be fetched 

389 in batches, to allow for efficient iteration of database rows while 

390 at the same time not loading all result rows into memory at once; 

391 this can be configured on a :class:`_engine.Result` object using the 

392 :meth:`_engine.Result.yield_per` method, after execution has 

393 returned a new :class:`_engine.Result`. If 

394 :meth:`_engine.Result.yield_per` is not used, 

395 the :paramref:`_engine.Connection.execution_options.stream_results` 

396 mode of operation will instead use a dynamically sized buffer 

397 which buffers sets of rows at a time, growing on each batch 

398 based on a fixed growth size up until a limit which may 

399 be configured using the 

400 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

401 parameter. 

402 

403 When using the ORM to fetch ORM mapped objects from a result, 

404 :meth:`_engine.Result.yield_per` should always be used with 

405 :paramref:`_engine.Connection.execution_options.stream_results`, 

406 so that the ORM does not fetch all rows into new ORM objects at once. 

407 

408 For typical use, the 

409 :paramref:`_engine.Connection.execution_options.yield_per` execution 

410 option should be preferred, which sets up both 

411 :paramref:`_engine.Connection.execution_options.stream_results` and 

412 :meth:`_engine.Result.yield_per` at once. This option is supported 

413 both at a core level by :class:`_engine.Connection` as well as by the 

414 ORM :class:`_engine.Session`; the latter is described at 

415 :ref:`orm_queryguide_yield_per`. 

416 

417 .. seealso:: 

418 

419 :ref:`engine_stream_results` - background on 

420 :paramref:`_engine.Connection.execution_options.stream_results` 

421 

422 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

423 

424 :paramref:`_engine.Connection.execution_options.yield_per` 

425 

426 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

427 describing the ORM version of ``yield_per`` 

428 

429 :param max_row_buffer: Available on: :class:`_engine.Connection`, 

430 :class:`_sql.Executable`. Sets a maximum 

431 buffer size to use when the 

432 :paramref:`_engine.Connection.execution_options.stream_results` 

433 execution option is used on a backend that supports server side 

434 cursors. The default value if not specified is 1000. 

435 

436 .. seealso:: 

437 

438 :paramref:`_engine.Connection.execution_options.stream_results` 

439 

440 :ref:`engine_stream_results` 

441 

442 

443 :param yield_per: Available on: :class:`_engine.Connection`, 

444 :class:`_sql.Executable`. Integer value applied which will 

445 set the :paramref:`_engine.Connection.execution_options.stream_results` 

446 execution option and invoke :meth:`_engine.Result.yield_per` 

447 automatically at once. Allows equivalent functionality as 

448 is present when using this parameter with the ORM. 

449 

450 .. versionadded:: 1.4.40 

451 

452 .. seealso:: 

453 

454 :ref:`engine_stream_results` - background and examples 

455 on using server side cursors with Core. 

456 

457 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

458 describing the ORM version of ``yield_per`` 

459 

460 :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`, 

461 :class:`_engine.Engine`. Number of rows to format into an 

462 INSERT statement when the statement uses "insertmanyvalues" mode, 

463 which is a paged form of bulk insert that is used for many backends 

464 when using :term:`executemany` execution typically in conjunction 

465 with RETURNING. Defaults to 1000. May also be modified on a 

466 per-engine basis using the 

467 :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. 

468 

469 .. versionadded:: 2.0 

470 

471 .. seealso:: 

472 

473 :ref:`engine_insertmanyvalues` 

474 

475 :param schema_translate_map: Available on: :class:`_engine.Connection`, 

476 :class:`_engine.Engine`, :class:`_sql.Executable`. 

477 

478 A dictionary mapping schema names to schema names, that will be 

479 applied to the :paramref:`_schema.Table.schema` element of each 

480 :class:`_schema.Table` 

481 encountered when SQL or DDL expression elements 

482 are compiled into strings; the resulting schema name will be 

483 converted based on presence in the map of the original name. 

484 

485 .. seealso:: 

486 

487 :ref:`schema_translating` 

488 

489 :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount`` 

490 attribute will be unconditionally memoized within the result and 

491 made available via the :attr:`.CursorResult.rowcount` attribute. 

492 Normally, this attribute is only preserved for UPDATE and DELETE 

493 statements. Using this option, the DBAPIs rowcount value can 

494 be accessed for other kinds of statements such as INSERT and SELECT, 

495 to the degree that the DBAPI supports these statements. See 

496 :attr:`.CursorResult.rowcount` for notes regarding the behavior 

497 of this attribute. 

498 

499 .. versionadded:: 2.0.28 

500 

501 .. seealso:: 

502 

503 :meth:`_engine.Engine.execution_options` 

504 

505 :meth:`.Executable.execution_options` 

506 

507 :meth:`_engine.Connection.get_execution_options` 

508 

509 :ref:`orm_queryguide_execution_options` - documentation on all 

510 ORM-specific execution options 

511 

512 """ # noqa 

513 if self._has_events or self.engine._has_events: 

514 self.dispatch.set_connection_execution_options(self, opt) 

515 self._execution_options = self._execution_options.union(opt) 

516 self.dialect.set_connection_execution_options(self, opt) 

517 return self 

518 

519 def get_execution_options(self) -> _ExecuteOptions: 

520 """Get the non-SQL options which will take effect during execution. 

521 

522 .. versionadded:: 1.3 

523 

524 .. seealso:: 

525 

526 :meth:`_engine.Connection.execution_options` 

527 """ 

528 return self._execution_options 

529 

530 @property 

531 def _still_open_and_dbapi_connection_is_valid(self) -> bool: 

532 pool_proxied_connection = self._dbapi_connection 

533 return ( 

534 pool_proxied_connection is not None 

535 and pool_proxied_connection.is_valid 

536 ) 

537 

538 @property 

539 def closed(self) -> bool: 

540 """Return True if this connection is closed.""" 

541 

542 return self._dbapi_connection is None and not self.__can_reconnect 

543 

544 @property 

545 def invalidated(self) -> bool: 

546 """Return True if this connection was invalidated. 

547 

548 This does not indicate whether or not the connection was 

549 invalidated at the pool level, however 

550 

551 """ 

552 

553 # prior to 1.4, "invalid" was stored as a state independent of 

554 # "closed", meaning an invalidated connection could be "closed", 

555 # the _dbapi_connection would be None and closed=True, yet the 

556 # "invalid" flag would stay True. This meant that there were 

557 # three separate states (open/valid, closed/valid, closed/invalid) 

558 # when there is really no reason for that; a connection that's 

559 # "closed" does not need to be "invalid". So the state is now 

560 # represented by the two facts alone. 

561 

562 pool_proxied_connection = self._dbapi_connection 

563 return pool_proxied_connection is None and self.__can_reconnect 

564 

565 @property 

566 def connection(self) -> PoolProxiedConnection: 

567 """The underlying DB-API connection managed by this Connection. 

568 

569 This is a SQLAlchemy connection-pool proxied connection 

570 which then has the attribute 

571 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the 

572 actual driver connection. 

573 

574 .. seealso:: 

575 

576 

577 :ref:`dbapi_connections` 

578 

579 """ 

580 

581 if self._dbapi_connection is None: 

582 try: 

583 return self._revalidate_connection() 

584 except (exc.PendingRollbackError, exc.ResourceClosedError): 

585 raise 

586 except BaseException as e: 

587 self._handle_dbapi_exception(e, None, None, None, None) 

588 else: 

589 return self._dbapi_connection 

590 

591 def get_isolation_level(self) -> IsolationLevel: 

592 """Return the current **actual** isolation level that's present on 

593 the database within the scope of this connection. 

594 

595 This attribute will perform a live SQL operation against the database 

596 in order to procure the current isolation level, so the value returned 

597 is the actual level on the underlying DBAPI connection regardless of 

598 how this state was set. This will be one of the four actual isolation 

599 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

600 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation 

601 level setting. Third party dialects may also feature additional 

602 isolation level settings. 

603 

604 .. note:: This method **will not report** on the ``AUTOCOMMIT`` 

605 isolation level, which is a separate :term:`dbapi` setting that's 

606 independent of **actual** isolation level. When ``AUTOCOMMIT`` is 

607 in use, the database connection still has a "traditional" isolation 

608 mode in effect, that is typically one of the four values 

609 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

610 ``SERIALIZABLE``. 

611 

612 Compare to the :attr:`_engine.Connection.default_isolation_level` 

613 accessor which returns the isolation level that is present on the 

614 database at initial connection time. 

615 

616 .. seealso:: 

617 

618 :attr:`_engine.Connection.default_isolation_level` 

619 - view default level 

620 

621 :paramref:`_sa.create_engine.isolation_level` 

622 - set per :class:`_engine.Engine` isolation level 

623 

624 :paramref:`.Connection.execution_options.isolation_level` 

625 - set per :class:`_engine.Connection` isolation level 

626 

627 """ 

628 dbapi_connection = self.connection.dbapi_connection 

629 assert dbapi_connection is not None 

630 try: 

631 return self.dialect.get_isolation_level(dbapi_connection) 

632 except BaseException as e: 

633 self._handle_dbapi_exception(e, None, None, None, None) 

634 

635 @property 

636 def default_isolation_level(self) -> Optional[IsolationLevel]: 

637 """The initial-connection time isolation level associated with the 

638 :class:`_engine.Dialect` in use. 

639 

640 This value is independent of the 

641 :paramref:`.Connection.execution_options.isolation_level` and 

642 :paramref:`.Engine.execution_options.isolation_level` execution 

643 options, and is determined by the :class:`_engine.Dialect` when the 

644 first connection is created, by performing a SQL query against the 

645 database for the current isolation level before any additional commands 

646 have been emitted. 

647 

648 Calling this accessor does not invoke any new SQL queries. 

649 

650 .. seealso:: 

651 

652 :meth:`_engine.Connection.get_isolation_level` 

653 - view current actual isolation level 

654 

655 :paramref:`_sa.create_engine.isolation_level` 

656 - set per :class:`_engine.Engine` isolation level 

657 

658 :paramref:`.Connection.execution_options.isolation_level` 

659 - set per :class:`_engine.Connection` isolation level 

660 

661 """ 

662 return self.dialect.default_isolation_level 

663 

664 def _invalid_transaction(self) -> NoReturn: 

665 raise exc.PendingRollbackError( 

666 "Can't reconnect until invalid %stransaction is rolled " 

667 "back. Please rollback() fully before proceeding" 

668 % ("savepoint " if self._nested_transaction is not None else ""), 

669 code="8s2b", 

670 ) 

671 

672 def _revalidate_connection(self) -> PoolProxiedConnection: 

673 if self.__can_reconnect and self.invalidated: 

674 if self._transaction is not None: 

675 self._invalid_transaction() 

676 self._dbapi_connection = self.engine.raw_connection() 

677 return self._dbapi_connection 

678 raise exc.ResourceClosedError("This Connection is closed") 

679 

680 @property 

681 def info(self) -> _InfoType: 

682 """Info dictionary associated with the underlying DBAPI connection 

683 referred to by this :class:`_engine.Connection`, allowing user-defined 

684 data to be associated with the connection. 

685 

686 The data here will follow along with the DBAPI connection including 

687 after it is returned to the connection pool and used again 

688 in subsequent instances of :class:`_engine.Connection`. 

689 

690 """ 

691 

692 return self.connection.info 

693 

694 def invalidate(self, exception: Optional[BaseException] = None) -> None: 

695 """Invalidate the underlying DBAPI connection associated with 

696 this :class:`_engine.Connection`. 

697 

698 An attempt will be made to close the underlying DBAPI connection 

699 immediately; however if this operation fails, the error is logged 

700 but not raised. The connection is then discarded whether or not 

701 close() succeeded. 

702 

703 Upon the next use (where "use" typically means using the 

704 :meth:`_engine.Connection.execute` method or similar), 

705 this :class:`_engine.Connection` will attempt to 

706 procure a new DBAPI connection using the services of the 

707 :class:`_pool.Pool` as a source of connectivity (e.g. 

708 a "reconnection"). 

709 

710 If a transaction was in progress (e.g. the 

711 :meth:`_engine.Connection.begin` method has been called) when 

712 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI 

713 level all state associated with this transaction is lost, as 

714 the DBAPI connection is closed. The :class:`_engine.Connection` 

715 will not allow a reconnection to proceed until the 

716 :class:`.Transaction` object is ended, by calling the 

717 :meth:`.Transaction.rollback` method; until that point, any attempt at 

718 continuing to use the :class:`_engine.Connection` will raise an 

719 :class:`~sqlalchemy.exc.InvalidRequestError`. 

720 This is to prevent applications from accidentally 

721 continuing an ongoing transactional operations despite the 

722 fact that the transaction has been lost due to an 

723 invalidation. 

724 

725 The :meth:`_engine.Connection.invalidate` method, 

726 just like auto-invalidation, 

727 will at the connection pool level invoke the 

728 :meth:`_events.PoolEvents.invalidate` event. 

729 

730 :param exception: an optional ``Exception`` instance that's the 

731 reason for the invalidation. is passed along to event handlers 

732 and logging functions. 

733 

734 .. seealso:: 

735 

736 :ref:`pool_connection_invalidation` 

737 

738 """ 

739 

740 if self.invalidated: 

741 return 

742 

743 if self.closed: 

744 raise exc.ResourceClosedError("This Connection is closed") 

745 

746 if self._still_open_and_dbapi_connection_is_valid: 

747 pool_proxied_connection = self._dbapi_connection 

748 assert pool_proxied_connection is not None 

749 pool_proxied_connection.invalidate(exception) 

750 

751 self._dbapi_connection = None 

752 

753 def detach(self) -> None: 

754 """Detach the underlying DB-API connection from its connection pool. 

755 

756 E.g.:: 

757 

758 with engine.connect() as conn: 

759 conn.detach() 

760 conn.execute(text("SET search_path TO schema1, schema2")) 

761 

762 # work with connection 

763 

764 # connection is fully closed (since we used "with:", can 

765 # also call .close()) 

766 

767 This :class:`_engine.Connection` instance will remain usable. 

768 When closed 

769 (or exited from a context manager context as above), 

770 the DB-API connection will be literally closed and not 

771 returned to its originating pool. 

772 

773 This method can be used to insulate the rest of an application 

774 from a modified state on a connection (such as a transaction 

775 isolation level or similar). 

776 

777 """ 

778 

779 if self.closed: 

780 raise exc.ResourceClosedError("This Connection is closed") 

781 

782 pool_proxied_connection = self._dbapi_connection 

783 if pool_proxied_connection is None: 

784 raise exc.InvalidRequestError( 

785 "Can't detach an invalidated Connection" 

786 ) 

787 pool_proxied_connection.detach() 

788 

789 def _autobegin(self) -> None: 

790 if self._allow_autobegin and not self.__in_begin: 

791 self.begin() 

792 

793 def begin(self) -> RootTransaction: 

794 """Begin a transaction prior to autobegin occurring. 

795 

796 E.g.:: 

797 

798 with engine.connect() as conn: 

799 with conn.begin() as trans: 

800 conn.execute(table.insert(), {"username": "sandy"}) 

801 

802 The returned object is an instance of :class:`_engine.RootTransaction`. 

803 This object represents the "scope" of the transaction, 

804 which completes when either the :meth:`_engine.Transaction.rollback` 

805 or :meth:`_engine.Transaction.commit` method is called; the object 

806 also works as a context manager as illustrated above. 

807 

808 The :meth:`_engine.Connection.begin` method begins a 

809 transaction that normally will be begun in any case when the connection 

810 is first used to execute a statement. The reason this method might be 

811 used would be to invoke the :meth:`_events.ConnectionEvents.begin` 

812 event at a specific time, or to organize code within the scope of a 

813 connection checkout in terms of context managed blocks, such as:: 

814 

815 with engine.connect() as conn: 

816 with conn.begin(): 

817 conn.execute(...) 

818 conn.execute(...) 

819 

820 with conn.begin(): 

821 conn.execute(...) 

822 conn.execute(...) 

823 

824 The above code is not fundamentally any different in its behavior than 

825 the following code which does not use 

826 :meth:`_engine.Connection.begin`; the below style is known 

827 as "commit as you go" style:: 

828 

829 with engine.connect() as conn: 

830 conn.execute(...) 

831 conn.execute(...) 

832 conn.commit() 

833 

834 conn.execute(...) 

835 conn.execute(...) 

836 conn.commit() 

837 

838 From a database point of view, the :meth:`_engine.Connection.begin` 

839 method does not emit any SQL or change the state of the underlying 

840 DBAPI connection in any way; the Python DBAPI does not have any 

841 concept of explicit transaction begin. 

842 

843 .. seealso:: 

844 

845 :ref:`tutorial_working_with_transactions` - in the 

846 :ref:`unified_tutorial` 

847 

848 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT 

849 

850 :meth:`_engine.Connection.begin_twophase` - 

851 use a two phase /XID transaction 

852 

853 :meth:`_engine.Engine.begin` - context manager available from 

854 :class:`_engine.Engine` 

855 

856 """ 

857 if self._transaction is None: 

858 self._transaction = RootTransaction(self) 

859 return self._transaction 

860 else: 

861 raise exc.InvalidRequestError( 

862 "This connection has already initialized a SQLAlchemy " 

863 "Transaction() object via begin() or autobegin; can't " 

864 "call begin() here unless rollback() or commit() " 

865 "is called first." 

866 ) 

867 

868 def begin_nested(self) -> NestedTransaction: 

869 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction 

870 handle that controls the scope of the SAVEPOINT. 

871 

872 E.g.:: 

873 

874 with engine.begin() as connection: 

875 with connection.begin_nested(): 

876 connection.execute(table.insert(), {"username": "sandy"}) 

877 

878 The returned object is an instance of 

879 :class:`_engine.NestedTransaction`, which includes transactional 

880 methods :meth:`_engine.NestedTransaction.commit` and 

881 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction, 

882 these methods correspond to the operations "RELEASE SAVEPOINT <name>" 

883 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local 

884 to the :class:`_engine.NestedTransaction` object and is generated 

885 automatically. Like any other :class:`_engine.Transaction`, the 

886 :class:`_engine.NestedTransaction` may be used as a context manager as 

887 illustrated above which will "release" or "rollback" corresponding to 

888 if the operation within the block were successful or raised an 

889 exception. 

890 

891 Nested transactions require SAVEPOINT support in the underlying 

892 database, else the behavior is undefined. SAVEPOINT is commonly used to 

893 run operations within a transaction that may fail, while continuing the 

894 outer transaction. E.g.:: 

895 

896 from sqlalchemy import exc 

897 

898 with engine.begin() as connection: 

899 trans = connection.begin_nested() 

900 try: 

901 connection.execute(table.insert(), {"username": "sandy"}) 

902 trans.commit() 

903 except exc.IntegrityError: # catch for duplicate username 

904 trans.rollback() # rollback to savepoint 

905 

906 # outer transaction continues 

907 connection.execute(...) 

908 

909 If :meth:`_engine.Connection.begin_nested` is called without first 

910 calling :meth:`_engine.Connection.begin` or 

911 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object 

912 will "autobegin" the outer transaction first. This outer transaction 

913 may be committed using "commit-as-you-go" style, e.g.:: 

914 

915 with engine.connect() as connection: # begin() wasn't called 

916 

917 with connection.begin_nested(): # will auto-"begin()" first 

918 connection.execute(...) 

919 # savepoint is released 

920 

921 connection.execute(...) 

922 

923 # explicitly commit outer transaction 

924 connection.commit() 

925 

926 # can continue working with connection here 

927 

928 .. versionchanged:: 2.0 

929 

930 :meth:`_engine.Connection.begin_nested` will now participate 

931 in the connection "autobegin" behavior that is new as of 

932 2.0 / "future" style connections in 1.4. 

933 

934 .. seealso:: 

935 

936 :meth:`_engine.Connection.begin` 

937 

938 :ref:`session_begin_nested` - ORM support for SAVEPOINT 

939 

940 """ 

941 if self._transaction is None: 

942 self._autobegin() 

943 

944 return NestedTransaction(self) 

945 

946 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction: 

947 """Begin a two-phase or XA transaction and return a transaction 

948 handle. 

949 

950 The returned object is an instance of :class:`.TwoPhaseTransaction`, 

951 which in addition to the methods provided by 

952 :class:`.Transaction`, also provides a 

953 :meth:`~.TwoPhaseTransaction.prepare` method. 

954 

955 :param xid: the two phase transaction id. If not supplied, a 

956 random id will be generated. 

957 

958 .. seealso:: 

959 

960 :meth:`_engine.Connection.begin` 

961 

962 :meth:`_engine.Connection.begin_twophase` 

963 

964 """ 

965 

966 if self._transaction is not None: 

967 raise exc.InvalidRequestError( 

968 "Cannot start a two phase transaction when a transaction " 

969 "is already in progress." 

970 ) 

971 if xid is None: 

972 xid = self.engine.dialect.create_xid() 

973 return TwoPhaseTransaction(self, xid) 

974 

975 def commit(self) -> None: 

976 """Commit the transaction that is currently in progress. 

977 

978 This method commits the current transaction if one has been started. 

979 If no transaction was started, the method has no effect, assuming 

980 the connection is in a non-invalidated state. 

981 

982 A transaction is begun on a :class:`_engine.Connection` automatically 

983 whenever a statement is first executed, or when the 

984 :meth:`_engine.Connection.begin` method is called. 

985 

986 .. note:: The :meth:`_engine.Connection.commit` method only acts upon 

987 the primary database transaction that is linked to the 

988 :class:`_engine.Connection` object. It does not operate upon a 

989 SAVEPOINT that would have been invoked from the 

990 :meth:`_engine.Connection.begin_nested` method; for control of a 

991 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the 

992 :class:`_engine.NestedTransaction` that is returned by the 

993 :meth:`_engine.Connection.begin_nested` method itself. 

994 

995 

996 """ 

997 if self._transaction: 

998 self._transaction.commit() 

999 

1000 def rollback(self) -> None: 

1001 """Roll back the transaction that is currently in progress. 

1002 

1003 This method rolls back the current transaction if one has been started. 

1004 If no transaction was started, the method has no effect. If a 

1005 transaction was started and the connection is in an invalidated state, 

1006 the transaction is cleared using this method. 

1007 

1008 A transaction is begun on a :class:`_engine.Connection` automatically 

1009 whenever a statement is first executed, or when the 

1010 :meth:`_engine.Connection.begin` method is called. 

1011 

1012 .. note:: The :meth:`_engine.Connection.rollback` method only acts 

1013 upon the primary database transaction that is linked to the 

1014 :class:`_engine.Connection` object. It does not operate upon a 

1015 SAVEPOINT that would have been invoked from the 

1016 :meth:`_engine.Connection.begin_nested` method; for control of a 

1017 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the 

1018 :class:`_engine.NestedTransaction` that is returned by the 

1019 :meth:`_engine.Connection.begin_nested` method itself. 

1020 

1021 

1022 """ 

1023 if self._transaction: 

1024 self._transaction.rollback() 

1025 

1026 def recover_twophase(self) -> List[Any]: 

1027 return self.engine.dialect.do_recover_twophase(self) 

1028 

1029 def rollback_prepared(self, xid: Any, recover: bool = False) -> None: 

1030 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) 

1031 

1032 def commit_prepared(self, xid: Any, recover: bool = False) -> None: 

1033 self.engine.dialect.do_commit_twophase(self, xid, recover=recover) 

1034 

1035 def in_transaction(self) -> bool: 

1036 """Return True if a transaction is in progress.""" 

1037 return self._transaction is not None and self._transaction.is_active 

1038 

1039 def in_nested_transaction(self) -> bool: 

1040 """Return True if a transaction is in progress.""" 

1041 return ( 

1042 self._nested_transaction is not None 

1043 and self._nested_transaction.is_active 

1044 ) 

1045 

1046 def _is_autocommit_isolation(self) -> bool: 

1047 opt_iso = self._execution_options.get("isolation_level", None) 

1048 return bool( 

1049 opt_iso == "AUTOCOMMIT" 

1050 or ( 

1051 opt_iso is None 

1052 and self.engine.dialect._on_connect_isolation_level 

1053 == "AUTOCOMMIT" 

1054 ) 

1055 ) 

1056 

1057 def _get_required_transaction(self) -> RootTransaction: 

1058 trans = self._transaction 

1059 if trans is None: 

1060 raise exc.InvalidRequestError("connection is not in a transaction") 

1061 return trans 

1062 

1063 def _get_required_nested_transaction(self) -> NestedTransaction: 

1064 trans = self._nested_transaction 

1065 if trans is None: 

1066 raise exc.InvalidRequestError( 

1067 "connection is not in a nested transaction" 

1068 ) 

1069 return trans 

1070 

1071 def get_transaction(self) -> Optional[RootTransaction]: 

1072 """Return the current root transaction in progress, if any. 

1073 

1074 .. versionadded:: 1.4 

1075 

1076 """ 

1077 

1078 return self._transaction 

1079 

1080 def get_nested_transaction(self) -> Optional[NestedTransaction]: 

1081 """Return the current nested transaction in progress, if any. 

1082 

1083 .. versionadded:: 1.4 

1084 

1085 """ 

1086 return self._nested_transaction 

1087 

1088 def _begin_impl(self, transaction: RootTransaction) -> None: 

1089 if self._echo: 

1090 if self._is_autocommit_isolation(): 

1091 self._log_info( 

1092 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1093 "autocommit mode)" 

1094 ) 

1095 else: 

1096 self._log_info("BEGIN (implicit)") 

1097 

1098 self.__in_begin = True 

1099 

1100 if self._has_events or self.engine._has_events: 

1101 self.dispatch.begin(self) 

1102 

1103 try: 

1104 self.engine.dialect.do_begin(self.connection) 

1105 except BaseException as e: 

1106 self._handle_dbapi_exception(e, None, None, None, None) 

1107 finally: 

1108 self.__in_begin = False 

1109 

1110 def _rollback_impl(self) -> None: 

1111 if self._has_events or self.engine._has_events: 

1112 self.dispatch.rollback(self) 

1113 

1114 if self._still_open_and_dbapi_connection_is_valid: 

1115 if self._echo: 

1116 if self._is_autocommit_isolation(): 

1117 self._log_info( 

1118 "ROLLBACK using DBAPI connection.rollback(), " 

1119 "DBAPI should ignore due to autocommit mode" 

1120 ) 

1121 else: 

1122 self._log_info("ROLLBACK") 

1123 try: 

1124 self.engine.dialect.do_rollback(self.connection) 

1125 except BaseException as e: 

1126 self._handle_dbapi_exception(e, None, None, None, None) 

1127 

1128 def _commit_impl(self) -> None: 

1129 if self._has_events or self.engine._has_events: 

1130 self.dispatch.commit(self) 

1131 

1132 if self._echo: 

1133 if self._is_autocommit_isolation(): 

1134 self._log_info( 

1135 "COMMIT using DBAPI connection.commit(), " 

1136 "DBAPI should ignore due to autocommit mode" 

1137 ) 

1138 else: 

1139 self._log_info("COMMIT") 

1140 try: 

1141 self.engine.dialect.do_commit(self.connection) 

1142 except BaseException as e: 

1143 self._handle_dbapi_exception(e, None, None, None, None) 

1144 

1145 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1146 if self._has_events or self.engine._has_events: 

1147 self.dispatch.savepoint(self, name) 

1148 

1149 if name is None: 

1150 self.__savepoint_seq += 1 

1151 name = "sa_savepoint_%s" % self.__savepoint_seq 

1152 self.engine.dialect.do_savepoint(self, name) 

1153 return name 

1154 

1155 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1156 if self._has_events or self.engine._has_events: 

1157 self.dispatch.rollback_savepoint(self, name, None) 

1158 

1159 if self._still_open_and_dbapi_connection_is_valid: 

1160 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1161 

1162 def _release_savepoint_impl(self, name: str) -> None: 

1163 if self._has_events or self.engine._has_events: 

1164 self.dispatch.release_savepoint(self, name, None) 

1165 

1166 self.engine.dialect.do_release_savepoint(self, name) 

1167 

1168 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 

1169 if self._echo: 

1170 self._log_info("BEGIN TWOPHASE (implicit)") 

1171 if self._has_events or self.engine._has_events: 

1172 self.dispatch.begin_twophase(self, transaction.xid) 

1173 

1174 self.__in_begin = True 

1175 try: 

1176 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1177 except BaseException as e: 

1178 self._handle_dbapi_exception(e, None, None, None, None) 

1179 finally: 

1180 self.__in_begin = False 

1181 

1182 def _prepare_twophase_impl(self, xid: Any) -> None: 

1183 if self._has_events or self.engine._has_events: 

1184 self.dispatch.prepare_twophase(self, xid) 

1185 

1186 assert isinstance(self._transaction, TwoPhaseTransaction) 

1187 try: 

1188 self.engine.dialect.do_prepare_twophase(self, xid) 

1189 except BaseException as e: 

1190 self._handle_dbapi_exception(e, None, None, None, None) 

1191 

1192 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1193 if self._has_events or self.engine._has_events: 

1194 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1195 

1196 if self._still_open_and_dbapi_connection_is_valid: 

1197 assert isinstance(self._transaction, TwoPhaseTransaction) 

1198 try: 

1199 self.engine.dialect.do_rollback_twophase( 

1200 self, xid, is_prepared 

1201 ) 

1202 except BaseException as e: 

1203 self._handle_dbapi_exception(e, None, None, None, None) 

1204 

1205 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1206 if self._has_events or self.engine._has_events: 

1207 self.dispatch.commit_twophase(self, xid, is_prepared) 

1208 

1209 assert isinstance(self._transaction, TwoPhaseTransaction) 

1210 try: 

1211 self.engine.dialect.do_commit_twophase(self, xid, is_prepared) 

1212 except BaseException as e: 

1213 self._handle_dbapi_exception(e, None, None, None, None) 

1214 

1215 def close(self) -> None: 

1216 """Close this :class:`_engine.Connection`. 

1217 

1218 This results in a release of the underlying database 

1219 resources, that is, the DBAPI connection referenced 

1220 internally. The DBAPI connection is typically restored 

1221 back to the connection-holding :class:`_pool.Pool` referenced 

1222 by the :class:`_engine.Engine` that produced this 

1223 :class:`_engine.Connection`. Any transactional state present on 

1224 the DBAPI connection is also unconditionally released via 

1225 the DBAPI connection's ``rollback()`` method, regardless 

1226 of any :class:`.Transaction` object that may be 

1227 outstanding with regards to this :class:`_engine.Connection`. 

1228 

1229 This has the effect of also calling :meth:`_engine.Connection.rollback` 

1230 if any transaction is in place. 

1231 

1232 After :meth:`_engine.Connection.close` is called, the 

1233 :class:`_engine.Connection` is permanently in a closed state, 

1234 and will allow no further operations. 

1235 

1236 """ 

1237 

1238 if self._transaction: 

1239 self._transaction.close() 

1240 skip_reset = True 

1241 else: 

1242 skip_reset = False 

1243 

1244 if self._dbapi_connection is not None: 

1245 conn = self._dbapi_connection 

1246 

1247 # as we just closed the transaction, close the connection 

1248 # pool connection without doing an additional reset 

1249 if skip_reset: 

1250 cast("_ConnectionFairy", conn)._close_special( 

1251 transaction_reset=True 

1252 ) 

1253 else: 

1254 conn.close() 

1255 

1256 # There is a slight chance that conn.close() may have 

1257 # triggered an invalidation here in which case 

1258 # _dbapi_connection would already be None, however usually 

1259 # it will be non-None here and in a "closed" state. 

1260 self._dbapi_connection = None 

1261 self.__can_reconnect = False 

1262 

1263 @overload 

1264 def scalar( 

1265 self, 

1266 statement: TypedReturnsRows[Tuple[_T]], 

1267 parameters: Optional[_CoreSingleExecuteParams] = None, 

1268 *, 

1269 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1270 ) -> Optional[_T]: ... 

1271 

1272 @overload 

1273 def scalar( 

1274 self, 

1275 statement: Executable, 

1276 parameters: Optional[_CoreSingleExecuteParams] = None, 

1277 *, 

1278 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1279 ) -> Any: ... 

1280 

1281 def scalar( 

1282 self, 

1283 statement: Executable, 

1284 parameters: Optional[_CoreSingleExecuteParams] = None, 

1285 *, 

1286 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1287 ) -> Any: 

1288 r"""Executes a SQL statement construct and returns a scalar object. 

1289 

1290 This method is shorthand for invoking the 

1291 :meth:`_engine.Result.scalar` method after invoking the 

1292 :meth:`_engine.Connection.execute` method. Parameters are equivalent. 

1293 

1294 :return: a scalar Python value representing the first column of the 

1295 first row returned. 

1296 

1297 """ 

1298 distilled_parameters = _distill_params_20(parameters) 

1299 try: 

1300 meth = statement._execute_on_scalar 

1301 except AttributeError as err: 

1302 raise exc.ObjectNotExecutableError(statement) from err 

1303 else: 

1304 return meth( 

1305 self, 

1306 distilled_parameters, 

1307 execution_options or NO_OPTIONS, 

1308 ) 

1309 

1310 @overload 

1311 def scalars( 

1312 self, 

1313 statement: TypedReturnsRows[Tuple[_T]], 

1314 parameters: Optional[_CoreAnyExecuteParams] = None, 

1315 *, 

1316 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1317 ) -> ScalarResult[_T]: ... 

1318 

1319 @overload 

1320 def scalars( 

1321 self, 

1322 statement: Executable, 

1323 parameters: Optional[_CoreAnyExecuteParams] = None, 

1324 *, 

1325 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1326 ) -> ScalarResult[Any]: ... 

1327 

1328 def scalars( 

1329 self, 

1330 statement: Executable, 

1331 parameters: Optional[_CoreAnyExecuteParams] = None, 

1332 *, 

1333 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1334 ) -> ScalarResult[Any]: 

1335 """Executes and returns a scalar result set, which yields scalar values 

1336 from the first column of each row. 

1337 

1338 This method is equivalent to calling :meth:`_engine.Connection.execute` 

1339 to receive a :class:`_result.Result` object, then invoking the 

1340 :meth:`_result.Result.scalars` method to produce a 

1341 :class:`_result.ScalarResult` instance. 

1342 

1343 :return: a :class:`_result.ScalarResult` 

1344 

1345 .. versionadded:: 1.4.24 

1346 

1347 """ 

1348 

1349 return self.execute( 

1350 statement, parameters, execution_options=execution_options 

1351 ).scalars() 

1352 

1353 @overload 

1354 def execute( 

1355 self, 

1356 statement: TypedReturnsRows[_T], 

1357 parameters: Optional[_CoreAnyExecuteParams] = None, 

1358 *, 

1359 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1360 ) -> CursorResult[_T]: ... 

1361 

1362 @overload 

1363 def execute( 

1364 self, 

1365 statement: Executable, 

1366 parameters: Optional[_CoreAnyExecuteParams] = None, 

1367 *, 

1368 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1369 ) -> CursorResult[Any]: ... 

1370 

1371 def execute( 

1372 self, 

1373 statement: Executable, 

1374 parameters: Optional[_CoreAnyExecuteParams] = None, 

1375 *, 

1376 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1377 ) -> CursorResult[Any]: 

1378 r"""Executes a SQL statement construct and returns a 

1379 :class:`_engine.CursorResult`. 

1380 

1381 :param statement: The statement to be executed. This is always 

1382 an object that is in both the :class:`_expression.ClauseElement` and 

1383 :class:`_expression.Executable` hierarchies, including: 

1384 

1385 * :class:`_expression.Select` 

1386 * :class:`_expression.Insert`, :class:`_expression.Update`, 

1387 :class:`_expression.Delete` 

1388 * :class:`_expression.TextClause` and 

1389 :class:`_expression.TextualSelect` 

1390 * :class:`_schema.DDL` and objects which inherit from 

1391 :class:`_schema.ExecutableDDLElement` 

1392 

1393 :param parameters: parameters which will be bound into the statement. 

1394 This may be either a dictionary of parameter names to values, 

1395 or a mutable sequence (e.g. a list) of dictionaries. When a 

1396 list of dictionaries is passed, the underlying statement execution 

1397 will make use of the DBAPI ``cursor.executemany()`` method. 

1398 When a single dictionary is passed, the DBAPI ``cursor.execute()`` 

1399 method will be used. 

1400 

1401 :param execution_options: optional dictionary of execution options, 

1402 which will be associated with the statement execution. This 

1403 dictionary can provide a subset of the options that are accepted 

1404 by :meth:`_engine.Connection.execution_options`. 

1405 

1406 :return: a :class:`_engine.Result` object. 

1407 

1408 """ 

1409 distilled_parameters = _distill_params_20(parameters) 

1410 try: 

1411 meth = statement._execute_on_connection 

1412 except AttributeError as err: 

1413 raise exc.ObjectNotExecutableError(statement) from err 

1414 else: 

1415 return meth( 

1416 self, 

1417 distilled_parameters, 

1418 execution_options or NO_OPTIONS, 

1419 ) 

1420 

1421 def _execute_function( 

1422 self, 

1423 func: FunctionElement[Any], 

1424 distilled_parameters: _CoreMultiExecuteParams, 

1425 execution_options: CoreExecuteOptionsParameter, 

1426 ) -> CursorResult[Any]: 

1427 """Execute a sql.FunctionElement object.""" 

1428 

1429 return self._execute_clauseelement( 

1430 func.select(), distilled_parameters, execution_options 

1431 ) 

1432 

1433 def _execute_default( 

1434 self, 

1435 default: DefaultGenerator, 

1436 distilled_parameters: _CoreMultiExecuteParams, 

1437 execution_options: CoreExecuteOptionsParameter, 

1438 ) -> Any: 

1439 """Execute a schema.ColumnDefault object.""" 

1440 

1441 execution_options = self._execution_options.merge_with( 

1442 execution_options 

1443 ) 

1444 

1445 event_multiparams: Optional[_CoreMultiExecuteParams] 

1446 event_params: Optional[_CoreAnyExecuteParams] 

1447 

1448 # note for event handlers, the "distilled parameters" which is always 

1449 # a list of dicts is broken out into separate "multiparams" and 

1450 # "params" collections, which allows the handler to distinguish 

1451 # between an executemany and execute style set of parameters. 

1452 if self._has_events or self.engine._has_events: 

1453 ( 

1454 default, 

1455 distilled_parameters, 

1456 event_multiparams, 

1457 event_params, 

1458 ) = self._invoke_before_exec_event( 

1459 default, distilled_parameters, execution_options 

1460 ) 

1461 else: 

1462 event_multiparams = event_params = None 

1463 

1464 try: 

1465 conn = self._dbapi_connection 

1466 if conn is None: 

1467 conn = self._revalidate_connection() 

1468 

1469 dialect = self.dialect 

1470 ctx = dialect.execution_ctx_cls._init_default( 

1471 dialect, self, conn, execution_options 

1472 ) 

1473 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1474 raise 

1475 except BaseException as e: 

1476 self._handle_dbapi_exception(e, None, None, None, None) 

1477 

1478 ret = ctx._exec_default(None, default, None) 

1479 

1480 if self._has_events or self.engine._has_events: 

1481 self.dispatch.after_execute( 

1482 self, 

1483 default, 

1484 event_multiparams, 

1485 event_params, 

1486 execution_options, 

1487 ret, 

1488 ) 

1489 

1490 return ret 

1491 

1492 def _execute_ddl( 

1493 self, 

1494 ddl: ExecutableDDLElement, 

1495 distilled_parameters: _CoreMultiExecuteParams, 

1496 execution_options: CoreExecuteOptionsParameter, 

1497 ) -> CursorResult[Any]: 

1498 """Execute a schema.DDL object.""" 

1499 

1500 exec_opts = ddl._execution_options.merge_with( 

1501 self._execution_options, execution_options 

1502 ) 

1503 

1504 event_multiparams: Optional[_CoreMultiExecuteParams] 

1505 event_params: Optional[_CoreSingleExecuteParams] 

1506 

1507 if self._has_events or self.engine._has_events: 

1508 ( 

1509 ddl, 

1510 distilled_parameters, 

1511 event_multiparams, 

1512 event_params, 

1513 ) = self._invoke_before_exec_event( 

1514 ddl, distilled_parameters, exec_opts 

1515 ) 

1516 else: 

1517 event_multiparams = event_params = None 

1518 

1519 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1520 

1521 dialect = self.dialect 

1522 

1523 compiled = ddl.compile( 

1524 dialect=dialect, schema_translate_map=schema_translate_map 

1525 ) 

1526 ret = self._execute_context( 

1527 dialect, 

1528 dialect.execution_ctx_cls._init_ddl, 

1529 compiled, 

1530 None, 

1531 exec_opts, 

1532 compiled, 

1533 ) 

1534 if self._has_events or self.engine._has_events: 

1535 self.dispatch.after_execute( 

1536 self, 

1537 ddl, 

1538 event_multiparams, 

1539 event_params, 

1540 exec_opts, 

1541 ret, 

1542 ) 

1543 return ret 

1544 

1545 def _invoke_before_exec_event( 

1546 self, 

1547 elem: Any, 

1548 distilled_params: _CoreMultiExecuteParams, 

1549 execution_options: _ExecuteOptions, 

1550 ) -> Tuple[ 

1551 Any, 

1552 _CoreMultiExecuteParams, 

1553 _CoreMultiExecuteParams, 

1554 _CoreSingleExecuteParams, 

1555 ]: 

1556 event_multiparams: _CoreMultiExecuteParams 

1557 event_params: _CoreSingleExecuteParams 

1558 

1559 if len(distilled_params) == 1: 

1560 event_multiparams, event_params = [], distilled_params[0] 

1561 else: 

1562 event_multiparams, event_params = distilled_params, {} 

1563 

1564 for fn in self.dispatch.before_execute: 

1565 elem, event_multiparams, event_params = fn( 

1566 self, 

1567 elem, 

1568 event_multiparams, 

1569 event_params, 

1570 execution_options, 

1571 ) 

1572 

1573 if event_multiparams: 

1574 distilled_params = list(event_multiparams) 

1575 if event_params: 

1576 raise exc.InvalidRequestError( 

1577 "Event handler can't return non-empty multiparams " 

1578 "and params at the same time" 

1579 ) 

1580 elif event_params: 

1581 distilled_params = [event_params] 

1582 else: 

1583 distilled_params = [] 

1584 

1585 return elem, distilled_params, event_multiparams, event_params 

1586 

1587 def _execute_clauseelement( 

1588 self, 

1589 elem: Executable, 

1590 distilled_parameters: _CoreMultiExecuteParams, 

1591 execution_options: CoreExecuteOptionsParameter, 

1592 ) -> CursorResult[Any]: 

1593 """Execute a sql.ClauseElement object.""" 

1594 

1595 execution_options = elem._execution_options.merge_with( 

1596 self._execution_options, execution_options 

1597 ) 

1598 

1599 has_events = self._has_events or self.engine._has_events 

1600 if has_events: 

1601 ( 

1602 elem, 

1603 distilled_parameters, 

1604 event_multiparams, 

1605 event_params, 

1606 ) = self._invoke_before_exec_event( 

1607 elem, distilled_parameters, execution_options 

1608 ) 

1609 

1610 if distilled_parameters: 

1611 # ensure we don't retain a link to the view object for keys() 

1612 # which links to the values, which we don't want to cache 

1613 keys = sorted(distilled_parameters[0]) 

1614 for_executemany = len(distilled_parameters) > 1 

1615 else: 

1616 keys = [] 

1617 for_executemany = False 

1618 

1619 dialect = self.dialect 

1620 

1621 schema_translate_map = execution_options.get( 

1622 "schema_translate_map", None 

1623 ) 

1624 

1625 compiled_cache: Optional[CompiledCacheType] = execution_options.get( 

1626 "compiled_cache", self.engine._compiled_cache 

1627 ) 

1628 

1629 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( 

1630 dialect=dialect, 

1631 compiled_cache=compiled_cache, 

1632 column_keys=keys, 

1633 for_executemany=for_executemany, 

1634 schema_translate_map=schema_translate_map, 

1635 linting=self.dialect.compiler_linting | compiler.WARN_LINTING, 

1636 ) 

1637 ret = self._execute_context( 

1638 dialect, 

1639 dialect.execution_ctx_cls._init_compiled, 

1640 compiled_sql, 

1641 distilled_parameters, 

1642 execution_options, 

1643 compiled_sql, 

1644 distilled_parameters, 

1645 elem, 

1646 extracted_params, 

1647 cache_hit=cache_hit, 

1648 ) 

1649 if has_events: 

1650 self.dispatch.after_execute( 

1651 self, 

1652 elem, 

1653 event_multiparams, 

1654 event_params, 

1655 execution_options, 

1656 ret, 

1657 ) 

1658 return ret 

1659 

1660 def _execute_compiled( 

1661 self, 

1662 compiled: Compiled, 

1663 distilled_parameters: _CoreMultiExecuteParams, 

1664 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS, 

1665 ) -> CursorResult[Any]: 

1666 """Execute a sql.Compiled object. 

1667 

1668 TODO: why do we have this? likely deprecate or remove 

1669 

1670 """ 

1671 

1672 execution_options = compiled.execution_options.merge_with( 

1673 self._execution_options, execution_options 

1674 ) 

1675 

1676 if self._has_events or self.engine._has_events: 

1677 ( 

1678 compiled, 

1679 distilled_parameters, 

1680 event_multiparams, 

1681 event_params, 

1682 ) = self._invoke_before_exec_event( 

1683 compiled, distilled_parameters, execution_options 

1684 ) 

1685 

1686 dialect = self.dialect 

1687 

1688 ret = self._execute_context( 

1689 dialect, 

1690 dialect.execution_ctx_cls._init_compiled, 

1691 compiled, 

1692 distilled_parameters, 

1693 execution_options, 

1694 compiled, 

1695 distilled_parameters, 

1696 None, 

1697 None, 

1698 ) 

1699 if self._has_events or self.engine._has_events: 

1700 self.dispatch.after_execute( 

1701 self, 

1702 compiled, 

1703 event_multiparams, 

1704 event_params, 

1705 execution_options, 

1706 ret, 

1707 ) 

1708 return ret 

1709 

1710 def exec_driver_sql( 

1711 self, 

1712 statement: str, 

1713 parameters: Optional[_DBAPIAnyExecuteParams] = None, 

1714 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1715 ) -> CursorResult[Any]: 

1716 r"""Executes a string SQL statement on the DBAPI cursor directly, 

1717 without any SQL compilation steps. 

1718 

1719 This can be used to pass any string directly to the 

1720 ``cursor.execute()`` method of the DBAPI in use. 

1721 

1722 :param statement: The statement str to be executed. Bound parameters 

1723 must use the underlying DBAPI's paramstyle, such as "qmark", 

1724 "pyformat", "format", etc. 

1725 

1726 :param parameters: represent bound parameter values to be used in the 

1727 execution. The format is one of: a dictionary of named parameters, 

1728 a tuple of positional parameters, or a list containing either 

1729 dictionaries or tuples for multiple-execute support. 

1730 

1731 :return: a :class:`_engine.CursorResult`. 

1732 

1733 E.g. multiple dictionaries:: 

1734 

1735 

1736 conn.exec_driver_sql( 

1737 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1738 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}], 

1739 ) 

1740 

1741 Single dictionary:: 

1742 

1743 conn.exec_driver_sql( 

1744 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1745 dict(id=1, value="v1"), 

1746 ) 

1747 

1748 Single tuple:: 

1749 

1750 conn.exec_driver_sql( 

1751 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1") 

1752 ) 

1753 

1754 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does 

1755 not participate in the 

1756 :meth:`_events.ConnectionEvents.before_execute` and 

1757 :meth:`_events.ConnectionEvents.after_execute` events. To 

1758 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use 

1759 :meth:`_events.ConnectionEvents.before_cursor_execute` and 

1760 :meth:`_events.ConnectionEvents.after_cursor_execute`. 

1761 

1762 .. seealso:: 

1763 

1764 :pep:`249` 

1765 

1766 """ 

1767 

1768 distilled_parameters = _distill_raw_params(parameters) 

1769 

1770 execution_options = self._execution_options.merge_with( 

1771 execution_options 

1772 ) 

1773 

1774 dialect = self.dialect 

1775 ret = self._execute_context( 

1776 dialect, 

1777 dialect.execution_ctx_cls._init_statement, 

1778 statement, 

1779 None, 

1780 execution_options, 

1781 statement, 

1782 distilled_parameters, 

1783 ) 

1784 

1785 return ret 

1786 

1787 def _execute_context( 

1788 self, 

1789 dialect: Dialect, 

1790 constructor: Callable[..., ExecutionContext], 

1791 statement: Union[str, Compiled], 

1792 parameters: Optional[_AnyMultiExecuteParams], 

1793 execution_options: _ExecuteOptions, 

1794 *args: Any, 

1795 **kw: Any, 

1796 ) -> CursorResult[Any]: 

1797 """Create an :class:`.ExecutionContext` and execute, returning 

1798 a :class:`_engine.CursorResult`.""" 

1799 

1800 if execution_options: 

1801 yp = execution_options.get("yield_per", None) 

1802 if yp: 

1803 execution_options = execution_options.union( 

1804 {"stream_results": True, "max_row_buffer": yp} 

1805 ) 

1806 try: 

1807 conn = self._dbapi_connection 

1808 if conn is None: 

1809 conn = self._revalidate_connection() 

1810 

1811 context = constructor( 

1812 dialect, self, conn, execution_options, *args, **kw 

1813 ) 

1814 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1815 raise 

1816 except BaseException as e: 

1817 self._handle_dbapi_exception( 

1818 e, str(statement), parameters, None, None 

1819 ) 

1820 

1821 if ( 

1822 self._transaction 

1823 and not self._transaction.is_active 

1824 or ( 

1825 self._nested_transaction 

1826 and not self._nested_transaction.is_active 

1827 ) 

1828 ): 

1829 self._invalid_transaction() 

1830 

1831 elif self._trans_context_manager: 

1832 TransactionalContext._trans_ctx_check(self) 

1833 

1834 if self._transaction is None: 

1835 self._autobegin() 

1836 

1837 context.pre_exec() 

1838 

1839 if context.execute_style is ExecuteStyle.INSERTMANYVALUES: 

1840 return self._exec_insertmany_context(dialect, context) 

1841 else: 

1842 return self._exec_single_context( 

1843 dialect, context, statement, parameters 

1844 ) 

1845 

1846 def _exec_single_context( 

1847 self, 

1848 dialect: Dialect, 

1849 context: ExecutionContext, 

1850 statement: Union[str, Compiled], 

1851 parameters: Optional[_AnyMultiExecuteParams], 

1852 ) -> CursorResult[Any]: 

1853 """continue the _execute_context() method for a single DBAPI 

1854 cursor.execute() or cursor.executemany() call. 

1855 

1856 """ 

1857 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

1858 generic_setinputsizes = context._prepare_set_input_sizes() 

1859 

1860 if generic_setinputsizes: 

1861 try: 

1862 dialect.do_set_input_sizes( 

1863 context.cursor, generic_setinputsizes, context 

1864 ) 

1865 except BaseException as e: 

1866 self._handle_dbapi_exception( 

1867 e, str(statement), parameters, None, context 

1868 ) 

1869 

1870 cursor, str_statement, parameters = ( 

1871 context.cursor, 

1872 context.statement, 

1873 context.parameters, 

1874 ) 

1875 

1876 effective_parameters: Optional[_AnyExecuteParams] 

1877 

1878 if not context.executemany: 

1879 effective_parameters = parameters[0] 

1880 else: 

1881 effective_parameters = parameters 

1882 

1883 if self._has_events or self.engine._has_events: 

1884 for fn in self.dispatch.before_cursor_execute: 

1885 str_statement, effective_parameters = fn( 

1886 self, 

1887 cursor, 

1888 str_statement, 

1889 effective_parameters, 

1890 context, 

1891 context.executemany, 

1892 ) 

1893 

1894 if self._echo: 

1895 self._log_info(str_statement) 

1896 

1897 stats = context._get_cache_stats() 

1898 

1899 if not self.engine.hide_parameters: 

1900 self._log_info( 

1901 "[%s] %r", 

1902 stats, 

1903 sql_util._repr_params( 

1904 effective_parameters, 

1905 batches=10, 

1906 ismulti=context.executemany, 

1907 ), 

1908 ) 

1909 else: 

1910 self._log_info( 

1911 "[%s] [SQL parameters hidden due to hide_parameters=True]", 

1912 stats, 

1913 ) 

1914 

1915 evt_handled: bool = False 

1916 try: 

1917 if context.execute_style is ExecuteStyle.EXECUTEMANY: 

1918 effective_parameters = cast( 

1919 "_CoreMultiExecuteParams", effective_parameters 

1920 ) 

1921 if self.dialect._has_events: 

1922 for fn in self.dialect.dispatch.do_executemany: 

1923 if fn( 

1924 cursor, 

1925 str_statement, 

1926 effective_parameters, 

1927 context, 

1928 ): 

1929 evt_handled = True 

1930 break 

1931 if not evt_handled: 

1932 self.dialect.do_executemany( 

1933 cursor, 

1934 str_statement, 

1935 effective_parameters, 

1936 context, 

1937 ) 

1938 elif not effective_parameters and context.no_parameters: 

1939 if self.dialect._has_events: 

1940 for fn in self.dialect.dispatch.do_execute_no_params: 

1941 if fn(cursor, str_statement, context): 

1942 evt_handled = True 

1943 break 

1944 if not evt_handled: 

1945 self.dialect.do_execute_no_params( 

1946 cursor, str_statement, context 

1947 ) 

1948 else: 

1949 effective_parameters = cast( 

1950 "_CoreSingleExecuteParams", effective_parameters 

1951 ) 

1952 if self.dialect._has_events: 

1953 for fn in self.dialect.dispatch.do_execute: 

1954 if fn( 

1955 cursor, 

1956 str_statement, 

1957 effective_parameters, 

1958 context, 

1959 ): 

1960 evt_handled = True 

1961 break 

1962 if not evt_handled: 

1963 self.dialect.do_execute( 

1964 cursor, str_statement, effective_parameters, context 

1965 ) 

1966 

1967 if self._has_events or self.engine._has_events: 

1968 self.dispatch.after_cursor_execute( 

1969 self, 

1970 cursor, 

1971 str_statement, 

1972 effective_parameters, 

1973 context, 

1974 context.executemany, 

1975 ) 

1976 

1977 context.post_exec() 

1978 

1979 result = context._setup_result_proxy() 

1980 

1981 except BaseException as e: 

1982 self._handle_dbapi_exception( 

1983 e, str_statement, effective_parameters, cursor, context 

1984 ) 

1985 

1986 return result 

1987 

1988 def _exec_insertmany_context( 

1989 self, 

1990 dialect: Dialect, 

1991 context: ExecutionContext, 

1992 ) -> CursorResult[Any]: 

1993 """continue the _execute_context() method for an "insertmanyvalues" 

1994 operation, which will invoke DBAPI 

1995 cursor.execute() one or more times with individual log and 

1996 event hook calls. 

1997 

1998 """ 

1999 

2000 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

2001 generic_setinputsizes = context._prepare_set_input_sizes() 

2002 else: 

2003 generic_setinputsizes = None 

2004 

2005 cursor, str_statement, parameters = ( 

2006 context.cursor, 

2007 context.statement, 

2008 context.parameters, 

2009 ) 

2010 

2011 effective_parameters = parameters 

2012 

2013 engine_events = self._has_events or self.engine._has_events 

2014 if self.dialect._has_events: 

2015 do_execute_dispatch: Iterable[Any] = ( 

2016 self.dialect.dispatch.do_execute 

2017 ) 

2018 else: 

2019 do_execute_dispatch = () 

2020 

2021 if self._echo: 

2022 stats = context._get_cache_stats() + " (insertmanyvalues)" 

2023 

2024 preserve_rowcount = context.execution_options.get( 

2025 "preserve_rowcount", False 

2026 ) 

2027 rowcount = 0 

2028 

2029 for imv_batch in dialect._deliver_insertmanyvalues_batches( 

2030 self, 

2031 cursor, 

2032 str_statement, 

2033 effective_parameters, 

2034 generic_setinputsizes, 

2035 context, 

2036 ): 

2037 if imv_batch.processed_setinputsizes: 

2038 try: 

2039 dialect.do_set_input_sizes( 

2040 context.cursor, 

2041 imv_batch.processed_setinputsizes, 

2042 context, 

2043 ) 

2044 except BaseException as e: 

2045 self._handle_dbapi_exception( 

2046 e, 

2047 sql_util._long_statement(imv_batch.replaced_statement), 

2048 imv_batch.replaced_parameters, 

2049 None, 

2050 context, 

2051 is_sub_exec=True, 

2052 ) 

2053 

2054 sub_stmt = imv_batch.replaced_statement 

2055 sub_params = imv_batch.replaced_parameters 

2056 

2057 if engine_events: 

2058 for fn in self.dispatch.before_cursor_execute: 

2059 sub_stmt, sub_params = fn( 

2060 self, 

2061 cursor, 

2062 sub_stmt, 

2063 sub_params, 

2064 context, 

2065 True, 

2066 ) 

2067 

2068 if self._echo: 

2069 self._log_info(sql_util._long_statement(sub_stmt)) 

2070 

2071 imv_stats = f""" {imv_batch.batchnum}/{ 

2072 imv_batch.total_batches 

2073 } ({ 

2074 'ordered' 

2075 if imv_batch.rows_sorted else 'unordered' 

2076 }{ 

2077 '; batch not supported' 

2078 if imv_batch.is_downgraded 

2079 else '' 

2080 })""" 

2081 

2082 if imv_batch.batchnum == 1: 

2083 stats += imv_stats 

2084 else: 

2085 stats = f"insertmanyvalues{imv_stats}" 

2086 

2087 if not self.engine.hide_parameters: 

2088 self._log_info( 

2089 "[%s] %r", 

2090 stats, 

2091 sql_util._repr_params( 

2092 sub_params, 

2093 batches=10, 

2094 ismulti=False, 

2095 ), 

2096 ) 

2097 else: 

2098 self._log_info( 

2099 "[%s] [SQL parameters hidden due to " 

2100 "hide_parameters=True]", 

2101 stats, 

2102 ) 

2103 

2104 try: 

2105 for fn in do_execute_dispatch: 

2106 if fn( 

2107 cursor, 

2108 sub_stmt, 

2109 sub_params, 

2110 context, 

2111 ): 

2112 break 

2113 else: 

2114 dialect.do_execute( 

2115 cursor, 

2116 sub_stmt, 

2117 sub_params, 

2118 context, 

2119 ) 

2120 

2121 except BaseException as e: 

2122 self._handle_dbapi_exception( 

2123 e, 

2124 sql_util._long_statement(sub_stmt), 

2125 sub_params, 

2126 cursor, 

2127 context, 

2128 is_sub_exec=True, 

2129 ) 

2130 

2131 if engine_events: 

2132 self.dispatch.after_cursor_execute( 

2133 self, 

2134 cursor, 

2135 str_statement, 

2136 effective_parameters, 

2137 context, 

2138 context.executemany, 

2139 ) 

2140 

2141 if preserve_rowcount: 

2142 rowcount += imv_batch.current_batch_size 

2143 

2144 try: 

2145 context.post_exec() 

2146 

2147 if preserve_rowcount: 

2148 context._rowcount = rowcount # type: ignore[attr-defined] 

2149 

2150 result = context._setup_result_proxy() 

2151 

2152 except BaseException as e: 

2153 self._handle_dbapi_exception( 

2154 e, str_statement, effective_parameters, cursor, context 

2155 ) 

2156 

2157 return result 

2158 

2159 def _cursor_execute( 

2160 self, 

2161 cursor: DBAPICursor, 

2162 statement: str, 

2163 parameters: _DBAPISingleExecuteParams, 

2164 context: Optional[ExecutionContext] = None, 

2165 ) -> None: 

2166 """Execute a statement + params on the given cursor. 

2167 

2168 Adds appropriate logging and exception handling. 

2169 

2170 This method is used by DefaultDialect for special-case 

2171 executions, such as for sequences and column defaults. 

2172 The path of statement execution in the majority of cases 

2173 terminates at _execute_context(). 

2174 

2175 """ 

2176 if self._has_events or self.engine._has_events: 

2177 for fn in self.dispatch.before_cursor_execute: 

2178 statement, parameters = fn( 

2179 self, cursor, statement, parameters, context, False 

2180 ) 

2181 

2182 if self._echo: 

2183 self._log_info(statement) 

2184 self._log_info("[raw sql] %r", parameters) 

2185 try: 

2186 for fn in ( 

2187 () 

2188 if not self.dialect._has_events 

2189 else self.dialect.dispatch.do_execute 

2190 ): 

2191 if fn(cursor, statement, parameters, context): 

2192 break 

2193 else: 

2194 self.dialect.do_execute(cursor, statement, parameters, context) 

2195 except BaseException as e: 

2196 self._handle_dbapi_exception( 

2197 e, statement, parameters, cursor, context 

2198 ) 

2199 

2200 if self._has_events or self.engine._has_events: 

2201 self.dispatch.after_cursor_execute( 

2202 self, cursor, statement, parameters, context, False 

2203 ) 

2204 

2205 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2206 """Close the given cursor, catching exceptions 

2207 and turning into log warnings. 

2208 

2209 """ 

2210 try: 

2211 cursor.close() 

2212 except Exception: 

2213 # log the error through the connection pool's logger. 

2214 self.engine.pool.logger.error( 

2215 "Error closing cursor", exc_info=True 

2216 ) 

2217 

2218 _reentrant_error = False 

2219 _is_disconnect = False 

2220 

2221 def _handle_dbapi_exception( 

2222 self, 

2223 e: BaseException, 

2224 statement: Optional[str], 

2225 parameters: Optional[_AnyExecuteParams], 

2226 cursor: Optional[DBAPICursor], 

2227 context: Optional[ExecutionContext], 

2228 is_sub_exec: bool = False, 

2229 ) -> NoReturn: 

2230 exc_info = sys.exc_info() 

2231 

2232 is_exit_exception = util.is_exit_exception(e) 

2233 

2234 if not self._is_disconnect: 

2235 self._is_disconnect = ( 

2236 isinstance(e, self.dialect.loaded_dbapi.Error) 

2237 and not self.closed 

2238 and self.dialect.is_disconnect( 

2239 e, 

2240 self._dbapi_connection if not self.invalidated else None, 

2241 cursor, 

2242 ) 

2243 ) or (is_exit_exception and not self.closed) 

2244 

2245 invalidate_pool_on_disconnect = not is_exit_exception 

2246 

2247 ismulti: bool = ( 

2248 not is_sub_exec and context.executemany 

2249 if context is not None 

2250 else False 

2251 ) 

2252 if self._reentrant_error: 

2253 raise exc.DBAPIError.instance( 

2254 statement, 

2255 parameters, 

2256 e, 

2257 self.dialect.loaded_dbapi.Error, 

2258 hide_parameters=self.engine.hide_parameters, 

2259 dialect=self.dialect, 

2260 ismulti=ismulti, 

2261 ).with_traceback(exc_info[2]) from e 

2262 self._reentrant_error = True 

2263 try: 

2264 # non-DBAPI error - if we already got a context, 

2265 # or there's no string statement, don't wrap it 

2266 should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or ( 

2267 statement is not None 

2268 and context is None 

2269 and not is_exit_exception 

2270 ) 

2271 

2272 if should_wrap: 

2273 sqlalchemy_exception = exc.DBAPIError.instance( 

2274 statement, 

2275 parameters, 

2276 cast(Exception, e), 

2277 self.dialect.loaded_dbapi.Error, 

2278 hide_parameters=self.engine.hide_parameters, 

2279 connection_invalidated=self._is_disconnect, 

2280 dialect=self.dialect, 

2281 ismulti=ismulti, 

2282 ) 

2283 else: 

2284 sqlalchemy_exception = None 

2285 

2286 newraise = None 

2287 

2288 if (self.dialect._has_events) and not self._execution_options.get( 

2289 "skip_user_error_events", False 

2290 ): 

2291 ctx = ExceptionContextImpl( 

2292 e, 

2293 sqlalchemy_exception, 

2294 self.engine, 

2295 self.dialect, 

2296 self, 

2297 cursor, 

2298 statement, 

2299 parameters, 

2300 context, 

2301 self._is_disconnect, 

2302 invalidate_pool_on_disconnect, 

2303 False, 

2304 ) 

2305 

2306 for fn in self.dialect.dispatch.handle_error: 

2307 try: 

2308 # handler returns an exception; 

2309 # call next handler in a chain 

2310 per_fn = fn(ctx) 

2311 if per_fn is not None: 

2312 ctx.chained_exception = newraise = per_fn 

2313 except Exception as _raised: 

2314 # handler raises an exception - stop processing 

2315 newraise = _raised 

2316 break 

2317 

2318 if self._is_disconnect != ctx.is_disconnect: 

2319 self._is_disconnect = ctx.is_disconnect 

2320 if sqlalchemy_exception: 

2321 sqlalchemy_exception.connection_invalidated = ( 

2322 ctx.is_disconnect 

2323 ) 

2324 

2325 # set up potentially user-defined value for 

2326 # invalidate pool. 

2327 invalidate_pool_on_disconnect = ( 

2328 ctx.invalidate_pool_on_disconnect 

2329 ) 

2330 

2331 if should_wrap and context: 

2332 context.handle_dbapi_exception(e) 

2333 

2334 if not self._is_disconnect: 

2335 if cursor: 

2336 self._safe_close_cursor(cursor) 

2337 # "autorollback" was mostly relevant in 1.x series. 

2338 # It's very unlikely to reach here, as the connection 

2339 # does autobegin so when we are here, we are usually 

2340 # in an explicit / semi-explicit transaction. 

2341 # however we have a test which manufactures this 

2342 # scenario in any case using an event handler. 

2343 # test/engine/test_execute.py-> test_actual_autorollback 

2344 if not self.in_transaction(): 

2345 self._rollback_impl() 

2346 

2347 if newraise: 

2348 raise newraise.with_traceback(exc_info[2]) from e 

2349 elif should_wrap: 

2350 assert sqlalchemy_exception is not None 

2351 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2352 else: 

2353 assert exc_info[1] is not None 

2354 raise exc_info[1].with_traceback(exc_info[2]) 

2355 finally: 

2356 del self._reentrant_error 

2357 if self._is_disconnect: 

2358 del self._is_disconnect 

2359 if not self.invalidated: 

2360 dbapi_conn_wrapper = self._dbapi_connection 

2361 assert dbapi_conn_wrapper is not None 

2362 if invalidate_pool_on_disconnect: 

2363 self.engine.pool._invalidate(dbapi_conn_wrapper, e) 

2364 self.invalidate(e) 

2365 

2366 @classmethod 

2367 def _handle_dbapi_exception_noconnection( 

2368 cls, 

2369 e: BaseException, 

2370 dialect: Dialect, 

2371 engine: Optional[Engine] = None, 

2372 is_disconnect: Optional[bool] = None, 

2373 invalidate_pool_on_disconnect: bool = True, 

2374 is_pre_ping: bool = False, 

2375 ) -> NoReturn: 

2376 exc_info = sys.exc_info() 

2377 

2378 if is_disconnect is None: 

2379 is_disconnect = isinstance( 

2380 e, dialect.loaded_dbapi.Error 

2381 ) and dialect.is_disconnect(e, None, None) 

2382 

2383 should_wrap = isinstance(e, dialect.loaded_dbapi.Error) 

2384 

2385 if should_wrap: 

2386 sqlalchemy_exception = exc.DBAPIError.instance( 

2387 None, 

2388 None, 

2389 cast(Exception, e), 

2390 dialect.loaded_dbapi.Error, 

2391 hide_parameters=( 

2392 engine.hide_parameters if engine is not None else False 

2393 ), 

2394 connection_invalidated=is_disconnect, 

2395 dialect=dialect, 

2396 ) 

2397 else: 

2398 sqlalchemy_exception = None 

2399 

2400 newraise = None 

2401 

2402 if dialect._has_events: 

2403 ctx = ExceptionContextImpl( 

2404 e, 

2405 sqlalchemy_exception, 

2406 engine, 

2407 dialect, 

2408 None, 

2409 None, 

2410 None, 

2411 None, 

2412 None, 

2413 is_disconnect, 

2414 invalidate_pool_on_disconnect, 

2415 is_pre_ping, 

2416 ) 

2417 for fn in dialect.dispatch.handle_error: 

2418 try: 

2419 # handler returns an exception; 

2420 # call next handler in a chain 

2421 per_fn = fn(ctx) 

2422 if per_fn is not None: 

2423 ctx.chained_exception = newraise = per_fn 

2424 except Exception as _raised: 

2425 # handler raises an exception - stop processing 

2426 newraise = _raised 

2427 break 

2428 

2429 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: 

2430 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect 

2431 

2432 if newraise: 

2433 raise newraise.with_traceback(exc_info[2]) from e 

2434 elif should_wrap: 

2435 assert sqlalchemy_exception is not None 

2436 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2437 else: 

2438 assert exc_info[1] is not None 

2439 raise exc_info[1].with_traceback(exc_info[2]) 

2440 

2441 def _run_ddl_visitor( 

2442 self, 

2443 visitorcallable: Type[InvokeDDLBase], 

2444 element: SchemaVisitable, 

2445 **kwargs: Any, 

2446 ) -> None: 

2447 """run a DDL visitor. 

2448 

2449 This method is only here so that the MockConnection can change the 

2450 options given to the visitor so that "checkfirst" is skipped. 

2451 

2452 """ 

2453 visitorcallable( 

2454 dialect=self.dialect, connection=self, **kwargs 

2455 ).traverse_single(element) 

2456 

2457 

2458class ExceptionContextImpl(ExceptionContext): 

2459 """Implement the :class:`.ExceptionContext` interface.""" 

2460 

2461 __slots__ = ( 

2462 "connection", 

2463 "engine", 

2464 "dialect", 

2465 "cursor", 

2466 "statement", 

2467 "parameters", 

2468 "original_exception", 

2469 "sqlalchemy_exception", 

2470 "chained_exception", 

2471 "execution_context", 

2472 "is_disconnect", 

2473 "invalidate_pool_on_disconnect", 

2474 "is_pre_ping", 

2475 ) 

2476 

2477 def __init__( 

2478 self, 

2479 exception: BaseException, 

2480 sqlalchemy_exception: Optional[exc.StatementError], 

2481 engine: Optional[Engine], 

2482 dialect: Dialect, 

2483 connection: Optional[Connection], 

2484 cursor: Optional[DBAPICursor], 

2485 statement: Optional[str], 

2486 parameters: Optional[_DBAPIAnyExecuteParams], 

2487 context: Optional[ExecutionContext], 

2488 is_disconnect: bool, 

2489 invalidate_pool_on_disconnect: bool, 

2490 is_pre_ping: bool, 

2491 ): 

2492 self.engine = engine 

2493 self.dialect = dialect 

2494 self.connection = connection 

2495 self.sqlalchemy_exception = sqlalchemy_exception 

2496 self.original_exception = exception 

2497 self.execution_context = context 

2498 self.statement = statement 

2499 self.parameters = parameters 

2500 self.is_disconnect = is_disconnect 

2501 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect 

2502 self.is_pre_ping = is_pre_ping 

2503 

2504 

2505class Transaction(TransactionalContext): 

2506 """Represent a database transaction in progress. 

2507 

2508 The :class:`.Transaction` object is procured by 

2509 calling the :meth:`_engine.Connection.begin` method of 

2510 :class:`_engine.Connection`:: 

2511 

2512 from sqlalchemy import create_engine 

2513 

2514 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

2515 connection = engine.connect() 

2516 trans = connection.begin() 

2517 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2518 trans.commit() 

2519 

2520 The object provides :meth:`.rollback` and :meth:`.commit` 

2521 methods in order to control transaction boundaries. It 

2522 also implements a context manager interface so that 

2523 the Python ``with`` statement can be used with the 

2524 :meth:`_engine.Connection.begin` method:: 

2525 

2526 with connection.begin(): 

2527 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2528 

2529 The Transaction object is **not** threadsafe. 

2530 

2531 .. seealso:: 

2532 

2533 :meth:`_engine.Connection.begin` 

2534 

2535 :meth:`_engine.Connection.begin_twophase` 

2536 

2537 :meth:`_engine.Connection.begin_nested` 

2538 

2539 .. index:: 

2540 single: thread safety; Transaction 

2541 """ # noqa 

2542 

2543 __slots__ = () 

2544 

2545 _is_root: bool = False 

2546 is_active: bool 

2547 connection: Connection 

2548 

2549 def __init__(self, connection: Connection): 

2550 raise NotImplementedError() 

2551 

2552 @property 

2553 def _deactivated_from_connection(self) -> bool: 

2554 """True if this transaction is totally deactivated from the connection 

2555 and therefore can no longer affect its state. 

2556 

2557 """ 

2558 raise NotImplementedError() 

2559 

2560 def _do_close(self) -> None: 

2561 raise NotImplementedError() 

2562 

2563 def _do_rollback(self) -> None: 

2564 raise NotImplementedError() 

2565 

2566 def _do_commit(self) -> None: 

2567 raise NotImplementedError() 

2568 

2569 @property 

2570 def is_valid(self) -> bool: 

2571 return self.is_active and not self.connection.invalidated 

2572 

2573 def close(self) -> None: 

2574 """Close this :class:`.Transaction`. 

2575 

2576 If this transaction is the base transaction in a begin/commit 

2577 nesting, the transaction will rollback(). Otherwise, the 

2578 method returns. 

2579 

2580 This is used to cancel a Transaction without affecting the scope of 

2581 an enclosing transaction. 

2582 

2583 """ 

2584 try: 

2585 self._do_close() 

2586 finally: 

2587 assert not self.is_active 

2588 

2589 def rollback(self) -> None: 

2590 """Roll back this :class:`.Transaction`. 

2591 

2592 The implementation of this may vary based on the type of transaction in 

2593 use: 

2594 

2595 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2596 it corresponds to a ROLLBACK. 

2597 

2598 * For a :class:`.NestedTransaction`, it corresponds to a 

2599 "ROLLBACK TO SAVEPOINT" operation. 

2600 

2601 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2602 phase transactions may be used. 

2603 

2604 

2605 """ 

2606 try: 

2607 self._do_rollback() 

2608 finally: 

2609 assert not self.is_active 

2610 

2611 def commit(self) -> None: 

2612 """Commit this :class:`.Transaction`. 

2613 

2614 The implementation of this may vary based on the type of transaction in 

2615 use: 

2616 

2617 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2618 it corresponds to a COMMIT. 

2619 

2620 * For a :class:`.NestedTransaction`, it corresponds to a 

2621 "RELEASE SAVEPOINT" operation. 

2622 

2623 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2624 phase transactions may be used. 

2625 

2626 """ 

2627 try: 

2628 self._do_commit() 

2629 finally: 

2630 assert not self.is_active 

2631 

2632 def _get_subject(self) -> Connection: 

2633 return self.connection 

2634 

2635 def _transaction_is_active(self) -> bool: 

2636 return self.is_active 

2637 

2638 def _transaction_is_closed(self) -> bool: 

2639 return not self._deactivated_from_connection 

2640 

2641 def _rollback_can_be_called(self) -> bool: 

2642 # for RootTransaction / NestedTransaction, it's safe to call 

2643 # rollback() even if the transaction is deactive and no warnings 

2644 # will be emitted. tested in 

2645 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)? 

2646 return True 

2647 

2648 

2649class RootTransaction(Transaction): 

2650 """Represent the "root" transaction on a :class:`_engine.Connection`. 

2651 

2652 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring 

2653 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction` 

2654 is created by calling upon the :meth:`_engine.Connection.begin` method, and 

2655 remains associated with the :class:`_engine.Connection` throughout its 

2656 active span. The current :class:`_engine.RootTransaction` in use is 

2657 accessible via the :attr:`_engine.Connection.get_transaction` method of 

2658 :class:`_engine.Connection`. 

2659 

2660 In :term:`2.0 style` use, the :class:`_engine.Connection` also employs 

2661 "autobegin" behavior that will create a new 

2662 :class:`_engine.RootTransaction` whenever a connection in a 

2663 non-transactional state is used to emit commands on the DBAPI connection. 

2664 The scope of the :class:`_engine.RootTransaction` in 2.0 style 

2665 use can be controlled using the :meth:`_engine.Connection.commit` and 

2666 :meth:`_engine.Connection.rollback` methods. 

2667 

2668 

2669 """ 

2670 

2671 _is_root = True 

2672 

2673 __slots__ = ("connection", "is_active") 

2674 

2675 def __init__(self, connection: Connection): 

2676 assert connection._transaction is None 

2677 if connection._trans_context_manager: 

2678 TransactionalContext._trans_ctx_check(connection) 

2679 self.connection = connection 

2680 self._connection_begin_impl() 

2681 connection._transaction = self 

2682 

2683 self.is_active = True 

2684 

2685 def _deactivate_from_connection(self) -> None: 

2686 if self.is_active: 

2687 assert self.connection._transaction is self 

2688 self.is_active = False 

2689 

2690 elif self.connection._transaction is not self: 

2691 util.warn("transaction already deassociated from connection") 

2692 

2693 @property 

2694 def _deactivated_from_connection(self) -> bool: 

2695 return self.connection._transaction is not self 

2696 

2697 def _connection_begin_impl(self) -> None: 

2698 self.connection._begin_impl(self) 

2699 

2700 def _connection_rollback_impl(self) -> None: 

2701 self.connection._rollback_impl() 

2702 

2703 def _connection_commit_impl(self) -> None: 

2704 self.connection._commit_impl() 

2705 

2706 def _close_impl(self, try_deactivate: bool = False) -> None: 

2707 try: 

2708 if self.is_active: 

2709 self._connection_rollback_impl() 

2710 

2711 if self.connection._nested_transaction: 

2712 self.connection._nested_transaction._cancel() 

2713 finally: 

2714 if self.is_active or try_deactivate: 

2715 self._deactivate_from_connection() 

2716 if self.connection._transaction is self: 

2717 self.connection._transaction = None 

2718 

2719 assert not self.is_active 

2720 assert self.connection._transaction is not self 

2721 

2722 def _do_close(self) -> None: 

2723 self._close_impl() 

2724 

2725 def _do_rollback(self) -> None: 

2726 self._close_impl(try_deactivate=True) 

2727 

2728 def _do_commit(self) -> None: 

2729 if self.is_active: 

2730 assert self.connection._transaction is self 

2731 

2732 try: 

2733 self._connection_commit_impl() 

2734 finally: 

2735 # whether or not commit succeeds, cancel any 

2736 # nested transactions, make this transaction "inactive" 

2737 # and remove it as a reset agent 

2738 if self.connection._nested_transaction: 

2739 self.connection._nested_transaction._cancel() 

2740 

2741 self._deactivate_from_connection() 

2742 

2743 # ...however only remove as the connection's current transaction 

2744 # if commit succeeded. otherwise it stays on so that a rollback 

2745 # needs to occur. 

2746 self.connection._transaction = None 

2747 else: 

2748 if self.connection._transaction is self: 

2749 self.connection._invalid_transaction() 

2750 else: 

2751 raise exc.InvalidRequestError("This transaction is inactive") 

2752 

2753 assert not self.is_active 

2754 assert self.connection._transaction is not self 

2755 

2756 

2757class NestedTransaction(Transaction): 

2758 """Represent a 'nested', or SAVEPOINT transaction. 

2759 

2760 The :class:`.NestedTransaction` object is created by calling the 

2761 :meth:`_engine.Connection.begin_nested` method of 

2762 :class:`_engine.Connection`. 

2763 

2764 When using :class:`.NestedTransaction`, the semantics of "begin" / 

2765 "commit" / "rollback" are as follows: 

2766 

2767 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where 

2768 the savepoint is given an explicit name that is part of the state 

2769 of this object. 

2770 

2771 * The :meth:`.NestedTransaction.commit` method corresponds to a 

2772 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated 

2773 with this :class:`.NestedTransaction`. 

2774 

2775 * The :meth:`.NestedTransaction.rollback` method corresponds to a 

2776 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier 

2777 associated with this :class:`.NestedTransaction`. 

2778 

2779 The rationale for mimicking the semantics of an outer transaction in 

2780 terms of savepoints so that code may deal with a "savepoint" transaction 

2781 and an "outer" transaction in an agnostic way. 

2782 

2783 .. seealso:: 

2784 

2785 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API. 

2786 

2787 """ 

2788 

2789 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") 

2790 

2791 _savepoint: str 

2792 

2793 def __init__(self, connection: Connection): 

2794 assert connection._transaction is not None 

2795 if connection._trans_context_manager: 

2796 TransactionalContext._trans_ctx_check(connection) 

2797 self.connection = connection 

2798 self._savepoint = self.connection._savepoint_impl() 

2799 self.is_active = True 

2800 self._previous_nested = connection._nested_transaction 

2801 connection._nested_transaction = self 

2802 

2803 def _deactivate_from_connection(self, warn: bool = True) -> None: 

2804 if self.connection._nested_transaction is self: 

2805 self.connection._nested_transaction = self._previous_nested 

2806 elif warn: 

2807 util.warn( 

2808 "nested transaction already deassociated from connection" 

2809 ) 

2810 

2811 @property 

2812 def _deactivated_from_connection(self) -> bool: 

2813 return self.connection._nested_transaction is not self 

2814 

2815 def _cancel(self) -> None: 

2816 # called by RootTransaction when the outer transaction is 

2817 # committed, rolled back, or closed to cancel all savepoints 

2818 # without any action being taken 

2819 self.is_active = False 

2820 self._deactivate_from_connection() 

2821 if self._previous_nested: 

2822 self._previous_nested._cancel() 

2823 

2824 def _close_impl( 

2825 self, deactivate_from_connection: bool, warn_already_deactive: bool 

2826 ) -> None: 

2827 try: 

2828 if ( 

2829 self.is_active 

2830 and self.connection._transaction 

2831 and self.connection._transaction.is_active 

2832 ): 

2833 self.connection._rollback_to_savepoint_impl(self._savepoint) 

2834 finally: 

2835 self.is_active = False 

2836 

2837 if deactivate_from_connection: 

2838 self._deactivate_from_connection(warn=warn_already_deactive) 

2839 

2840 assert not self.is_active 

2841 if deactivate_from_connection: 

2842 assert self.connection._nested_transaction is not self 

2843 

2844 def _do_close(self) -> None: 

2845 self._close_impl(True, False) 

2846 

2847 def _do_rollback(self) -> None: 

2848 self._close_impl(True, True) 

2849 

2850 def _do_commit(self) -> None: 

2851 if self.is_active: 

2852 try: 

2853 self.connection._release_savepoint_impl(self._savepoint) 

2854 finally: 

2855 # nested trans becomes inactive on failed release 

2856 # unconditionally. this prevents it from trying to 

2857 # emit SQL when it rolls back. 

2858 self.is_active = False 

2859 

2860 # but only de-associate from connection if it succeeded 

2861 self._deactivate_from_connection() 

2862 else: 

2863 if self.connection._nested_transaction is self: 

2864 self.connection._invalid_transaction() 

2865 else: 

2866 raise exc.InvalidRequestError( 

2867 "This nested transaction is inactive" 

2868 ) 

2869 

2870 

2871class TwoPhaseTransaction(RootTransaction): 

2872 """Represent a two-phase transaction. 

2873 

2874 A new :class:`.TwoPhaseTransaction` object may be procured 

2875 using the :meth:`_engine.Connection.begin_twophase` method. 

2876 

2877 The interface is the same as that of :class:`.Transaction` 

2878 with the addition of the :meth:`prepare` method. 

2879 

2880 """ 

2881 

2882 __slots__ = ("xid", "_is_prepared") 

2883 

2884 xid: Any 

2885 

2886 def __init__(self, connection: Connection, xid: Any): 

2887 self._is_prepared = False 

2888 self.xid = xid 

2889 super().__init__(connection) 

2890 

2891 def prepare(self) -> None: 

2892 """Prepare this :class:`.TwoPhaseTransaction`. 

2893 

2894 After a PREPARE, the transaction can be committed. 

2895 

2896 """ 

2897 if not self.is_active: 

2898 raise exc.InvalidRequestError("This transaction is inactive") 

2899 self.connection._prepare_twophase_impl(self.xid) 

2900 self._is_prepared = True 

2901 

2902 def _connection_begin_impl(self) -> None: 

2903 self.connection._begin_twophase_impl(self) 

2904 

2905 def _connection_rollback_impl(self) -> None: 

2906 self.connection._rollback_twophase_impl(self.xid, self._is_prepared) 

2907 

2908 def _connection_commit_impl(self) -> None: 

2909 self.connection._commit_twophase_impl(self.xid, self._is_prepared) 

2910 

2911 

2912class Engine( 

2913 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"] 

2914): 

2915 """ 

2916 Connects a :class:`~sqlalchemy.pool.Pool` and 

2917 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a 

2918 source of database connectivity and behavior. 

2919 

2920 An :class:`_engine.Engine` object is instantiated publicly using the 

2921 :func:`~sqlalchemy.create_engine` function. 

2922 

2923 .. seealso:: 

2924 

2925 :doc:`/core/engines` 

2926 

2927 :ref:`connections_toplevel` 

2928 

2929 """ 

2930 

2931 dispatch: dispatcher[ConnectionEventsTarget] 

2932 

2933 _compiled_cache: Optional[CompiledCacheType] 

2934 

2935 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS 

2936 _has_events: bool = False 

2937 _connection_cls: Type[Connection] = Connection 

2938 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine" 

2939 _is_future: bool = False 

2940 

2941 _schema_translate_map: Optional[SchemaTranslateMapType] = None 

2942 _option_cls: Type[OptionEngine] 

2943 

2944 dialect: Dialect 

2945 pool: Pool 

2946 url: URL 

2947 hide_parameters: bool 

2948 

2949 def __init__( 

2950 self, 

2951 pool: Pool, 

2952 dialect: Dialect, 

2953 url: URL, 

2954 logging_name: Optional[str] = None, 

2955 echo: Optional[_EchoFlagType] = None, 

2956 query_cache_size: int = 500, 

2957 execution_options: Optional[Mapping[str, Any]] = None, 

2958 hide_parameters: bool = False, 

2959 ): 

2960 self.pool = pool 

2961 self.url = url 

2962 self.dialect = dialect 

2963 if logging_name: 

2964 self.logging_name = logging_name 

2965 self.echo = echo 

2966 self.hide_parameters = hide_parameters 

2967 if query_cache_size != 0: 

2968 self._compiled_cache = util.LRUCache( 

2969 query_cache_size, size_alert=self._lru_size_alert 

2970 ) 

2971 else: 

2972 self._compiled_cache = None 

2973 log.instance_logger(self, echoflag=echo) 

2974 if execution_options: 

2975 self.update_execution_options(**execution_options) 

2976 

2977 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None: 

2978 if self._should_log_info(): 

2979 self.logger.info( 

2980 "Compiled cache size pruning from %d items to %d. " 

2981 "Increase cache size to reduce the frequency of pruning.", 

2982 len(cache), 

2983 cache.capacity, 

2984 ) 

2985 

2986 @property 

2987 def engine(self) -> Engine: 

2988 """Returns this :class:`.Engine`. 

2989 

2990 Used for legacy schemes that accept :class:`.Connection` / 

2991 :class:`.Engine` objects within the same variable. 

2992 

2993 """ 

2994 return self 

2995 

2996 def clear_compiled_cache(self) -> None: 

2997 """Clear the compiled cache associated with the dialect. 

2998 

2999 This applies **only** to the built-in cache that is established 

3000 via the :paramref:`_engine.create_engine.query_cache_size` parameter. 

3001 It will not impact any dictionary caches that were passed via the 

3002 :paramref:`.Connection.execution_options.compiled_cache` parameter. 

3003 

3004 .. versionadded:: 1.4 

3005 

3006 """ 

3007 if self._compiled_cache: 

3008 self._compiled_cache.clear() 

3009 

3010 def update_execution_options(self, **opt: Any) -> None: 

3011 r"""Update the default execution_options dictionary 

3012 of this :class:`_engine.Engine`. 

3013 

3014 The given keys/values in \**opt are added to the 

3015 default execution options that will be used for 

3016 all connections. The initial contents of this dictionary 

3017 can be sent via the ``execution_options`` parameter 

3018 to :func:`_sa.create_engine`. 

3019 

3020 .. seealso:: 

3021 

3022 :meth:`_engine.Connection.execution_options` 

3023 

3024 :meth:`_engine.Engine.execution_options` 

3025 

3026 """ 

3027 self.dispatch.set_engine_execution_options(self, opt) 

3028 self._execution_options = self._execution_options.union(opt) 

3029 self.dialect.set_engine_execution_options(self, opt) 

3030 

3031 @overload 

3032 def execution_options( 

3033 self, 

3034 *, 

3035 compiled_cache: Optional[CompiledCacheType] = ..., 

3036 logging_token: str = ..., 

3037 isolation_level: IsolationLevel = ..., 

3038 insertmanyvalues_page_size: int = ..., 

3039 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

3040 **opt: Any, 

3041 ) -> OptionEngine: ... 

3042 

3043 @overload 

3044 def execution_options(self, **opt: Any) -> OptionEngine: ... 

3045 

3046 def execution_options(self, **opt: Any) -> OptionEngine: 

3047 """Return a new :class:`_engine.Engine` that will provide 

3048 :class:`_engine.Connection` objects with the given execution options. 

3049 

3050 The returned :class:`_engine.Engine` remains related to the original 

3051 :class:`_engine.Engine` in that it shares the same connection pool and 

3052 other state: 

3053 

3054 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` 

3055 is the 

3056 same instance. The :meth:`_engine.Engine.dispose` 

3057 method will replace 

3058 the connection pool instance for the parent engine as well 

3059 as this one. 

3060 * Event listeners are "cascaded" - meaning, the new 

3061 :class:`_engine.Engine` 

3062 inherits the events of the parent, and new events can be associated 

3063 with the new :class:`_engine.Engine` individually. 

3064 * The logging configuration and logging_name is copied from the parent 

3065 :class:`_engine.Engine`. 

3066 

3067 The intent of the :meth:`_engine.Engine.execution_options` method is 

3068 to implement schemes where multiple :class:`_engine.Engine` 

3069 objects refer to the same connection pool, but are differentiated 

3070 by options that affect some execution-level behavior for each 

3071 engine. One such example is breaking into separate "reader" and 

3072 "writer" :class:`_engine.Engine` instances, where one 

3073 :class:`_engine.Engine` 

3074 has a lower :term:`isolation level` setting configured or is even 

3075 transaction-disabled using "autocommit". An example of this 

3076 configuration is at :ref:`dbapi_autocommit_multiple`. 

3077 

3078 Another example is one that 

3079 uses a custom option ``shard_id`` which is consumed by an event 

3080 to change the current schema on a database connection:: 

3081 

3082 from sqlalchemy import event 

3083 from sqlalchemy.engine import Engine 

3084 

3085 primary_engine = create_engine("mysql+mysqldb://") 

3086 shard1 = primary_engine.execution_options(shard_id="shard1") 

3087 shard2 = primary_engine.execution_options(shard_id="shard2") 

3088 

3089 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"} 

3090 

3091 

3092 @event.listens_for(Engine, "before_cursor_execute") 

3093 def _switch_shard(conn, cursor, stmt, params, context, executemany): 

3094 shard_id = conn.get_execution_options().get("shard_id", "default") 

3095 current_shard = conn.info.get("current_shard", None) 

3096 

3097 if current_shard != shard_id: 

3098 cursor.execute("use %s" % shards[shard_id]) 

3099 conn.info["current_shard"] = shard_id 

3100 

3101 The above recipe illustrates two :class:`_engine.Engine` objects that 

3102 will each serve as factories for :class:`_engine.Connection` objects 

3103 that have pre-established "shard_id" execution options present. A 

3104 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler 

3105 then interprets this execution option to emit a MySQL ``use`` statement 

3106 to switch databases before a statement execution, while at the same 

3107 time keeping track of which database we've established using the 

3108 :attr:`_engine.Connection.info` dictionary. 

3109 

3110 .. seealso:: 

3111 

3112 :meth:`_engine.Connection.execution_options` 

3113 - update execution options 

3114 on a :class:`_engine.Connection` object. 

3115 

3116 :meth:`_engine.Engine.update_execution_options` 

3117 - update the execution 

3118 options for a given :class:`_engine.Engine` in place. 

3119 

3120 :meth:`_engine.Engine.get_execution_options` 

3121 

3122 

3123 """ # noqa: E501 

3124 return self._option_cls(self, opt) 

3125 

3126 def get_execution_options(self) -> _ExecuteOptions: 

3127 """Get the non-SQL options which will take effect during execution. 

3128 

3129 .. versionadded: 1.3 

3130 

3131 .. seealso:: 

3132 

3133 :meth:`_engine.Engine.execution_options` 

3134 """ 

3135 return self._execution_options 

3136 

3137 @property 

3138 def name(self) -> str: 

3139 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3140 in use by this :class:`Engine`. 

3141 

3142 """ 

3143 

3144 return self.dialect.name 

3145 

3146 @property 

3147 def driver(self) -> str: 

3148 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3149 in use by this :class:`Engine`. 

3150 

3151 """ 

3152 

3153 return self.dialect.driver 

3154 

3155 echo = log.echo_property() 

3156 

3157 def __repr__(self) -> str: 

3158 return "Engine(%r)" % (self.url,) 

3159 

3160 def dispose(self, close: bool = True) -> None: 

3161 """Dispose of the connection pool used by this 

3162 :class:`_engine.Engine`. 

3163 

3164 A new connection pool is created immediately after the old one has been 

3165 disposed. The previous connection pool is disposed either actively, by 

3166 closing out all currently checked-in connections in that pool, or 

3167 passively, by losing references to it but otherwise not closing any 

3168 connections. The latter strategy is more appropriate for an initializer 

3169 in a forked Python process. 

3170 

3171 :param close: if left at its default of ``True``, has the 

3172 effect of fully closing all **currently checked in** 

3173 database connections. Connections that are still checked out 

3174 will **not** be closed, however they will no longer be associated 

3175 with this :class:`_engine.Engine`, 

3176 so when they are closed individually, eventually the 

3177 :class:`_pool.Pool` which they are associated with will 

3178 be garbage collected and they will be closed out fully, if 

3179 not already closed on checkin. 

3180 

3181 If set to ``False``, the previous connection pool is de-referenced, 

3182 and otherwise not touched in any way. 

3183 

3184 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` 

3185 parameter to allow the replacement of a connection pool in a child 

3186 process without interfering with the connections used by the parent 

3187 process. 

3188 

3189 

3190 .. seealso:: 

3191 

3192 :ref:`engine_disposal` 

3193 

3194 :ref:`pooling_multiprocessing` 

3195 

3196 """ 

3197 if close: 

3198 self.pool.dispose() 

3199 self.pool = self.pool.recreate() 

3200 self.dispatch.engine_disposed(self) 

3201 

3202 @contextlib.contextmanager 

3203 def _optional_conn_ctx_manager( 

3204 self, connection: Optional[Connection] = None 

3205 ) -> Iterator[Connection]: 

3206 if connection is None: 

3207 with self.connect() as conn: 

3208 yield conn 

3209 else: 

3210 yield connection 

3211 

3212 @contextlib.contextmanager 

3213 def begin(self) -> Iterator[Connection]: 

3214 """Return a context manager delivering a :class:`_engine.Connection` 

3215 with a :class:`.Transaction` established. 

3216 

3217 E.g.:: 

3218 

3219 with engine.begin() as conn: 

3220 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)")) 

3221 conn.execute(text("my_special_procedure(5)")) 

3222 

3223 Upon successful operation, the :class:`.Transaction` 

3224 is committed. If an error is raised, the :class:`.Transaction` 

3225 is rolled back. 

3226 

3227 .. seealso:: 

3228 

3229 :meth:`_engine.Engine.connect` - procure a 

3230 :class:`_engine.Connection` from 

3231 an :class:`_engine.Engine`. 

3232 

3233 :meth:`_engine.Connection.begin` - start a :class:`.Transaction` 

3234 for a particular :class:`_engine.Connection`. 

3235 

3236 """ # noqa: E501 

3237 with self.connect() as conn: 

3238 with conn.begin(): 

3239 yield conn 

3240 

3241 def _run_ddl_visitor( 

3242 self, 

3243 visitorcallable: Type[InvokeDDLBase], 

3244 element: SchemaVisitable, 

3245 **kwargs: Any, 

3246 ) -> None: 

3247 with self.begin() as conn: 

3248 conn._run_ddl_visitor(visitorcallable, element, **kwargs) 

3249 

3250 def connect(self) -> Connection: 

3251 """Return a new :class:`_engine.Connection` object. 

3252 

3253 The :class:`_engine.Connection` acts as a Python context manager, so 

3254 the typical use of this method looks like:: 

3255 

3256 with engine.connect() as connection: 

3257 connection.execute(text("insert into table values ('foo')")) 

3258 connection.commit() 

3259 

3260 Where above, after the block is completed, the connection is "closed" 

3261 and its underlying DBAPI resources are returned to the connection pool. 

3262 This also has the effect of rolling back any transaction that 

3263 was explicitly begun or was begun via autobegin, and will 

3264 emit the :meth:`_events.ConnectionEvents.rollback` event if one was 

3265 started and is still in progress. 

3266 

3267 .. seealso:: 

3268 

3269 :meth:`_engine.Engine.begin` 

3270 

3271 """ 

3272 

3273 return self._connection_cls(self) 

3274 

3275 def raw_connection(self) -> PoolProxiedConnection: 

3276 """Return a "raw" DBAPI connection from the connection pool. 

3277 

3278 The returned object is a proxied version of the DBAPI 

3279 connection object used by the underlying driver in use. 

3280 The object will have all the same behavior as the real DBAPI 

3281 connection, except that its ``close()`` method will result in the 

3282 connection being returned to the pool, rather than being closed 

3283 for real. 

3284 

3285 This method provides direct DBAPI connection access for 

3286 special situations when the API provided by 

3287 :class:`_engine.Connection` 

3288 is not needed. When a :class:`_engine.Connection` object is already 

3289 present, the DBAPI connection is available using 

3290 the :attr:`_engine.Connection.connection` accessor. 

3291 

3292 .. seealso:: 

3293 

3294 :ref:`dbapi_connections` 

3295 

3296 """ 

3297 return self.pool.connect() 

3298 

3299 

3300class OptionEngineMixin(log.Identified): 

3301 _sa_propagate_class_events = False 

3302 

3303 dispatch: dispatcher[ConnectionEventsTarget] 

3304 _compiled_cache: Optional[CompiledCacheType] 

3305 dialect: Dialect 

3306 pool: Pool 

3307 url: URL 

3308 hide_parameters: bool 

3309 echo: log.echo_property 

3310 

3311 def __init__( 

3312 self, proxied: Engine, execution_options: CoreExecuteOptionsParameter 

3313 ): 

3314 self._proxied = proxied 

3315 self.url = proxied.url 

3316 self.dialect = proxied.dialect 

3317 self.logging_name = proxied.logging_name 

3318 self.echo = proxied.echo 

3319 self._compiled_cache = proxied._compiled_cache 

3320 self.hide_parameters = proxied.hide_parameters 

3321 log.instance_logger(self, echoflag=self.echo) 

3322 

3323 # note: this will propagate events that are assigned to the parent 

3324 # engine after this OptionEngine is created. Since we share 

3325 # the events of the parent we also disallow class-level events 

3326 # to apply to the OptionEngine class directly. 

3327 # 

3328 # the other way this can work would be to transfer existing 

3329 # events only, using: 

3330 # self.dispatch._update(proxied.dispatch) 

3331 # 

3332 # that might be more appropriate however it would be a behavioral 

3333 # change for logic that assigns events to the parent engine and 

3334 # would like it to take effect for the already-created sub-engine. 

3335 self.dispatch = self.dispatch._join(proxied.dispatch) 

3336 

3337 self._execution_options = proxied._execution_options 

3338 self.update_execution_options(**execution_options) 

3339 

3340 def update_execution_options(self, **opt: Any) -> None: 

3341 raise NotImplementedError() 

3342 

3343 if not typing.TYPE_CHECKING: 

3344 # https://github.com/python/typing/discussions/1095 

3345 

3346 @property 

3347 def pool(self) -> Pool: 

3348 return self._proxied.pool 

3349 

3350 @pool.setter 

3351 def pool(self, pool: Pool) -> None: 

3352 self._proxied.pool = pool 

3353 

3354 @property 

3355 def _has_events(self) -> bool: 

3356 return self._proxied._has_events or self.__dict__.get( 

3357 "_has_events", False 

3358 ) 

3359 

3360 @_has_events.setter 

3361 def _has_events(self, value: bool) -> None: 

3362 self.__dict__["_has_events"] = value 

3363 

3364 

3365class OptionEngine(OptionEngineMixin, Engine): 

3366 def update_execution_options(self, **opt: Any) -> None: 

3367 Engine.update_execution_options(self, **opt) 

3368 

3369 

3370Engine._option_cls = OptionEngine