Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1005 statements  

1# engine/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 

8 

9from __future__ import annotations 

10 

11import contextlib 

12import sys 

13import typing 

14from typing import Any 

15from typing import Callable 

16from typing import cast 

17from typing import Iterable 

18from typing import Iterator 

19from typing import List 

20from typing import Mapping 

21from typing import NoReturn 

22from typing import Optional 

23from typing import overload 

24from typing import Tuple 

25from typing import Type 

26from typing import TypeVar 

27from typing import Union 

28 

29from .interfaces import BindTyping 

30from .interfaces import ConnectionEventsTarget 

31from .interfaces import DBAPICursor 

32from .interfaces import ExceptionContext 

33from .interfaces import ExecuteStyle 

34from .interfaces import ExecutionContext 

35from .interfaces import IsolationLevel 

36from .util import _distill_params_20 

37from .util import _distill_raw_params 

38from .util import TransactionalContext 

39from .. import exc 

40from .. import inspection 

41from .. import log 

42from .. import util 

43from ..sql import compiler 

44from ..sql import util as sql_util 

45 

46if typing.TYPE_CHECKING: 

47 from . import CursorResult 

48 from . import ScalarResult 

49 from .interfaces import _AnyExecuteParams 

50 from .interfaces import _AnyMultiExecuteParams 

51 from .interfaces import _CoreAnyExecuteParams 

52 from .interfaces import _CoreMultiExecuteParams 

53 from .interfaces import _CoreSingleExecuteParams 

54 from .interfaces import _DBAPIAnyExecuteParams 

55 from .interfaces import _DBAPISingleExecuteParams 

56 from .interfaces import _ExecuteOptions 

57 from .interfaces import CompiledCacheType 

58 from .interfaces import CoreExecuteOptionsParameter 

59 from .interfaces import Dialect 

60 from .interfaces import SchemaTranslateMapType 

61 from .reflection import Inspector # noqa 

62 from .url import URL 

63 from ..event import dispatcher 

64 from ..log import _EchoFlagType 

65 from ..pool import _ConnectionFairy 

66 from ..pool import Pool 

67 from ..pool import PoolProxiedConnection 

68 from ..sql import Executable 

69 from ..sql._typing import _InfoType 

70 from ..sql.compiler import Compiled 

71 from ..sql.ddl import ExecutableDDLElement 

72 from ..sql.ddl import InvokeDDLBase 

73 from ..sql.functions import FunctionElement 

74 from ..sql.schema import DefaultGenerator 

75 from ..sql.schema import HasSchemaAttr 

76 from ..sql.schema import SchemaVisitable 

77 from ..sql.selectable import TypedReturnsRows 

78 

79 

80_T = TypeVar("_T", bound=Any) 

81_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT 

82NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT 

83 

84 

85class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

86 """Provides high-level functionality for a wrapped DB-API connection. 

87 

88 The :class:`_engine.Connection` object is procured by calling the 

89 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

90 object, and provides services for execution of SQL statements as well 

91 as transaction control. 

92 

93 The Connection object is **not** thread-safe. While a Connection can be 

94 shared among threads using properly synchronized access, it is still 

95 possible that the underlying DBAPI connection may not support shared 

96 access between threads. Check the DBAPI documentation for details. 

97 

98 The Connection object represents a single DBAPI connection checked out 

99 from the connection pool. In this state, the connection pool has no 

100 affect upon the connection, including its expiration or timeout state. 

101 For the connection pool to properly manage connections, connections 

102 should be returned to the connection pool (i.e. ``connection.close()``) 

103 whenever the connection is not in use. 

104 

105 .. index:: 

106 single: thread safety; Connection 

107 

108 """ 

109 

110 dialect: Dialect 

111 dispatch: dispatcher[ConnectionEventsTarget] 

112 

113 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

114 

115 # used by sqlalchemy.engine.util.TransactionalContext 

116 _trans_context_manager: Optional[TransactionalContext] = None 

117 

118 # legacy as of 2.0, should be eventually deprecated and 

119 # removed. was used in the "pre_ping" recipe that's been in the docs 

120 # a long time 

121 should_close_with_result = False 

122 

123 _dbapi_connection: Optional[PoolProxiedConnection] 

124 

125 _execution_options: _ExecuteOptions 

126 

127 _transaction: Optional[RootTransaction] 

128 _nested_transaction: Optional[NestedTransaction] 

129 

130 def __init__( 

131 self, 

132 engine: Engine, 

133 connection: Optional[PoolProxiedConnection] = None, 

134 _has_events: Optional[bool] = None, 

135 _allow_revalidate: bool = True, 

136 _allow_autobegin: bool = True, 

137 ): 

138 """Construct a new Connection.""" 

139 self.engine = engine 

140 self.dialect = dialect = engine.dialect 

141 

142 if connection is None: 

143 try: 

144 self._dbapi_connection = engine.raw_connection() 

145 except dialect.loaded_dbapi.Error as err: 

146 Connection._handle_dbapi_exception_noconnection( 

147 err, dialect, engine 

148 ) 

149 raise 

150 else: 

151 self._dbapi_connection = connection 

152 

153 self._transaction = self._nested_transaction = None 

154 self.__savepoint_seq = 0 

155 self.__in_begin = False 

156 

157 self.__can_reconnect = _allow_revalidate 

158 self._allow_autobegin = _allow_autobegin 

159 self._echo = self.engine._should_log_info() 

160 

161 if _has_events is None: 

162 # if _has_events is sent explicitly as False, 

163 # then don't join the dispatch of the engine; we don't 

164 # want to handle any of the engine's events in that case. 

165 self.dispatch = self.dispatch._join(engine.dispatch) 

166 self._has_events = _has_events or ( 

167 _has_events is None and engine._has_events 

168 ) 

169 

170 self._execution_options = engine._execution_options 

171 

172 if self._has_events or self.engine._has_events: 

173 self.dispatch.engine_connect(self) 

174 

175 # this can be assigned differently via 

176 # characteristics.LoggingTokenCharacteristic 

177 _message_formatter: Any = None 

178 

179 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None: 

180 fmt = self._message_formatter 

181 

182 if fmt: 

183 message = fmt(message) 

184 

185 if log.STACKLEVEL: 

186 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

187 

188 self.engine.logger.info(message, *arg, **kw) 

189 

190 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None: 

191 fmt = self._message_formatter 

192 

193 if fmt: 

194 message = fmt(message) 

195 

196 if log.STACKLEVEL: 

197 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

198 

199 self.engine.logger.debug(message, *arg, **kw) 

200 

201 @property 

202 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: 

203 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

204 self._execution_options.get("schema_translate_map", None) 

205 ) 

206 

207 return schema_translate_map 

208 

209 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]: 

210 """Return the schema name for the given schema item taking into 

211 account current schema translate map. 

212 

213 """ 

214 

215 name = obj.schema 

216 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

217 self._execution_options.get("schema_translate_map", None) 

218 ) 

219 

220 if ( 

221 schema_translate_map 

222 and name in schema_translate_map 

223 and obj._use_schema_map 

224 ): 

225 return schema_translate_map[name] 

226 else: 

227 return name 

228 

229 def __enter__(self) -> Connection: 

230 return self 

231 

232 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 

233 self.close() 

234 

235 @overload 

236 def execution_options( 

237 self, 

238 *, 

239 compiled_cache: Optional[CompiledCacheType] = ..., 

240 logging_token: str = ..., 

241 isolation_level: IsolationLevel = ..., 

242 no_parameters: bool = False, 

243 stream_results: bool = False, 

244 max_row_buffer: int = ..., 

245 yield_per: int = ..., 

246 insertmanyvalues_page_size: int = ..., 

247 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

248 preserve_rowcount: bool = False, 

249 **opt: Any, 

250 ) -> Connection: ... 

251 

252 @overload 

253 def execution_options(self, **opt: Any) -> Connection: ... 

254 

255 def execution_options(self, **opt: Any) -> Connection: 

256 r"""Set non-SQL options for the connection which take effect 

257 during execution. 

258 

259 This method modifies this :class:`_engine.Connection` **in-place**; 

260 the return value is the same :class:`_engine.Connection` object 

261 upon which the method is called. Note that this is in contrast 

262 to the behavior of the ``execution_options`` methods on other 

263 objects such as :meth:`_engine.Engine.execution_options` and 

264 :meth:`_sql.Executable.execution_options`. The rationale is that many 

265 such execution options necessarily modify the state of the base 

266 DBAPI connection in any case so there is no feasible means of 

267 keeping the effect of such an option localized to a "sub" connection. 

268 

269 .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options` 

270 method, in contrast to other objects with this method, modifies 

271 the connection in-place without creating copy of it. 

272 

273 As discussed elsewhere, the :meth:`_engine.Connection.execution_options` 

274 method accepts any arbitrary parameters including user defined names. 

275 All parameters given are consumable in a number of ways including 

276 by using the :meth:`_engine.Connection.get_execution_options` method. 

277 See the examples at :meth:`_sql.Executable.execution_options` 

278 and :meth:`_engine.Engine.execution_options`. 

279 

280 The keywords that are currently recognized by SQLAlchemy itself 

281 include all those listed under :meth:`.Executable.execution_options`, 

282 as well as others that are specific to :class:`_engine.Connection`. 

283 

284 :param compiled_cache: Available on: :class:`_engine.Connection`, 

285 :class:`_engine.Engine`. 

286 

287 A dictionary where :class:`.Compiled` objects 

288 will be cached when the :class:`_engine.Connection` 

289 compiles a clause 

290 expression into a :class:`.Compiled` object. This dictionary will 

291 supersede the statement cache that may be configured on the 

292 :class:`_engine.Engine` itself. If set to None, caching 

293 is disabled, even if the engine has a configured cache size. 

294 

295 Note that the ORM makes use of its own "compiled" caches for 

296 some operations, including flush operations. The caching 

297 used by the ORM internally supersedes a cache dictionary 

298 specified here. 

299 

300 :param logging_token: Available on: :class:`_engine.Connection`, 

301 :class:`_engine.Engine`, :class:`_sql.Executable`. 

302 

303 Adds the specified string token surrounded by brackets in log 

304 messages logged by the connection, i.e. the logging that's enabled 

305 either via the :paramref:`_sa.create_engine.echo` flag or via the 

306 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a 

307 per-connection or per-sub-engine token to be available which is 

308 useful for debugging concurrent connection scenarios. 

309 

310 .. versionadded:: 1.4.0b2 

311 

312 .. seealso:: 

313 

314 :ref:`dbengine_logging_tokens` - usage example 

315 

316 :paramref:`_sa.create_engine.logging_name` - adds a name to the 

317 name used by the Python logger object itself. 

318 

319 :param isolation_level: Available on: :class:`_engine.Connection`, 

320 :class:`_engine.Engine`. 

321 

322 Set the transaction isolation level for the lifespan of this 

323 :class:`_engine.Connection` object. 

324 Valid values include those string 

325 values accepted by the :paramref:`_sa.create_engine.isolation_level` 

326 parameter passed to :func:`_sa.create_engine`. These levels are 

327 semi-database specific; see individual dialect documentation for 

328 valid levels. 

329 

330 The isolation level option applies the isolation level by emitting 

331 statements on the DBAPI connection, and **necessarily affects the 

332 original Connection object overall**. The isolation level will remain 

333 at the given setting until explicitly changed, or when the DBAPI 

334 connection itself is :term:`released` to the connection pool, i.e. the 

335 :meth:`_engine.Connection.close` method is called, at which time an 

336 event handler will emit additional statements on the DBAPI connection 

337 in order to revert the isolation level change. 

338 

339 .. note:: The ``isolation_level`` execution option may only be 

340 established before the :meth:`_engine.Connection.begin` method is 

341 called, as well as before any SQL statements are emitted which 

342 would otherwise trigger "autobegin", or directly after a call to 

343 :meth:`_engine.Connection.commit` or 

344 :meth:`_engine.Connection.rollback`. A database cannot change the 

345 isolation level on a transaction in progress. 

346 

347 .. note:: The ``isolation_level`` execution option is implicitly 

348 reset if the :class:`_engine.Connection` is invalidated, e.g. via 

349 the :meth:`_engine.Connection.invalidate` method, or if a 

350 disconnection error occurs. The new connection produced after the 

351 invalidation will **not** have the selected isolation level 

352 re-applied to it automatically. 

353 

354 .. seealso:: 

355 

356 :ref:`dbapi_autocommit` 

357 

358 :meth:`_engine.Connection.get_isolation_level` 

359 - view current actual level 

360 

361 :param no_parameters: Available on: :class:`_engine.Connection`, 

362 :class:`_sql.Executable`. 

363 

364 When ``True``, if the final parameter 

365 list or dictionary is totally empty, will invoke the 

366 statement on the cursor as ``cursor.execute(statement)``, 

367 not passing the parameter collection at all. 

368 Some DBAPIs such as psycopg2 and mysql-python consider 

369 percent signs as significant only when parameters are 

370 present; this option allows code to generate SQL 

371 containing percent signs (and possibly other characters) 

372 that is neutral regarding whether it's executed by the DBAPI 

373 or piped into a script that's later invoked by 

374 command line tools. 

375 

376 :param stream_results: Available on: :class:`_engine.Connection`, 

377 :class:`_sql.Executable`. 

378 

379 Indicate to the dialect that results should be "streamed" and not 

380 pre-buffered, if possible. For backends such as PostgreSQL, MySQL 

381 and MariaDB, this indicates the use of a "server side cursor" as 

382 opposed to a client side cursor. Other backends such as that of 

383 Oracle Database may already use server side cursors by default. 

384 

385 The usage of 

386 :paramref:`_engine.Connection.execution_options.stream_results` is 

387 usually combined with setting a fixed number of rows to to be fetched 

388 in batches, to allow for efficient iteration of database rows while 

389 at the same time not loading all result rows into memory at once; 

390 this can be configured on a :class:`_engine.Result` object using the 

391 :meth:`_engine.Result.yield_per` method, after execution has 

392 returned a new :class:`_engine.Result`. If 

393 :meth:`_engine.Result.yield_per` is not used, 

394 the :paramref:`_engine.Connection.execution_options.stream_results` 

395 mode of operation will instead use a dynamically sized buffer 

396 which buffers sets of rows at a time, growing on each batch 

397 based on a fixed growth size up until a limit which may 

398 be configured using the 

399 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

400 parameter. 

401 

402 When using the ORM to fetch ORM mapped objects from a result, 

403 :meth:`_engine.Result.yield_per` should always be used with 

404 :paramref:`_engine.Connection.execution_options.stream_results`, 

405 so that the ORM does not fetch all rows into new ORM objects at once. 

406 

407 For typical use, the 

408 :paramref:`_engine.Connection.execution_options.yield_per` execution 

409 option should be preferred, which sets up both 

410 :paramref:`_engine.Connection.execution_options.stream_results` and 

411 :meth:`_engine.Result.yield_per` at once. This option is supported 

412 both at a core level by :class:`_engine.Connection` as well as by the 

413 ORM :class:`_engine.Session`; the latter is described at 

414 :ref:`orm_queryguide_yield_per`. 

415 

416 .. seealso:: 

417 

418 :ref:`engine_stream_results` - background on 

419 :paramref:`_engine.Connection.execution_options.stream_results` 

420 

421 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

422 

423 :paramref:`_engine.Connection.execution_options.yield_per` 

424 

425 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

426 describing the ORM version of ``yield_per`` 

427 

428 :param max_row_buffer: Available on: :class:`_engine.Connection`, 

429 :class:`_sql.Executable`. Sets a maximum 

430 buffer size to use when the 

431 :paramref:`_engine.Connection.execution_options.stream_results` 

432 execution option is used on a backend that supports server side 

433 cursors. The default value if not specified is 1000. 

434 

435 .. seealso:: 

436 

437 :paramref:`_engine.Connection.execution_options.stream_results` 

438 

439 :ref:`engine_stream_results` 

440 

441 

442 :param yield_per: Available on: :class:`_engine.Connection`, 

443 :class:`_sql.Executable`. Integer value applied which will 

444 set the :paramref:`_engine.Connection.execution_options.stream_results` 

445 execution option and invoke :meth:`_engine.Result.yield_per` 

446 automatically at once. Allows equivalent functionality as 

447 is present when using this parameter with the ORM. 

448 

449 .. versionadded:: 1.4.40 

450 

451 .. seealso:: 

452 

453 :ref:`engine_stream_results` - background and examples 

454 on using server side cursors with Core. 

455 

456 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

457 describing the ORM version of ``yield_per`` 

458 

459 :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`, 

460 :class:`_engine.Engine`. Number of rows to format into an 

461 INSERT statement when the statement uses "insertmanyvalues" mode, 

462 which is a paged form of bulk insert that is used for many backends 

463 when using :term:`executemany` execution typically in conjunction 

464 with RETURNING. Defaults to 1000. May also be modified on a 

465 per-engine basis using the 

466 :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. 

467 

468 .. versionadded:: 2.0 

469 

470 .. seealso:: 

471 

472 :ref:`engine_insertmanyvalues` 

473 

474 :param schema_translate_map: Available on: :class:`_engine.Connection`, 

475 :class:`_engine.Engine`, :class:`_sql.Executable`. 

476 

477 A dictionary mapping schema names to schema names, that will be 

478 applied to the :paramref:`_schema.Table.schema` element of each 

479 :class:`_schema.Table` 

480 encountered when SQL or DDL expression elements 

481 are compiled into strings; the resulting schema name will be 

482 converted based on presence in the map of the original name. 

483 

484 .. seealso:: 

485 

486 :ref:`schema_translating` 

487 

488 :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount`` 

489 attribute will be unconditionally memoized within the result and 

490 made available via the :attr:`.CursorResult.rowcount` attribute. 

491 Normally, this attribute is only preserved for UPDATE and DELETE 

492 statements. Using this option, the DBAPIs rowcount value can 

493 be accessed for other kinds of statements such as INSERT and SELECT, 

494 to the degree that the DBAPI supports these statements. See 

495 :attr:`.CursorResult.rowcount` for notes regarding the behavior 

496 of this attribute. 

497 

498 .. versionadded:: 2.0.28 

499 

500 .. seealso:: 

501 

502 :meth:`_engine.Engine.execution_options` 

503 

504 :meth:`.Executable.execution_options` 

505 

506 :meth:`_engine.Connection.get_execution_options` 

507 

508 :ref:`orm_queryguide_execution_options` - documentation on all 

509 ORM-specific execution options 

510 

511 """ # noqa 

512 if self._has_events or self.engine._has_events: 

513 self.dispatch.set_connection_execution_options(self, opt) 

514 self._execution_options = self._execution_options.union(opt) 

515 self.dialect.set_connection_execution_options(self, opt) 

516 return self 

517 

518 def get_execution_options(self) -> _ExecuteOptions: 

519 """Get the non-SQL options which will take effect during execution. 

520 

521 .. versionadded:: 1.3 

522 

523 .. seealso:: 

524 

525 :meth:`_engine.Connection.execution_options` 

526 """ 

527 return self._execution_options 

528 

529 @property 

530 def _still_open_and_dbapi_connection_is_valid(self) -> bool: 

531 pool_proxied_connection = self._dbapi_connection 

532 return ( 

533 pool_proxied_connection is not None 

534 and pool_proxied_connection.is_valid 

535 ) 

536 

537 @property 

538 def closed(self) -> bool: 

539 """Return True if this connection is closed.""" 

540 

541 return self._dbapi_connection is None and not self.__can_reconnect 

542 

543 @property 

544 def invalidated(self) -> bool: 

545 """Return True if this connection was invalidated. 

546 

547 This does not indicate whether or not the connection was 

548 invalidated at the pool level, however 

549 

550 """ 

551 

552 # prior to 1.4, "invalid" was stored as a state independent of 

553 # "closed", meaning an invalidated connection could be "closed", 

554 # the _dbapi_connection would be None and closed=True, yet the 

555 # "invalid" flag would stay True. This meant that there were 

556 # three separate states (open/valid, closed/valid, closed/invalid) 

557 # when there is really no reason for that; a connection that's 

558 # "closed" does not need to be "invalid". So the state is now 

559 # represented by the two facts alone. 

560 

561 pool_proxied_connection = self._dbapi_connection 

562 return pool_proxied_connection is None and self.__can_reconnect 

563 

564 @property 

565 def connection(self) -> PoolProxiedConnection: 

566 """The underlying DB-API connection managed by this Connection. 

567 

568 This is a SQLAlchemy connection-pool proxied connection 

569 which then has the attribute 

570 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the 

571 actual driver connection. 

572 

573 .. seealso:: 

574 

575 

576 :ref:`dbapi_connections` 

577 

578 """ 

579 

580 if self._dbapi_connection is None: 

581 try: 

582 return self._revalidate_connection() 

583 except (exc.PendingRollbackError, exc.ResourceClosedError): 

584 raise 

585 except BaseException as e: 

586 self._handle_dbapi_exception(e, None, None, None, None) 

587 else: 

588 return self._dbapi_connection 

589 

590 def get_isolation_level(self) -> IsolationLevel: 

591 """Return the current **actual** isolation level that's present on 

592 the database within the scope of this connection. 

593 

594 This attribute will perform a live SQL operation against the database 

595 in order to procure the current isolation level, so the value returned 

596 is the actual level on the underlying DBAPI connection regardless of 

597 how this state was set. This will be one of the four actual isolation 

598 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

599 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation 

600 level setting. Third party dialects may also feature additional 

601 isolation level settings. 

602 

603 .. note:: This method **will not report** on the ``AUTOCOMMIT`` 

604 isolation level, which is a separate :term:`dbapi` setting that's 

605 independent of **actual** isolation level. When ``AUTOCOMMIT`` is 

606 in use, the database connection still has a "traditional" isolation 

607 mode in effect, that is typically one of the four values 

608 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

609 ``SERIALIZABLE``. 

610 

611 Compare to the :attr:`_engine.Connection.default_isolation_level` 

612 accessor which returns the isolation level that is present on the 

613 database at initial connection time. 

614 

615 .. seealso:: 

616 

617 :attr:`_engine.Connection.default_isolation_level` 

618 - view default level 

619 

620 :paramref:`_sa.create_engine.isolation_level` 

621 - set per :class:`_engine.Engine` isolation level 

622 

623 :paramref:`.Connection.execution_options.isolation_level` 

624 - set per :class:`_engine.Connection` isolation level 

625 

626 """ 

627 dbapi_connection = self.connection.dbapi_connection 

628 assert dbapi_connection is not None 

629 try: 

630 return self.dialect.get_isolation_level(dbapi_connection) 

631 except BaseException as e: 

632 self._handle_dbapi_exception(e, None, None, None, None) 

633 

634 @property 

635 def default_isolation_level(self) -> Optional[IsolationLevel]: 

636 """The initial-connection time isolation level associated with the 

637 :class:`_engine.Dialect` in use. 

638 

639 This value is independent of the 

640 :paramref:`.Connection.execution_options.isolation_level` and 

641 :paramref:`.Engine.execution_options.isolation_level` execution 

642 options, and is determined by the :class:`_engine.Dialect` when the 

643 first connection is created, by performing a SQL query against the 

644 database for the current isolation level before any additional commands 

645 have been emitted. 

646 

647 Calling this accessor does not invoke any new SQL queries. 

648 

649 .. seealso:: 

650 

651 :meth:`_engine.Connection.get_isolation_level` 

652 - view current actual isolation level 

653 

654 :paramref:`_sa.create_engine.isolation_level` 

655 - set per :class:`_engine.Engine` isolation level 

656 

657 :paramref:`.Connection.execution_options.isolation_level` 

658 - set per :class:`_engine.Connection` isolation level 

659 

660 """ 

661 return self.dialect.default_isolation_level 

662 

663 def _invalid_transaction(self) -> NoReturn: 

664 raise exc.PendingRollbackError( 

665 "Can't reconnect until invalid %stransaction is rolled " 

666 "back. Please rollback() fully before proceeding" 

667 % ("savepoint " if self._nested_transaction is not None else ""), 

668 code="8s2b", 

669 ) 

670 

671 def _revalidate_connection(self) -> PoolProxiedConnection: 

672 if self.__can_reconnect and self.invalidated: 

673 if self._transaction is not None: 

674 self._invalid_transaction() 

675 self._dbapi_connection = self.engine.raw_connection() 

676 return self._dbapi_connection 

677 raise exc.ResourceClosedError("This Connection is closed") 

678 

679 @property 

680 def info(self) -> _InfoType: 

681 """Info dictionary associated with the underlying DBAPI connection 

682 referred to by this :class:`_engine.Connection`, allowing user-defined 

683 data to be associated with the connection. 

684 

685 The data here will follow along with the DBAPI connection including 

686 after it is returned to the connection pool and used again 

687 in subsequent instances of :class:`_engine.Connection`. 

688 

689 """ 

690 

691 return self.connection.info 

692 

693 def invalidate(self, exception: Optional[BaseException] = None) -> None: 

694 """Invalidate the underlying DBAPI connection associated with 

695 this :class:`_engine.Connection`. 

696 

697 An attempt will be made to close the underlying DBAPI connection 

698 immediately; however if this operation fails, the error is logged 

699 but not raised. The connection is then discarded whether or not 

700 close() succeeded. 

701 

702 Upon the next use (where "use" typically means using the 

703 :meth:`_engine.Connection.execute` method or similar), 

704 this :class:`_engine.Connection` will attempt to 

705 procure a new DBAPI connection using the services of the 

706 :class:`_pool.Pool` as a source of connectivity (e.g. 

707 a "reconnection"). 

708 

709 If a transaction was in progress (e.g. the 

710 :meth:`_engine.Connection.begin` method has been called) when 

711 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI 

712 level all state associated with this transaction is lost, as 

713 the DBAPI connection is closed. The :class:`_engine.Connection` 

714 will not allow a reconnection to proceed until the 

715 :class:`.Transaction` object is ended, by calling the 

716 :meth:`.Transaction.rollback` method; until that point, any attempt at 

717 continuing to use the :class:`_engine.Connection` will raise an 

718 :class:`~sqlalchemy.exc.InvalidRequestError`. 

719 This is to prevent applications from accidentally 

720 continuing an ongoing transactional operations despite the 

721 fact that the transaction has been lost due to an 

722 invalidation. 

723 

724 The :meth:`_engine.Connection.invalidate` method, 

725 just like auto-invalidation, 

726 will at the connection pool level invoke the 

727 :meth:`_events.PoolEvents.invalidate` event. 

728 

729 :param exception: an optional ``Exception`` instance that's the 

730 reason for the invalidation. is passed along to event handlers 

731 and logging functions. 

732 

733 .. seealso:: 

734 

735 :ref:`pool_connection_invalidation` 

736 

737 """ 

738 

739 if self.invalidated: 

740 return 

741 

742 if self.closed: 

743 raise exc.ResourceClosedError("This Connection is closed") 

744 

745 if self._still_open_and_dbapi_connection_is_valid: 

746 pool_proxied_connection = self._dbapi_connection 

747 assert pool_proxied_connection is not None 

748 pool_proxied_connection.invalidate(exception) 

749 

750 self._dbapi_connection = None 

751 

752 def detach(self) -> None: 

753 """Detach the underlying DB-API connection from its connection pool. 

754 

755 E.g.:: 

756 

757 with engine.connect() as conn: 

758 conn.detach() 

759 conn.execute(text("SET search_path TO schema1, schema2")) 

760 

761 # work with connection 

762 

763 # connection is fully closed (since we used "with:", can 

764 # also call .close()) 

765 

766 This :class:`_engine.Connection` instance will remain usable. 

767 When closed 

768 (or exited from a context manager context as above), 

769 the DB-API connection will be literally closed and not 

770 returned to its originating pool. 

771 

772 This method can be used to insulate the rest of an application 

773 from a modified state on a connection (such as a transaction 

774 isolation level or similar). 

775 

776 """ 

777 

778 if self.closed: 

779 raise exc.ResourceClosedError("This Connection is closed") 

780 

781 pool_proxied_connection = self._dbapi_connection 

782 if pool_proxied_connection is None: 

783 raise exc.InvalidRequestError( 

784 "Can't detach an invalidated Connection" 

785 ) 

786 pool_proxied_connection.detach() 

787 

788 def _autobegin(self) -> None: 

789 if self._allow_autobegin and not self.__in_begin: 

790 self.begin() 

791 

792 def begin(self) -> RootTransaction: 

793 """Begin a transaction prior to autobegin occurring. 

794 

795 E.g.:: 

796 

797 with engine.connect() as conn: 

798 with conn.begin() as trans: 

799 conn.execute(table.insert(), {"username": "sandy"}) 

800 

801 The returned object is an instance of :class:`_engine.RootTransaction`. 

802 This object represents the "scope" of the transaction, 

803 which completes when either the :meth:`_engine.Transaction.rollback` 

804 or :meth:`_engine.Transaction.commit` method is called; the object 

805 also works as a context manager as illustrated above. 

806 

807 The :meth:`_engine.Connection.begin` method begins a 

808 transaction that normally will be begun in any case when the connection 

809 is first used to execute a statement. The reason this method might be 

810 used would be to invoke the :meth:`_events.ConnectionEvents.begin` 

811 event at a specific time, or to organize code within the scope of a 

812 connection checkout in terms of context managed blocks, such as:: 

813 

814 with engine.connect() as conn: 

815 with conn.begin(): 

816 conn.execute(...) 

817 conn.execute(...) 

818 

819 with conn.begin(): 

820 conn.execute(...) 

821 conn.execute(...) 

822 

823 The above code is not fundamentally any different in its behavior than 

824 the following code which does not use 

825 :meth:`_engine.Connection.begin`; the below style is known 

826 as "commit as you go" style:: 

827 

828 with engine.connect() as conn: 

829 conn.execute(...) 

830 conn.execute(...) 

831 conn.commit() 

832 

833 conn.execute(...) 

834 conn.execute(...) 

835 conn.commit() 

836 

837 From a database point of view, the :meth:`_engine.Connection.begin` 

838 method does not emit any SQL or change the state of the underlying 

839 DBAPI connection in any way; the Python DBAPI does not have any 

840 concept of explicit transaction begin. 

841 

842 .. seealso:: 

843 

844 :ref:`tutorial_working_with_transactions` - in the 

845 :ref:`unified_tutorial` 

846 

847 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT 

848 

849 :meth:`_engine.Connection.begin_twophase` - 

850 use a two phase /XID transaction 

851 

852 :meth:`_engine.Engine.begin` - context manager available from 

853 :class:`_engine.Engine` 

854 

855 """ 

856 if self._transaction is None: 

857 self._transaction = RootTransaction(self) 

858 return self._transaction 

859 else: 

860 raise exc.InvalidRequestError( 

861 "This connection has already initialized a SQLAlchemy " 

862 "Transaction() object via begin() or autobegin; can't " 

863 "call begin() here unless rollback() or commit() " 

864 "is called first." 

865 ) 

866 

867 def begin_nested(self) -> NestedTransaction: 

868 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction 

869 handle that controls the scope of the SAVEPOINT. 

870 

871 E.g.:: 

872 

873 with engine.begin() as connection: 

874 with connection.begin_nested(): 

875 connection.execute(table.insert(), {"username": "sandy"}) 

876 

877 The returned object is an instance of 

878 :class:`_engine.NestedTransaction`, which includes transactional 

879 methods :meth:`_engine.NestedTransaction.commit` and 

880 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction, 

881 these methods correspond to the operations "RELEASE SAVEPOINT <name>" 

882 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local 

883 to the :class:`_engine.NestedTransaction` object and is generated 

884 automatically. Like any other :class:`_engine.Transaction`, the 

885 :class:`_engine.NestedTransaction` may be used as a context manager as 

886 illustrated above which will "release" or "rollback" corresponding to 

887 if the operation within the block were successful or raised an 

888 exception. 

889 

890 Nested transactions require SAVEPOINT support in the underlying 

891 database, else the behavior is undefined. SAVEPOINT is commonly used to 

892 run operations within a transaction that may fail, while continuing the 

893 outer transaction. E.g.:: 

894 

895 from sqlalchemy import exc 

896 

897 with engine.begin() as connection: 

898 trans = connection.begin_nested() 

899 try: 

900 connection.execute(table.insert(), {"username": "sandy"}) 

901 trans.commit() 

902 except exc.IntegrityError: # catch for duplicate username 

903 trans.rollback() # rollback to savepoint 

904 

905 # outer transaction continues 

906 connection.execute(...) 

907 

908 If :meth:`_engine.Connection.begin_nested` is called without first 

909 calling :meth:`_engine.Connection.begin` or 

910 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object 

911 will "autobegin" the outer transaction first. This outer transaction 

912 may be committed using "commit-as-you-go" style, e.g.:: 

913 

914 with engine.connect() as connection: # begin() wasn't called 

915 

916 with connection.begin_nested(): # will auto-"begin()" first 

917 connection.execute(...) 

918 # savepoint is released 

919 

920 connection.execute(...) 

921 

922 # explicitly commit outer transaction 

923 connection.commit() 

924 

925 # can continue working with connection here 

926 

927 .. versionchanged:: 2.0 

928 

929 :meth:`_engine.Connection.begin_nested` will now participate 

930 in the connection "autobegin" behavior that is new as of 

931 2.0 / "future" style connections in 1.4. 

932 

933 .. seealso:: 

934 

935 :meth:`_engine.Connection.begin` 

936 

937 :ref:`session_begin_nested` - ORM support for SAVEPOINT 

938 

939 """ 

940 if self._transaction is None: 

941 self._autobegin() 

942 

943 return NestedTransaction(self) 

944 

945 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction: 

946 """Begin a two-phase or XA transaction and return a transaction 

947 handle. 

948 

949 The returned object is an instance of :class:`.TwoPhaseTransaction`, 

950 which in addition to the methods provided by 

951 :class:`.Transaction`, also provides a 

952 :meth:`~.TwoPhaseTransaction.prepare` method. 

953 

954 :param xid: the two phase transaction id. If not supplied, a 

955 random id will be generated. The accepted type and value depends on 

956 the driver in use. 

957 

958 .. seealso:: 

959 

960 :meth:`_engine.Connection.begin` 

961 

962 :meth:`_engine.Connection.begin_twophase` 

963 

964 """ 

965 

966 if self._transaction is not None: 

967 raise exc.InvalidRequestError( 

968 "Cannot start a two phase transaction when a transaction " 

969 "is already in progress." 

970 ) 

971 if xid is None: 

972 xid = self.engine.dialect.create_xid() 

973 return TwoPhaseTransaction(self, xid) 

974 

975 def commit(self) -> None: 

976 """Commit the transaction that is currently in progress. 

977 

978 This method commits the current transaction if one has been started. 

979 If no transaction was started, the method has no effect, assuming 

980 the connection is in a non-invalidated state. 

981 

982 A transaction is begun on a :class:`_engine.Connection` automatically 

983 whenever a statement is first executed, or when the 

984 :meth:`_engine.Connection.begin` method is called. 

985 

986 .. note:: The :meth:`_engine.Connection.commit` method only acts upon 

987 the primary database transaction that is linked to the 

988 :class:`_engine.Connection` object. It does not operate upon a 

989 SAVEPOINT that would have been invoked from the 

990 :meth:`_engine.Connection.begin_nested` method; for control of a 

991 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the 

992 :class:`_engine.NestedTransaction` that is returned by the 

993 :meth:`_engine.Connection.begin_nested` method itself. 

994 

995 

996 """ 

997 if self._transaction: 

998 self._transaction.commit() 

999 

1000 def rollback(self) -> None: 

1001 """Roll back the transaction that is currently in progress. 

1002 

1003 This method rolls back the current transaction if one has been started. 

1004 If no transaction was started, the method has no effect. If a 

1005 transaction was started and the connection is in an invalidated state, 

1006 the transaction is cleared using this method. 

1007 

1008 A transaction is begun on a :class:`_engine.Connection` automatically 

1009 whenever a statement is first executed, or when the 

1010 :meth:`_engine.Connection.begin` method is called. 

1011 

1012 .. note:: The :meth:`_engine.Connection.rollback` method only acts 

1013 upon the primary database transaction that is linked to the 

1014 :class:`_engine.Connection` object. It does not operate upon a 

1015 SAVEPOINT that would have been invoked from the 

1016 :meth:`_engine.Connection.begin_nested` method; for control of a 

1017 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the 

1018 :class:`_engine.NestedTransaction` that is returned by the 

1019 :meth:`_engine.Connection.begin_nested` method itself. 

1020 

1021 

1022 """ 

1023 if self._transaction: 

1024 self._transaction.rollback() 

1025 

1026 def recover_twophase(self) -> List[Any]: 

1027 return self.engine.dialect.do_recover_twophase(self) 

1028 

1029 def rollback_prepared(self, xid: Any, recover: bool = False) -> None: 

1030 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) 

1031 

1032 def commit_prepared(self, xid: Any, recover: bool = False) -> None: 

1033 self.engine.dialect.do_commit_twophase(self, xid, recover=recover) 

1034 

1035 def in_transaction(self) -> bool: 

1036 """Return True if a transaction is in progress.""" 

1037 return self._transaction is not None and self._transaction.is_active 

1038 

1039 def in_nested_transaction(self) -> bool: 

1040 """Return True if a transaction is in progress.""" 

1041 return ( 

1042 self._nested_transaction is not None 

1043 and self._nested_transaction.is_active 

1044 ) 

1045 

1046 def _is_autocommit_isolation(self) -> bool: 

1047 opt_iso = self._execution_options.get("isolation_level", None) 

1048 return bool( 

1049 opt_iso == "AUTOCOMMIT" 

1050 or ( 

1051 opt_iso is None 

1052 and self.engine.dialect._on_connect_isolation_level 

1053 == "AUTOCOMMIT" 

1054 ) 

1055 ) 

1056 

1057 def _get_required_transaction(self) -> RootTransaction: 

1058 trans = self._transaction 

1059 if trans is None: 

1060 raise exc.InvalidRequestError("connection is not in a transaction") 

1061 return trans 

1062 

1063 def _get_required_nested_transaction(self) -> NestedTransaction: 

1064 trans = self._nested_transaction 

1065 if trans is None: 

1066 raise exc.InvalidRequestError( 

1067 "connection is not in a nested transaction" 

1068 ) 

1069 return trans 

1070 

1071 def get_transaction(self) -> Optional[RootTransaction]: 

1072 """Return the current root transaction in progress, if any. 

1073 

1074 .. versionadded:: 1.4 

1075 

1076 """ 

1077 

1078 return self._transaction 

1079 

1080 def get_nested_transaction(self) -> Optional[NestedTransaction]: 

1081 """Return the current nested transaction in progress, if any. 

1082 

1083 .. versionadded:: 1.4 

1084 

1085 """ 

1086 return self._nested_transaction 

1087 

1088 def _begin_impl(self, transaction: RootTransaction) -> None: 

1089 if self._echo: 

1090 if self._is_autocommit_isolation(): 

1091 self._log_info( 

1092 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1093 "autocommit mode)" 

1094 ) 

1095 else: 

1096 self._log_info("BEGIN (implicit)") 

1097 

1098 self.__in_begin = True 

1099 

1100 if self._has_events or self.engine._has_events: 

1101 self.dispatch.begin(self) 

1102 

1103 try: 

1104 self.engine.dialect.do_begin(self.connection) 

1105 except BaseException as e: 

1106 self._handle_dbapi_exception(e, None, None, None, None) 

1107 finally: 

1108 self.__in_begin = False 

1109 

1110 def _rollback_impl(self) -> None: 

1111 if self._has_events or self.engine._has_events: 

1112 self.dispatch.rollback(self) 

1113 

1114 if self._still_open_and_dbapi_connection_is_valid: 

1115 if self._echo: 

1116 if self._is_autocommit_isolation(): 

1117 if self.dialect.skip_autocommit_rollback: 

1118 self._log_info( 

1119 "ROLLBACK will be skipped by " 

1120 "skip_autocommit_rollback" 

1121 ) 

1122 else: 

1123 self._log_info( 

1124 "ROLLBACK using DBAPI connection.rollback(); " 

1125 "set skip_autocommit_rollback to prevent fully" 

1126 ) 

1127 else: 

1128 self._log_info("ROLLBACK") 

1129 try: 

1130 self.engine.dialect.do_rollback(self.connection) 

1131 except BaseException as e: 

1132 self._handle_dbapi_exception(e, None, None, None, None) 

1133 

1134 def _commit_impl(self) -> None: 

1135 if self._has_events or self.engine._has_events: 

1136 self.dispatch.commit(self) 

1137 

1138 if self._echo: 

1139 if self._is_autocommit_isolation(): 

1140 self._log_info( 

1141 "COMMIT using DBAPI connection.commit(), " 

1142 "has no effect due to autocommit mode" 

1143 ) 

1144 else: 

1145 self._log_info("COMMIT") 

1146 try: 

1147 self.engine.dialect.do_commit(self.connection) 

1148 except BaseException as e: 

1149 self._handle_dbapi_exception(e, None, None, None, None) 

1150 

1151 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1152 if self._has_events or self.engine._has_events: 

1153 self.dispatch.savepoint(self, name) 

1154 

1155 if name is None: 

1156 self.__savepoint_seq += 1 

1157 name = "sa_savepoint_%s" % self.__savepoint_seq 

1158 self.engine.dialect.do_savepoint(self, name) 

1159 return name 

1160 

1161 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1162 if self._has_events or self.engine._has_events: 

1163 self.dispatch.rollback_savepoint(self, name, None) 

1164 

1165 if self._still_open_and_dbapi_connection_is_valid: 

1166 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1167 

1168 def _release_savepoint_impl(self, name: str) -> None: 

1169 if self._has_events or self.engine._has_events: 

1170 self.dispatch.release_savepoint(self, name, None) 

1171 

1172 self.engine.dialect.do_release_savepoint(self, name) 

1173 

1174 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 

1175 if self._echo: 

1176 self._log_info("BEGIN TWOPHASE (implicit)") 

1177 if self._has_events or self.engine._has_events: 

1178 self.dispatch.begin_twophase(self, transaction.xid) 

1179 

1180 self.__in_begin = True 

1181 try: 

1182 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1183 except BaseException as e: 

1184 self._handle_dbapi_exception(e, None, None, None, None) 

1185 finally: 

1186 self.__in_begin = False 

1187 

1188 def _prepare_twophase_impl(self, xid: Any) -> None: 

1189 if self._has_events or self.engine._has_events: 

1190 self.dispatch.prepare_twophase(self, xid) 

1191 

1192 assert isinstance(self._transaction, TwoPhaseTransaction) 

1193 try: 

1194 self.engine.dialect.do_prepare_twophase(self, xid) 

1195 except BaseException as e: 

1196 self._handle_dbapi_exception(e, None, None, None, None) 

1197 

1198 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1199 if self._has_events or self.engine._has_events: 

1200 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1201 

1202 if self._still_open_and_dbapi_connection_is_valid: 

1203 assert isinstance(self._transaction, TwoPhaseTransaction) 

1204 try: 

1205 self.engine.dialect.do_rollback_twophase( 

1206 self, xid, is_prepared 

1207 ) 

1208 except BaseException as e: 

1209 self._handle_dbapi_exception(e, None, None, None, None) 

1210 

1211 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1212 if self._has_events or self.engine._has_events: 

1213 self.dispatch.commit_twophase(self, xid, is_prepared) 

1214 

1215 assert isinstance(self._transaction, TwoPhaseTransaction) 

1216 try: 

1217 self.engine.dialect.do_commit_twophase(self, xid, is_prepared) 

1218 except BaseException as e: 

1219 self._handle_dbapi_exception(e, None, None, None, None) 

1220 

1221 def close(self) -> None: 

1222 """Close this :class:`_engine.Connection`. 

1223 

1224 This results in a release of the underlying database 

1225 resources, that is, the DBAPI connection referenced 

1226 internally. The DBAPI connection is typically restored 

1227 back to the connection-holding :class:`_pool.Pool` referenced 

1228 by the :class:`_engine.Engine` that produced this 

1229 :class:`_engine.Connection`. Any transactional state present on 

1230 the DBAPI connection is also unconditionally released via 

1231 the DBAPI connection's ``rollback()`` method, regardless 

1232 of any :class:`.Transaction` object that may be 

1233 outstanding with regards to this :class:`_engine.Connection`. 

1234 

1235 This has the effect of also calling :meth:`_engine.Connection.rollback` 

1236 if any transaction is in place. 

1237 

1238 After :meth:`_engine.Connection.close` is called, the 

1239 :class:`_engine.Connection` is permanently in a closed state, 

1240 and will allow no further operations. 

1241 

1242 """ 

1243 

1244 if self._transaction: 

1245 self._transaction.close() 

1246 skip_reset = True 

1247 else: 

1248 skip_reset = False 

1249 

1250 if self._dbapi_connection is not None: 

1251 conn = self._dbapi_connection 

1252 

1253 # as we just closed the transaction, close the connection 

1254 # pool connection without doing an additional reset 

1255 if skip_reset: 

1256 cast("_ConnectionFairy", conn)._close_special( 

1257 transaction_reset=True 

1258 ) 

1259 else: 

1260 conn.close() 

1261 

1262 # There is a slight chance that conn.close() may have 

1263 # triggered an invalidation here in which case 

1264 # _dbapi_connection would already be None, however usually 

1265 # it will be non-None here and in a "closed" state. 

1266 self._dbapi_connection = None 

1267 self.__can_reconnect = False 

1268 

1269 @overload 

1270 def scalar( 

1271 self, 

1272 statement: TypedReturnsRows[Tuple[_T]], 

1273 parameters: Optional[_CoreSingleExecuteParams] = None, 

1274 *, 

1275 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1276 ) -> Optional[_T]: ... 

1277 

1278 @overload 

1279 def scalar( 

1280 self, 

1281 statement: Executable, 

1282 parameters: Optional[_CoreSingleExecuteParams] = None, 

1283 *, 

1284 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1285 ) -> Any: ... 

1286 

1287 def scalar( 

1288 self, 

1289 statement: Executable, 

1290 parameters: Optional[_CoreSingleExecuteParams] = None, 

1291 *, 

1292 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1293 ) -> Any: 

1294 r"""Executes a SQL statement construct and returns a scalar object. 

1295 

1296 This method is shorthand for invoking the 

1297 :meth:`_engine.Result.scalar` method after invoking the 

1298 :meth:`_engine.Connection.execute` method. Parameters are equivalent. 

1299 

1300 :return: a scalar Python value representing the first column of the 

1301 first row returned. 

1302 

1303 """ 

1304 distilled_parameters = _distill_params_20(parameters) 

1305 try: 

1306 meth = statement._execute_on_scalar 

1307 except AttributeError as err: 

1308 raise exc.ObjectNotExecutableError(statement) from err 

1309 else: 

1310 return meth( 

1311 self, 

1312 distilled_parameters, 

1313 execution_options or NO_OPTIONS, 

1314 ) 

1315 

1316 @overload 

1317 def scalars( 

1318 self, 

1319 statement: TypedReturnsRows[Tuple[_T]], 

1320 parameters: Optional[_CoreAnyExecuteParams] = None, 

1321 *, 

1322 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1323 ) -> ScalarResult[_T]: ... 

1324 

1325 @overload 

1326 def scalars( 

1327 self, 

1328 statement: Executable, 

1329 parameters: Optional[_CoreAnyExecuteParams] = None, 

1330 *, 

1331 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1332 ) -> ScalarResult[Any]: ... 

1333 

1334 def scalars( 

1335 self, 

1336 statement: Executable, 

1337 parameters: Optional[_CoreAnyExecuteParams] = None, 

1338 *, 

1339 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1340 ) -> ScalarResult[Any]: 

1341 """Executes and returns a scalar result set, which yields scalar values 

1342 from the first column of each row. 

1343 

1344 This method is equivalent to calling :meth:`_engine.Connection.execute` 

1345 to receive a :class:`_result.Result` object, then invoking the 

1346 :meth:`_result.Result.scalars` method to produce a 

1347 :class:`_result.ScalarResult` instance. 

1348 

1349 :return: a :class:`_result.ScalarResult` 

1350 

1351 .. versionadded:: 1.4.24 

1352 

1353 """ 

1354 

1355 return self.execute( 

1356 statement, parameters, execution_options=execution_options 

1357 ).scalars() 

1358 

1359 @overload 

1360 def execute( 

1361 self, 

1362 statement: TypedReturnsRows[_T], 

1363 parameters: Optional[_CoreAnyExecuteParams] = None, 

1364 *, 

1365 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1366 ) -> CursorResult[_T]: ... 

1367 

1368 @overload 

1369 def execute( 

1370 self, 

1371 statement: Executable, 

1372 parameters: Optional[_CoreAnyExecuteParams] = None, 

1373 *, 

1374 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1375 ) -> CursorResult[Any]: ... 

1376 

1377 def execute( 

1378 self, 

1379 statement: Executable, 

1380 parameters: Optional[_CoreAnyExecuteParams] = None, 

1381 *, 

1382 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1383 ) -> CursorResult[Any]: 

1384 r"""Executes a SQL statement construct and returns a 

1385 :class:`_engine.CursorResult`. 

1386 

1387 :param statement: The statement to be executed. This is always 

1388 an object that is in both the :class:`_expression.ClauseElement` and 

1389 :class:`_expression.Executable` hierarchies, including: 

1390 

1391 * :class:`_expression.Select` 

1392 * :class:`_expression.Insert`, :class:`_expression.Update`, 

1393 :class:`_expression.Delete` 

1394 * :class:`_expression.TextClause` and 

1395 :class:`_expression.TextualSelect` 

1396 * :class:`_schema.DDL` and objects which inherit from 

1397 :class:`_schema.ExecutableDDLElement` 

1398 

1399 :param parameters: parameters which will be bound into the statement. 

1400 This may be either a dictionary of parameter names to values, 

1401 or a mutable sequence (e.g. a list) of dictionaries. When a 

1402 list of dictionaries is passed, the underlying statement execution 

1403 will make use of the DBAPI ``cursor.executemany()`` method. 

1404 When a single dictionary is passed, the DBAPI ``cursor.execute()`` 

1405 method will be used. 

1406 

1407 :param execution_options: optional dictionary of execution options, 

1408 which will be associated with the statement execution. This 

1409 dictionary can provide a subset of the options that are accepted 

1410 by :meth:`_engine.Connection.execution_options`. 

1411 

1412 :return: a :class:`_engine.Result` object. 

1413 

1414 """ 

1415 distilled_parameters = _distill_params_20(parameters) 

1416 try: 

1417 meth = statement._execute_on_connection 

1418 except AttributeError as err: 

1419 raise exc.ObjectNotExecutableError(statement) from err 

1420 else: 

1421 return meth( 

1422 self, 

1423 distilled_parameters, 

1424 execution_options or NO_OPTIONS, 

1425 ) 

1426 

1427 def _execute_function( 

1428 self, 

1429 func: FunctionElement[Any], 

1430 distilled_parameters: _CoreMultiExecuteParams, 

1431 execution_options: CoreExecuteOptionsParameter, 

1432 ) -> CursorResult[Any]: 

1433 """Execute a sql.FunctionElement object.""" 

1434 

1435 return self._execute_clauseelement( 

1436 func.select(), distilled_parameters, execution_options 

1437 ) 

1438 

1439 def _execute_default( 

1440 self, 

1441 default: DefaultGenerator, 

1442 distilled_parameters: _CoreMultiExecuteParams, 

1443 execution_options: CoreExecuteOptionsParameter, 

1444 ) -> Any: 

1445 """Execute a schema.ColumnDefault object.""" 

1446 

1447 execution_options = self._execution_options.merge_with( 

1448 execution_options 

1449 ) 

1450 

1451 event_multiparams: Optional[_CoreMultiExecuteParams] 

1452 event_params: Optional[_CoreAnyExecuteParams] 

1453 

1454 # note for event handlers, the "distilled parameters" which is always 

1455 # a list of dicts is broken out into separate "multiparams" and 

1456 # "params" collections, which allows the handler to distinguish 

1457 # between an executemany and execute style set of parameters. 

1458 if self._has_events or self.engine._has_events: 

1459 ( 

1460 default, 

1461 distilled_parameters, 

1462 event_multiparams, 

1463 event_params, 

1464 ) = self._invoke_before_exec_event( 

1465 default, distilled_parameters, execution_options 

1466 ) 

1467 else: 

1468 event_multiparams = event_params = None 

1469 

1470 try: 

1471 conn = self._dbapi_connection 

1472 if conn is None: 

1473 conn = self._revalidate_connection() 

1474 

1475 dialect = self.dialect 

1476 ctx = dialect.execution_ctx_cls._init_default( 

1477 dialect, self, conn, execution_options 

1478 ) 

1479 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1480 raise 

1481 except BaseException as e: 

1482 self._handle_dbapi_exception(e, None, None, None, None) 

1483 

1484 ret = ctx._exec_default(None, default, None) 

1485 

1486 if self._has_events or self.engine._has_events: 

1487 self.dispatch.after_execute( 

1488 self, 

1489 default, 

1490 event_multiparams, 

1491 event_params, 

1492 execution_options, 

1493 ret, 

1494 ) 

1495 

1496 return ret 

1497 

1498 def _execute_ddl( 

1499 self, 

1500 ddl: ExecutableDDLElement, 

1501 distilled_parameters: _CoreMultiExecuteParams, 

1502 execution_options: CoreExecuteOptionsParameter, 

1503 ) -> CursorResult[Any]: 

1504 """Execute a schema.DDL object.""" 

1505 

1506 exec_opts = ddl._execution_options.merge_with( 

1507 self._execution_options, execution_options 

1508 ) 

1509 

1510 event_multiparams: Optional[_CoreMultiExecuteParams] 

1511 event_params: Optional[_CoreSingleExecuteParams] 

1512 

1513 if self._has_events or self.engine._has_events: 

1514 ( 

1515 ddl, 

1516 distilled_parameters, 

1517 event_multiparams, 

1518 event_params, 

1519 ) = self._invoke_before_exec_event( 

1520 ddl, distilled_parameters, exec_opts 

1521 ) 

1522 else: 

1523 event_multiparams = event_params = None 

1524 

1525 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1526 

1527 dialect = self.dialect 

1528 

1529 compiled = ddl.compile( 

1530 dialect=dialect, schema_translate_map=schema_translate_map 

1531 ) 

1532 ret = self._execute_context( 

1533 dialect, 

1534 dialect.execution_ctx_cls._init_ddl, 

1535 compiled, 

1536 None, 

1537 exec_opts, 

1538 compiled, 

1539 ) 

1540 if self._has_events or self.engine._has_events: 

1541 self.dispatch.after_execute( 

1542 self, 

1543 ddl, 

1544 event_multiparams, 

1545 event_params, 

1546 exec_opts, 

1547 ret, 

1548 ) 

1549 return ret 

1550 

1551 def _invoke_before_exec_event( 

1552 self, 

1553 elem: Any, 

1554 distilled_params: _CoreMultiExecuteParams, 

1555 execution_options: _ExecuteOptions, 

1556 ) -> Tuple[ 

1557 Any, 

1558 _CoreMultiExecuteParams, 

1559 _CoreMultiExecuteParams, 

1560 _CoreSingleExecuteParams, 

1561 ]: 

1562 event_multiparams: _CoreMultiExecuteParams 

1563 event_params: _CoreSingleExecuteParams 

1564 

1565 if len(distilled_params) == 1: 

1566 event_multiparams, event_params = [], distilled_params[0] 

1567 else: 

1568 event_multiparams, event_params = distilled_params, {} 

1569 

1570 for fn in self.dispatch.before_execute: 

1571 elem, event_multiparams, event_params = fn( 

1572 self, 

1573 elem, 

1574 event_multiparams, 

1575 event_params, 

1576 execution_options, 

1577 ) 

1578 

1579 if event_multiparams: 

1580 distilled_params = list(event_multiparams) 

1581 if event_params: 

1582 raise exc.InvalidRequestError( 

1583 "Event handler can't return non-empty multiparams " 

1584 "and params at the same time" 

1585 ) 

1586 elif event_params: 

1587 distilled_params = [event_params] 

1588 else: 

1589 distilled_params = [] 

1590 

1591 return elem, distilled_params, event_multiparams, event_params 

1592 

1593 def _execute_clauseelement( 

1594 self, 

1595 elem: Executable, 

1596 distilled_parameters: _CoreMultiExecuteParams, 

1597 execution_options: CoreExecuteOptionsParameter, 

1598 ) -> CursorResult[Any]: 

1599 """Execute a sql.ClauseElement object.""" 

1600 

1601 execution_options = elem._execution_options.merge_with( 

1602 self._execution_options, execution_options 

1603 ) 

1604 

1605 has_events = self._has_events or self.engine._has_events 

1606 if has_events: 

1607 ( 

1608 elem, 

1609 distilled_parameters, 

1610 event_multiparams, 

1611 event_params, 

1612 ) = self._invoke_before_exec_event( 

1613 elem, distilled_parameters, execution_options 

1614 ) 

1615 

1616 if distilled_parameters: 

1617 # ensure we don't retain a link to the view object for keys() 

1618 # which links to the values, which we don't want to cache 

1619 keys = sorted(distilled_parameters[0]) 

1620 for_executemany = len(distilled_parameters) > 1 

1621 else: 

1622 keys = [] 

1623 for_executemany = False 

1624 

1625 dialect = self.dialect 

1626 

1627 schema_translate_map = execution_options.get( 

1628 "schema_translate_map", None 

1629 ) 

1630 

1631 compiled_cache: Optional[CompiledCacheType] = execution_options.get( 

1632 "compiled_cache", self.engine._compiled_cache 

1633 ) 

1634 

1635 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( 

1636 dialect=dialect, 

1637 compiled_cache=compiled_cache, 

1638 column_keys=keys, 

1639 for_executemany=for_executemany, 

1640 schema_translate_map=schema_translate_map, 

1641 linting=self.dialect.compiler_linting | compiler.WARN_LINTING, 

1642 ) 

1643 ret = self._execute_context( 

1644 dialect, 

1645 dialect.execution_ctx_cls._init_compiled, 

1646 compiled_sql, 

1647 distilled_parameters, 

1648 execution_options, 

1649 compiled_sql, 

1650 distilled_parameters, 

1651 elem, 

1652 extracted_params, 

1653 cache_hit=cache_hit, 

1654 ) 

1655 if has_events: 

1656 self.dispatch.after_execute( 

1657 self, 

1658 elem, 

1659 event_multiparams, 

1660 event_params, 

1661 execution_options, 

1662 ret, 

1663 ) 

1664 return ret 

1665 

1666 def _execute_compiled( 

1667 self, 

1668 compiled: Compiled, 

1669 distilled_parameters: _CoreMultiExecuteParams, 

1670 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS, 

1671 ) -> CursorResult[Any]: 

1672 """Execute a sql.Compiled object. 

1673 

1674 TODO: why do we have this? likely deprecate or remove 

1675 

1676 """ 

1677 

1678 execution_options = compiled.execution_options.merge_with( 

1679 self._execution_options, execution_options 

1680 ) 

1681 

1682 if self._has_events or self.engine._has_events: 

1683 ( 

1684 compiled, 

1685 distilled_parameters, 

1686 event_multiparams, 

1687 event_params, 

1688 ) = self._invoke_before_exec_event( 

1689 compiled, distilled_parameters, execution_options 

1690 ) 

1691 

1692 dialect = self.dialect 

1693 

1694 ret = self._execute_context( 

1695 dialect, 

1696 dialect.execution_ctx_cls._init_compiled, 

1697 compiled, 

1698 distilled_parameters, 

1699 execution_options, 

1700 compiled, 

1701 distilled_parameters, 

1702 None, 

1703 None, 

1704 ) 

1705 if self._has_events or self.engine._has_events: 

1706 self.dispatch.after_execute( 

1707 self, 

1708 compiled, 

1709 event_multiparams, 

1710 event_params, 

1711 execution_options, 

1712 ret, 

1713 ) 

1714 return ret 

1715 

1716 def exec_driver_sql( 

1717 self, 

1718 statement: str, 

1719 parameters: Optional[_DBAPIAnyExecuteParams] = None, 

1720 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1721 ) -> CursorResult[Any]: 

1722 r"""Executes a string SQL statement on the DBAPI cursor directly, 

1723 without any SQL compilation steps. 

1724 

1725 This can be used to pass any string directly to the 

1726 ``cursor.execute()`` method of the DBAPI in use. 

1727 

1728 :param statement: The statement str to be executed. Bound parameters 

1729 must use the underlying DBAPI's paramstyle, such as "qmark", 

1730 "pyformat", "format", etc. 

1731 

1732 :param parameters: represent bound parameter values to be used in the 

1733 execution. The format is one of: a dictionary of named parameters, 

1734 a tuple of positional parameters, or a list containing either 

1735 dictionaries or tuples for multiple-execute support. 

1736 

1737 :return: a :class:`_engine.CursorResult`. 

1738 

1739 E.g. multiple dictionaries:: 

1740 

1741 

1742 conn.exec_driver_sql( 

1743 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1744 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}], 

1745 ) 

1746 

1747 Single dictionary:: 

1748 

1749 conn.exec_driver_sql( 

1750 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1751 dict(id=1, value="v1"), 

1752 ) 

1753 

1754 Single tuple:: 

1755 

1756 conn.exec_driver_sql( 

1757 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1") 

1758 ) 

1759 

1760 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does 

1761 not participate in the 

1762 :meth:`_events.ConnectionEvents.before_execute` and 

1763 :meth:`_events.ConnectionEvents.after_execute` events. To 

1764 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use 

1765 :meth:`_events.ConnectionEvents.before_cursor_execute` and 

1766 :meth:`_events.ConnectionEvents.after_cursor_execute`. 

1767 

1768 .. seealso:: 

1769 

1770 :pep:`249` 

1771 

1772 """ 

1773 

1774 distilled_parameters = _distill_raw_params(parameters) 

1775 

1776 execution_options = self._execution_options.merge_with( 

1777 execution_options 

1778 ) 

1779 

1780 dialect = self.dialect 

1781 ret = self._execute_context( 

1782 dialect, 

1783 dialect.execution_ctx_cls._init_statement, 

1784 statement, 

1785 None, 

1786 execution_options, 

1787 statement, 

1788 distilled_parameters, 

1789 ) 

1790 

1791 return ret 

1792 

1793 def _execute_context( 

1794 self, 

1795 dialect: Dialect, 

1796 constructor: Callable[..., ExecutionContext], 

1797 statement: Union[str, Compiled], 

1798 parameters: Optional[_AnyMultiExecuteParams], 

1799 execution_options: _ExecuteOptions, 

1800 *args: Any, 

1801 **kw: Any, 

1802 ) -> CursorResult[Any]: 

1803 """Create an :class:`.ExecutionContext` and execute, returning 

1804 a :class:`_engine.CursorResult`.""" 

1805 

1806 if execution_options: 

1807 yp = execution_options.get("yield_per", None) 

1808 if yp: 

1809 execution_options = execution_options.union( 

1810 {"stream_results": True, "max_row_buffer": yp} 

1811 ) 

1812 try: 

1813 conn = self._dbapi_connection 

1814 if conn is None: 

1815 conn = self._revalidate_connection() 

1816 

1817 context = constructor( 

1818 dialect, self, conn, execution_options, *args, **kw 

1819 ) 

1820 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1821 raise 

1822 except BaseException as e: 

1823 self._handle_dbapi_exception( 

1824 e, str(statement), parameters, None, None 

1825 ) 

1826 

1827 if ( 

1828 self._transaction 

1829 and not self._transaction.is_active 

1830 or ( 

1831 self._nested_transaction 

1832 and not self._nested_transaction.is_active 

1833 ) 

1834 ): 

1835 self._invalid_transaction() 

1836 

1837 elif self._trans_context_manager: 

1838 TransactionalContext._trans_ctx_check(self) 

1839 

1840 if self._transaction is None: 

1841 self._autobegin() 

1842 

1843 context.pre_exec() 

1844 

1845 if context.execute_style is ExecuteStyle.INSERTMANYVALUES: 

1846 return self._exec_insertmany_context(dialect, context) 

1847 else: 

1848 return self._exec_single_context( 

1849 dialect, context, statement, parameters 

1850 ) 

1851 

1852 def _exec_single_context( 

1853 self, 

1854 dialect: Dialect, 

1855 context: ExecutionContext, 

1856 statement: Union[str, Compiled], 

1857 parameters: Optional[_AnyMultiExecuteParams], 

1858 ) -> CursorResult[Any]: 

1859 """continue the _execute_context() method for a single DBAPI 

1860 cursor.execute() or cursor.executemany() call. 

1861 

1862 """ 

1863 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

1864 generic_setinputsizes = context._prepare_set_input_sizes() 

1865 

1866 if generic_setinputsizes: 

1867 try: 

1868 dialect.do_set_input_sizes( 

1869 context.cursor, generic_setinputsizes, context 

1870 ) 

1871 except BaseException as e: 

1872 self._handle_dbapi_exception( 

1873 e, str(statement), parameters, None, context 

1874 ) 

1875 

1876 cursor, str_statement, parameters = ( 

1877 context.cursor, 

1878 context.statement, 

1879 context.parameters, 

1880 ) 

1881 

1882 effective_parameters: Optional[_AnyExecuteParams] 

1883 

1884 if not context.executemany: 

1885 effective_parameters = parameters[0] 

1886 else: 

1887 effective_parameters = parameters 

1888 

1889 if self._has_events or self.engine._has_events: 

1890 for fn in self.dispatch.before_cursor_execute: 

1891 str_statement, effective_parameters = fn( 

1892 self, 

1893 cursor, 

1894 str_statement, 

1895 effective_parameters, 

1896 context, 

1897 context.executemany, 

1898 ) 

1899 

1900 if self._echo: 

1901 self._log_info(str_statement) 

1902 

1903 stats = context._get_cache_stats() 

1904 

1905 if not self.engine.hide_parameters: 

1906 self._log_info( 

1907 "[%s] %r", 

1908 stats, 

1909 sql_util._repr_params( 

1910 effective_parameters, 

1911 batches=10, 

1912 ismulti=context.executemany, 

1913 ), 

1914 ) 

1915 else: 

1916 self._log_info( 

1917 "[%s] [SQL parameters hidden due to hide_parameters=True]", 

1918 stats, 

1919 ) 

1920 

1921 evt_handled: bool = False 

1922 try: 

1923 if context.execute_style is ExecuteStyle.EXECUTEMANY: 

1924 effective_parameters = cast( 

1925 "_CoreMultiExecuteParams", effective_parameters 

1926 ) 

1927 if self.dialect._has_events: 

1928 for fn in self.dialect.dispatch.do_executemany: 

1929 if fn( 

1930 cursor, 

1931 str_statement, 

1932 effective_parameters, 

1933 context, 

1934 ): 

1935 evt_handled = True 

1936 break 

1937 if not evt_handled: 

1938 self.dialect.do_executemany( 

1939 cursor, 

1940 str_statement, 

1941 effective_parameters, 

1942 context, 

1943 ) 

1944 elif not effective_parameters and context.no_parameters: 

1945 if self.dialect._has_events: 

1946 for fn in self.dialect.dispatch.do_execute_no_params: 

1947 if fn(cursor, str_statement, context): 

1948 evt_handled = True 

1949 break 

1950 if not evt_handled: 

1951 self.dialect.do_execute_no_params( 

1952 cursor, str_statement, context 

1953 ) 

1954 else: 

1955 effective_parameters = cast( 

1956 "_CoreSingleExecuteParams", effective_parameters 

1957 ) 

1958 if self.dialect._has_events: 

1959 for fn in self.dialect.dispatch.do_execute: 

1960 if fn( 

1961 cursor, 

1962 str_statement, 

1963 effective_parameters, 

1964 context, 

1965 ): 

1966 evt_handled = True 

1967 break 

1968 if not evt_handled: 

1969 self.dialect.do_execute( 

1970 cursor, str_statement, effective_parameters, context 

1971 ) 

1972 

1973 if self._has_events or self.engine._has_events: 

1974 self.dispatch.after_cursor_execute( 

1975 self, 

1976 cursor, 

1977 str_statement, 

1978 effective_parameters, 

1979 context, 

1980 context.executemany, 

1981 ) 

1982 

1983 context.post_exec() 

1984 

1985 result = context._setup_result_proxy() 

1986 

1987 except BaseException as e: 

1988 self._handle_dbapi_exception( 

1989 e, str_statement, effective_parameters, cursor, context 

1990 ) 

1991 

1992 return result 

1993 

1994 def _exec_insertmany_context( 

1995 self, 

1996 dialect: Dialect, 

1997 context: ExecutionContext, 

1998 ) -> CursorResult[Any]: 

1999 """continue the _execute_context() method for an "insertmanyvalues" 

2000 operation, which will invoke DBAPI 

2001 cursor.execute() one or more times with individual log and 

2002 event hook calls. 

2003 

2004 """ 

2005 

2006 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

2007 generic_setinputsizes = context._prepare_set_input_sizes() 

2008 else: 

2009 generic_setinputsizes = None 

2010 

2011 cursor, str_statement, parameters = ( 

2012 context.cursor, 

2013 context.statement, 

2014 context.parameters, 

2015 ) 

2016 

2017 effective_parameters = parameters 

2018 

2019 engine_events = self._has_events or self.engine._has_events 

2020 if self.dialect._has_events: 

2021 do_execute_dispatch: Iterable[Any] = ( 

2022 self.dialect.dispatch.do_execute 

2023 ) 

2024 else: 

2025 do_execute_dispatch = () 

2026 

2027 if engine_events: 

2028 _WORKAROUND_ISSUE_13018 = getattr( 

2029 self, "_WORKAROUND_ISSUE_13018", False 

2030 ) 

2031 else: 

2032 _WORKAROUND_ISSUE_13018 = False 

2033 

2034 if self._echo: 

2035 stats = context._get_cache_stats() + " (insertmanyvalues)" 

2036 

2037 preserve_rowcount = context.execution_options.get( 

2038 "preserve_rowcount", False 

2039 ) 

2040 rowcount = 0 

2041 

2042 for imv_batch in dialect._deliver_insertmanyvalues_batches( 

2043 self, 

2044 cursor, 

2045 str_statement, 

2046 effective_parameters, 

2047 generic_setinputsizes, 

2048 context, 

2049 ): 

2050 if imv_batch.processed_setinputsizes: 

2051 try: 

2052 dialect.do_set_input_sizes( 

2053 context.cursor, 

2054 imv_batch.processed_setinputsizes, 

2055 context, 

2056 ) 

2057 except BaseException as e: 

2058 self._handle_dbapi_exception( 

2059 e, 

2060 sql_util._long_statement(imv_batch.replaced_statement), 

2061 imv_batch.replaced_parameters, 

2062 None, 

2063 context, 

2064 is_sub_exec=True, 

2065 ) 

2066 

2067 sub_stmt = imv_batch.replaced_statement 

2068 sub_params = imv_batch.replaced_parameters 

2069 

2070 if engine_events: 

2071 for fn in self.dispatch.before_cursor_execute: 

2072 sub_stmt, sub_params = fn( 

2073 self, 

2074 cursor, 

2075 sub_stmt, 

2076 sub_params, 

2077 context, 

2078 True, 

2079 ) 

2080 

2081 if self._echo: 

2082 self._log_info(sql_util._long_statement(sub_stmt)) 

2083 

2084 imv_stats = f""" {imv_batch.batchnum}/{ 

2085 imv_batch.total_batches 

2086 } ({ 

2087 'ordered' 

2088 if imv_batch.rows_sorted else 'unordered' 

2089 }{ 

2090 '; batch not supported' 

2091 if imv_batch.is_downgraded 

2092 else '' 

2093 })""" 

2094 

2095 if imv_batch.batchnum == 1: 

2096 stats += imv_stats 

2097 else: 

2098 stats = f"insertmanyvalues{imv_stats}" 

2099 

2100 if not self.engine.hide_parameters: 

2101 self._log_info( 

2102 "[%s] %r", 

2103 stats, 

2104 sql_util._repr_params( 

2105 sub_params, 

2106 batches=10, 

2107 ismulti=False, 

2108 ), 

2109 ) 

2110 else: 

2111 self._log_info( 

2112 "[%s] [SQL parameters hidden due to " 

2113 "hide_parameters=True]", 

2114 stats, 

2115 ) 

2116 

2117 try: 

2118 for fn in do_execute_dispatch: 

2119 if fn( 

2120 cursor, 

2121 sub_stmt, 

2122 sub_params, 

2123 context, 

2124 ): 

2125 break 

2126 else: 

2127 dialect.do_execute( 

2128 cursor, 

2129 sub_stmt, 

2130 sub_params, 

2131 context, 

2132 ) 

2133 

2134 except BaseException as e: 

2135 self._handle_dbapi_exception( 

2136 e, 

2137 sql_util._long_statement(sub_stmt), 

2138 sub_params, 

2139 cursor, 

2140 context, 

2141 is_sub_exec=True, 

2142 ) 

2143 

2144 if engine_events: 

2145 self.dispatch.after_cursor_execute( 

2146 self, 

2147 cursor, 

2148 # TODO: this will be fixed by #13018 

2149 sub_stmt if _WORKAROUND_ISSUE_13018 else str_statement, 

2150 sub_params if _WORKAROUND_ISSUE_13018 else parameters, 

2151 context, 

2152 context.executemany, 

2153 ) 

2154 

2155 if preserve_rowcount: 

2156 rowcount += imv_batch.current_batch_size 

2157 

2158 try: 

2159 context.post_exec() 

2160 

2161 if preserve_rowcount: 

2162 context._rowcount = rowcount # type: ignore[attr-defined] 

2163 

2164 result = context._setup_result_proxy() 

2165 

2166 except BaseException as e: 

2167 self._handle_dbapi_exception( 

2168 e, str_statement, effective_parameters, cursor, context 

2169 ) 

2170 

2171 return result 

2172 

2173 def _cursor_execute( 

2174 self, 

2175 cursor: DBAPICursor, 

2176 statement: str, 

2177 parameters: _DBAPISingleExecuteParams, 

2178 context: Optional[ExecutionContext] = None, 

2179 ) -> None: 

2180 """Execute a statement + params on the given cursor. 

2181 

2182 Adds appropriate logging and exception handling. 

2183 

2184 This method is used by DefaultDialect for special-case 

2185 executions, such as for sequences and column defaults. 

2186 The path of statement execution in the majority of cases 

2187 terminates at _execute_context(). 

2188 

2189 """ 

2190 if self._has_events or self.engine._has_events: 

2191 for fn in self.dispatch.before_cursor_execute: 

2192 statement, parameters = fn( 

2193 self, cursor, statement, parameters, context, False 

2194 ) 

2195 

2196 if self._echo: 

2197 self._log_info(statement) 

2198 self._log_info("[raw sql] %r", parameters) 

2199 try: 

2200 for fn in ( 

2201 () 

2202 if not self.dialect._has_events 

2203 else self.dialect.dispatch.do_execute 

2204 ): 

2205 if fn(cursor, statement, parameters, context): 

2206 break 

2207 else: 

2208 self.dialect.do_execute(cursor, statement, parameters, context) 

2209 except BaseException as e: 

2210 self._handle_dbapi_exception( 

2211 e, statement, parameters, cursor, context 

2212 ) 

2213 

2214 if self._has_events or self.engine._has_events: 

2215 self.dispatch.after_cursor_execute( 

2216 self, cursor, statement, parameters, context, False 

2217 ) 

2218 

2219 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2220 """Close the given cursor, catching exceptions 

2221 and turning into log warnings. 

2222 

2223 """ 

2224 try: 

2225 cursor.close() 

2226 except Exception: 

2227 # log the error through the connection pool's logger. 

2228 self.engine.pool.logger.error( 

2229 "Error closing cursor", exc_info=True 

2230 ) 

2231 

2232 _reentrant_error = False 

2233 _is_disconnect = False 

2234 

2235 def _handle_dbapi_exception( 

2236 self, 

2237 e: BaseException, 

2238 statement: Optional[str], 

2239 parameters: Optional[_AnyExecuteParams], 

2240 cursor: Optional[DBAPICursor], 

2241 context: Optional[ExecutionContext], 

2242 is_sub_exec: bool = False, 

2243 ) -> NoReturn: 

2244 exc_info = sys.exc_info() 

2245 

2246 is_exit_exception = util.is_exit_exception(e) 

2247 

2248 if not self._is_disconnect: 

2249 self._is_disconnect = ( 

2250 isinstance(e, self.dialect.loaded_dbapi.Error) 

2251 and not self.closed 

2252 and self.dialect.is_disconnect( 

2253 e, 

2254 self._dbapi_connection if not self.invalidated else None, 

2255 cursor, 

2256 ) 

2257 ) or (is_exit_exception and not self.closed) 

2258 

2259 invalidate_pool_on_disconnect = not is_exit_exception 

2260 

2261 ismulti: bool = ( 

2262 not is_sub_exec and context.executemany 

2263 if context is not None 

2264 else False 

2265 ) 

2266 if self._reentrant_error: 

2267 raise exc.DBAPIError.instance( 

2268 statement, 

2269 parameters, 

2270 e, 

2271 self.dialect.loaded_dbapi.Error, 

2272 hide_parameters=self.engine.hide_parameters, 

2273 dialect=self.dialect, 

2274 ismulti=ismulti, 

2275 ).with_traceback(exc_info[2]) from e 

2276 self._reentrant_error = True 

2277 try: 

2278 # non-DBAPI error - if we already got a context, 

2279 # or there's no string statement, don't wrap it 

2280 should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or ( 

2281 statement is not None 

2282 and context is None 

2283 and not is_exit_exception 

2284 ) 

2285 

2286 if should_wrap: 

2287 sqlalchemy_exception = exc.DBAPIError.instance( 

2288 statement, 

2289 parameters, 

2290 cast(Exception, e), 

2291 self.dialect.loaded_dbapi.Error, 

2292 hide_parameters=self.engine.hide_parameters, 

2293 connection_invalidated=self._is_disconnect, 

2294 dialect=self.dialect, 

2295 ismulti=ismulti, 

2296 ) 

2297 else: 

2298 sqlalchemy_exception = None 

2299 

2300 newraise = None 

2301 

2302 if (self.dialect._has_events) and not self._execution_options.get( 

2303 "skip_user_error_events", False 

2304 ): 

2305 ctx = ExceptionContextImpl( 

2306 e, 

2307 sqlalchemy_exception, 

2308 self.engine, 

2309 self.dialect, 

2310 self, 

2311 cursor, 

2312 statement, 

2313 parameters, 

2314 context, 

2315 self._is_disconnect, 

2316 invalidate_pool_on_disconnect, 

2317 False, 

2318 ) 

2319 

2320 for fn in self.dialect.dispatch.handle_error: 

2321 try: 

2322 # handler returns an exception; 

2323 # call next handler in a chain 

2324 per_fn = fn(ctx) 

2325 if per_fn is not None: 

2326 ctx.chained_exception = newraise = per_fn 

2327 except Exception as _raised: 

2328 # handler raises an exception - stop processing 

2329 newraise = _raised 

2330 break 

2331 

2332 if self._is_disconnect != ctx.is_disconnect: 

2333 self._is_disconnect = ctx.is_disconnect 

2334 if sqlalchemy_exception: 

2335 sqlalchemy_exception.connection_invalidated = ( 

2336 ctx.is_disconnect 

2337 ) 

2338 

2339 # set up potentially user-defined value for 

2340 # invalidate pool. 

2341 invalidate_pool_on_disconnect = ( 

2342 ctx.invalidate_pool_on_disconnect 

2343 ) 

2344 

2345 if should_wrap and context: 

2346 context.handle_dbapi_exception(e) 

2347 

2348 if not self._is_disconnect: 

2349 if cursor: 

2350 self._safe_close_cursor(cursor) 

2351 # "autorollback" was mostly relevant in 1.x series. 

2352 # It's very unlikely to reach here, as the connection 

2353 # does autobegin so when we are here, we are usually 

2354 # in an explicit / semi-explicit transaction. 

2355 # however we have a test which manufactures this 

2356 # scenario in any case using an event handler. 

2357 # test/engine/test_execute.py-> test_actual_autorollback 

2358 if not self.in_transaction(): 

2359 self._rollback_impl() 

2360 

2361 if newraise: 

2362 raise newraise.with_traceback(exc_info[2]) from e 

2363 elif should_wrap: 

2364 assert sqlalchemy_exception is not None 

2365 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2366 else: 

2367 assert exc_info[1] is not None 

2368 raise exc_info[1].with_traceback(exc_info[2]) 

2369 finally: 

2370 del self._reentrant_error 

2371 if self._is_disconnect: 

2372 del self._is_disconnect 

2373 if not self.invalidated: 

2374 dbapi_conn_wrapper = self._dbapi_connection 

2375 assert dbapi_conn_wrapper is not None 

2376 if invalidate_pool_on_disconnect: 

2377 self.engine.pool._invalidate(dbapi_conn_wrapper, e) 

2378 self.invalidate(e) 

2379 

2380 @classmethod 

2381 def _handle_dbapi_exception_noconnection( 

2382 cls, 

2383 e: BaseException, 

2384 dialect: Dialect, 

2385 engine: Optional[Engine] = None, 

2386 is_disconnect: Optional[bool] = None, 

2387 invalidate_pool_on_disconnect: bool = True, 

2388 is_pre_ping: bool = False, 

2389 ) -> NoReturn: 

2390 exc_info = sys.exc_info() 

2391 

2392 if is_disconnect is None: 

2393 is_disconnect = isinstance( 

2394 e, dialect.loaded_dbapi.Error 

2395 ) and dialect.is_disconnect(e, None, None) 

2396 

2397 should_wrap = isinstance(e, dialect.loaded_dbapi.Error) 

2398 

2399 if should_wrap: 

2400 sqlalchemy_exception = exc.DBAPIError.instance( 

2401 None, 

2402 None, 

2403 cast(Exception, e), 

2404 dialect.loaded_dbapi.Error, 

2405 hide_parameters=( 

2406 engine.hide_parameters if engine is not None else False 

2407 ), 

2408 connection_invalidated=is_disconnect, 

2409 dialect=dialect, 

2410 ) 

2411 else: 

2412 sqlalchemy_exception = None 

2413 

2414 newraise = None 

2415 

2416 if dialect._has_events: 

2417 ctx = ExceptionContextImpl( 

2418 e, 

2419 sqlalchemy_exception, 

2420 engine, 

2421 dialect, 

2422 None, 

2423 None, 

2424 None, 

2425 None, 

2426 None, 

2427 is_disconnect, 

2428 invalidate_pool_on_disconnect, 

2429 is_pre_ping, 

2430 ) 

2431 for fn in dialect.dispatch.handle_error: 

2432 try: 

2433 # handler returns an exception; 

2434 # call next handler in a chain 

2435 per_fn = fn(ctx) 

2436 if per_fn is not None: 

2437 ctx.chained_exception = newraise = per_fn 

2438 except Exception as _raised: 

2439 # handler raises an exception - stop processing 

2440 newraise = _raised 

2441 break 

2442 

2443 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: 

2444 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect 

2445 

2446 if newraise: 

2447 raise newraise.with_traceback(exc_info[2]) from e 

2448 elif should_wrap: 

2449 assert sqlalchemy_exception is not None 

2450 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2451 else: 

2452 assert exc_info[1] is not None 

2453 raise exc_info[1].with_traceback(exc_info[2]) 

2454 

2455 def _run_ddl_visitor( 

2456 self, 

2457 visitorcallable: Type[InvokeDDLBase], 

2458 element: SchemaVisitable, 

2459 **kwargs: Any, 

2460 ) -> None: 

2461 """run a DDL visitor. 

2462 

2463 This method is only here so that the MockConnection can change the 

2464 options given to the visitor so that "checkfirst" is skipped. 

2465 

2466 """ 

2467 visitorcallable( 

2468 dialect=self.dialect, connection=self, **kwargs 

2469 ).traverse_single(element) 

2470 

2471 

2472class ExceptionContextImpl(ExceptionContext): 

2473 """Implement the :class:`.ExceptionContext` interface.""" 

2474 

2475 __slots__ = ( 

2476 "connection", 

2477 "engine", 

2478 "dialect", 

2479 "cursor", 

2480 "statement", 

2481 "parameters", 

2482 "original_exception", 

2483 "sqlalchemy_exception", 

2484 "chained_exception", 

2485 "execution_context", 

2486 "is_disconnect", 

2487 "invalidate_pool_on_disconnect", 

2488 "is_pre_ping", 

2489 ) 

2490 

2491 def __init__( 

2492 self, 

2493 exception: BaseException, 

2494 sqlalchemy_exception: Optional[exc.StatementError], 

2495 engine: Optional[Engine], 

2496 dialect: Dialect, 

2497 connection: Optional[Connection], 

2498 cursor: Optional[DBAPICursor], 

2499 statement: Optional[str], 

2500 parameters: Optional[_DBAPIAnyExecuteParams], 

2501 context: Optional[ExecutionContext], 

2502 is_disconnect: bool, 

2503 invalidate_pool_on_disconnect: bool, 

2504 is_pre_ping: bool, 

2505 ): 

2506 self.engine = engine 

2507 self.dialect = dialect 

2508 self.connection = connection 

2509 self.sqlalchemy_exception = sqlalchemy_exception 

2510 self.original_exception = exception 

2511 self.execution_context = context 

2512 self.statement = statement 

2513 self.parameters = parameters 

2514 self.is_disconnect = is_disconnect 

2515 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect 

2516 self.is_pre_ping = is_pre_ping 

2517 

2518 

2519class Transaction(TransactionalContext): 

2520 """Represent a database transaction in progress. 

2521 

2522 The :class:`.Transaction` object is procured by 

2523 calling the :meth:`_engine.Connection.begin` method of 

2524 :class:`_engine.Connection`:: 

2525 

2526 from sqlalchemy import create_engine 

2527 

2528 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

2529 connection = engine.connect() 

2530 trans = connection.begin() 

2531 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2532 trans.commit() 

2533 

2534 The object provides :meth:`.rollback` and :meth:`.commit` 

2535 methods in order to control transaction boundaries. It 

2536 also implements a context manager interface so that 

2537 the Python ``with`` statement can be used with the 

2538 :meth:`_engine.Connection.begin` method:: 

2539 

2540 with connection.begin(): 

2541 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2542 

2543 The Transaction object is **not** threadsafe. 

2544 

2545 .. seealso:: 

2546 

2547 :meth:`_engine.Connection.begin` 

2548 

2549 :meth:`_engine.Connection.begin_twophase` 

2550 

2551 :meth:`_engine.Connection.begin_nested` 

2552 

2553 .. index:: 

2554 single: thread safety; Transaction 

2555 """ # noqa 

2556 

2557 __slots__ = () 

2558 

2559 _is_root: bool = False 

2560 is_active: bool 

2561 connection: Connection 

2562 

2563 def __init__(self, connection: Connection): 

2564 raise NotImplementedError() 

2565 

2566 @property 

2567 def _deactivated_from_connection(self) -> bool: 

2568 """True if this transaction is totally deactivated from the connection 

2569 and therefore can no longer affect its state. 

2570 

2571 """ 

2572 raise NotImplementedError() 

2573 

2574 def _do_close(self) -> None: 

2575 raise NotImplementedError() 

2576 

2577 def _do_rollback(self) -> None: 

2578 raise NotImplementedError() 

2579 

2580 def _do_commit(self) -> None: 

2581 raise NotImplementedError() 

2582 

2583 @property 

2584 def is_valid(self) -> bool: 

2585 return self.is_active and not self.connection.invalidated 

2586 

2587 def close(self) -> None: 

2588 """Close this :class:`.Transaction`. 

2589 

2590 If this transaction is the base transaction in a begin/commit 

2591 nesting, the transaction will rollback(). Otherwise, the 

2592 method returns. 

2593 

2594 This is used to cancel a Transaction without affecting the scope of 

2595 an enclosing transaction. 

2596 

2597 """ 

2598 try: 

2599 self._do_close() 

2600 finally: 

2601 assert not self.is_active 

2602 

2603 def rollback(self) -> None: 

2604 """Roll back this :class:`.Transaction`. 

2605 

2606 The implementation of this may vary based on the type of transaction in 

2607 use: 

2608 

2609 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2610 it corresponds to a ROLLBACK. 

2611 

2612 * For a :class:`.NestedTransaction`, it corresponds to a 

2613 "ROLLBACK TO SAVEPOINT" operation. 

2614 

2615 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2616 phase transactions may be used. 

2617 

2618 

2619 """ 

2620 try: 

2621 self._do_rollback() 

2622 finally: 

2623 assert not self.is_active 

2624 

2625 def commit(self) -> None: 

2626 """Commit this :class:`.Transaction`. 

2627 

2628 The implementation of this may vary based on the type of transaction in 

2629 use: 

2630 

2631 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2632 it corresponds to a COMMIT. 

2633 

2634 * For a :class:`.NestedTransaction`, it corresponds to a 

2635 "RELEASE SAVEPOINT" operation. 

2636 

2637 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2638 phase transactions may be used. 

2639 

2640 """ 

2641 try: 

2642 self._do_commit() 

2643 finally: 

2644 assert not self.is_active 

2645 

2646 def _get_subject(self) -> Connection: 

2647 return self.connection 

2648 

2649 def _transaction_is_active(self) -> bool: 

2650 return self.is_active 

2651 

2652 def _transaction_is_closed(self) -> bool: 

2653 return not self._deactivated_from_connection 

2654 

2655 def _rollback_can_be_called(self) -> bool: 

2656 # for RootTransaction / NestedTransaction, it's safe to call 

2657 # rollback() even if the transaction is deactive and no warnings 

2658 # will be emitted. tested in 

2659 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)? 

2660 return True 

2661 

2662 

2663class RootTransaction(Transaction): 

2664 """Represent the "root" transaction on a :class:`_engine.Connection`. 

2665 

2666 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring 

2667 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction` 

2668 is created by calling upon the :meth:`_engine.Connection.begin` method, and 

2669 remains associated with the :class:`_engine.Connection` throughout its 

2670 active span. The current :class:`_engine.RootTransaction` in use is 

2671 accessible via the :attr:`_engine.Connection.get_transaction` method of 

2672 :class:`_engine.Connection`. 

2673 

2674 In :term:`2.0 style` use, the :class:`_engine.Connection` also employs 

2675 "autobegin" behavior that will create a new 

2676 :class:`_engine.RootTransaction` whenever a connection in a 

2677 non-transactional state is used to emit commands on the DBAPI connection. 

2678 The scope of the :class:`_engine.RootTransaction` in 2.0 style 

2679 use can be controlled using the :meth:`_engine.Connection.commit` and 

2680 :meth:`_engine.Connection.rollback` methods. 

2681 

2682 

2683 """ 

2684 

2685 _is_root = True 

2686 

2687 __slots__ = ("connection", "is_active") 

2688 

2689 def __init__(self, connection: Connection): 

2690 assert connection._transaction is None 

2691 if connection._trans_context_manager: 

2692 TransactionalContext._trans_ctx_check(connection) 

2693 self.connection = connection 

2694 self._connection_begin_impl() 

2695 connection._transaction = self 

2696 

2697 self.is_active = True 

2698 

2699 def _deactivate_from_connection(self) -> None: 

2700 if self.is_active: 

2701 assert self.connection._transaction is self 

2702 self.is_active = False 

2703 

2704 elif self.connection._transaction is not self: 

2705 util.warn("transaction already deassociated from connection") 

2706 

2707 @property 

2708 def _deactivated_from_connection(self) -> bool: 

2709 return self.connection._transaction is not self 

2710 

2711 def _connection_begin_impl(self) -> None: 

2712 self.connection._begin_impl(self) 

2713 

2714 def _connection_rollback_impl(self) -> None: 

2715 self.connection._rollback_impl() 

2716 

2717 def _connection_commit_impl(self) -> None: 

2718 self.connection._commit_impl() 

2719 

2720 def _close_impl(self, try_deactivate: bool = False) -> None: 

2721 try: 

2722 if self.is_active: 

2723 self._connection_rollback_impl() 

2724 

2725 if self.connection._nested_transaction: 

2726 self.connection._nested_transaction._cancel() 

2727 finally: 

2728 if self.is_active or try_deactivate: 

2729 self._deactivate_from_connection() 

2730 if self.connection._transaction is self: 

2731 self.connection._transaction = None 

2732 

2733 assert not self.is_active 

2734 assert self.connection._transaction is not self 

2735 

2736 def _do_close(self) -> None: 

2737 self._close_impl() 

2738 

2739 def _do_rollback(self) -> None: 

2740 self._close_impl(try_deactivate=True) 

2741 

2742 def _do_commit(self) -> None: 

2743 if self.is_active: 

2744 assert self.connection._transaction is self 

2745 

2746 try: 

2747 self._connection_commit_impl() 

2748 finally: 

2749 # whether or not commit succeeds, cancel any 

2750 # nested transactions, make this transaction "inactive" 

2751 # and remove it as a reset agent 

2752 if self.connection._nested_transaction: 

2753 self.connection._nested_transaction._cancel() 

2754 

2755 self._deactivate_from_connection() 

2756 

2757 # ...however only remove as the connection's current transaction 

2758 # if commit succeeded. otherwise it stays on so that a rollback 

2759 # needs to occur. 

2760 self.connection._transaction = None 

2761 else: 

2762 if self.connection._transaction is self: 

2763 self.connection._invalid_transaction() 

2764 else: 

2765 raise exc.InvalidRequestError("This transaction is inactive") 

2766 

2767 assert not self.is_active 

2768 assert self.connection._transaction is not self 

2769 

2770 

2771class NestedTransaction(Transaction): 

2772 """Represent a 'nested', or SAVEPOINT transaction. 

2773 

2774 The :class:`.NestedTransaction` object is created by calling the 

2775 :meth:`_engine.Connection.begin_nested` method of 

2776 :class:`_engine.Connection`. 

2777 

2778 When using :class:`.NestedTransaction`, the semantics of "begin" / 

2779 "commit" / "rollback" are as follows: 

2780 

2781 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where 

2782 the savepoint is given an explicit name that is part of the state 

2783 of this object. 

2784 

2785 * The :meth:`.NestedTransaction.commit` method corresponds to a 

2786 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated 

2787 with this :class:`.NestedTransaction`. 

2788 

2789 * The :meth:`.NestedTransaction.rollback` method corresponds to a 

2790 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier 

2791 associated with this :class:`.NestedTransaction`. 

2792 

2793 The rationale for mimicking the semantics of an outer transaction in 

2794 terms of savepoints so that code may deal with a "savepoint" transaction 

2795 and an "outer" transaction in an agnostic way. 

2796 

2797 .. seealso:: 

2798 

2799 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API. 

2800 

2801 """ 

2802 

2803 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") 

2804 

2805 _savepoint: str 

2806 

2807 def __init__(self, connection: Connection): 

2808 assert connection._transaction is not None 

2809 if connection._trans_context_manager: 

2810 TransactionalContext._trans_ctx_check(connection) 

2811 self.connection = connection 

2812 self._savepoint = self.connection._savepoint_impl() 

2813 self.is_active = True 

2814 self._previous_nested = connection._nested_transaction 

2815 connection._nested_transaction = self 

2816 

2817 def _deactivate_from_connection(self, warn: bool = True) -> None: 

2818 if self.connection._nested_transaction is self: 

2819 self.connection._nested_transaction = self._previous_nested 

2820 elif warn: 

2821 util.warn( 

2822 "nested transaction already deassociated from connection" 

2823 ) 

2824 

2825 @property 

2826 def _deactivated_from_connection(self) -> bool: 

2827 return self.connection._nested_transaction is not self 

2828 

2829 def _cancel(self) -> None: 

2830 # called by RootTransaction when the outer transaction is 

2831 # committed, rolled back, or closed to cancel all savepoints 

2832 # without any action being taken 

2833 self.is_active = False 

2834 self._deactivate_from_connection() 

2835 if self._previous_nested: 

2836 self._previous_nested._cancel() 

2837 

2838 def _close_impl( 

2839 self, deactivate_from_connection: bool, warn_already_deactive: bool 

2840 ) -> None: 

2841 try: 

2842 if ( 

2843 self.is_active 

2844 and self.connection._transaction 

2845 and self.connection._transaction.is_active 

2846 ): 

2847 self.connection._rollback_to_savepoint_impl(self._savepoint) 

2848 finally: 

2849 self.is_active = False 

2850 

2851 if deactivate_from_connection: 

2852 self._deactivate_from_connection(warn=warn_already_deactive) 

2853 

2854 assert not self.is_active 

2855 if deactivate_from_connection: 

2856 assert self.connection._nested_transaction is not self 

2857 

2858 def _do_close(self) -> None: 

2859 self._close_impl(True, False) 

2860 

2861 def _do_rollback(self) -> None: 

2862 self._close_impl(True, True) 

2863 

2864 def _do_commit(self) -> None: 

2865 if self.is_active: 

2866 try: 

2867 self.connection._release_savepoint_impl(self._savepoint) 

2868 finally: 

2869 # nested trans becomes inactive on failed release 

2870 # unconditionally. this prevents it from trying to 

2871 # emit SQL when it rolls back. 

2872 self.is_active = False 

2873 

2874 # but only de-associate from connection if it succeeded 

2875 self._deactivate_from_connection() 

2876 else: 

2877 if self.connection._nested_transaction is self: 

2878 self.connection._invalid_transaction() 

2879 else: 

2880 raise exc.InvalidRequestError( 

2881 "This nested transaction is inactive" 

2882 ) 

2883 

2884 

2885class TwoPhaseTransaction(RootTransaction): 

2886 """Represent a two-phase transaction. 

2887 

2888 A new :class:`.TwoPhaseTransaction` object may be procured 

2889 using the :meth:`_engine.Connection.begin_twophase` method. 

2890 

2891 The interface is the same as that of :class:`.Transaction` 

2892 with the addition of the :meth:`prepare` method. 

2893 

2894 """ 

2895 

2896 __slots__ = ("xid", "_is_prepared") 

2897 

2898 xid: Any 

2899 

2900 def __init__(self, connection: Connection, xid: Any): 

2901 self._is_prepared = False 

2902 self.xid = xid 

2903 super().__init__(connection) 

2904 

2905 def prepare(self) -> None: 

2906 """Prepare this :class:`.TwoPhaseTransaction`. 

2907 

2908 After a PREPARE, the transaction can be committed. 

2909 

2910 """ 

2911 if not self.is_active: 

2912 raise exc.InvalidRequestError("This transaction is inactive") 

2913 self.connection._prepare_twophase_impl(self.xid) 

2914 self._is_prepared = True 

2915 

2916 def _connection_begin_impl(self) -> None: 

2917 self.connection._begin_twophase_impl(self) 

2918 

2919 def _connection_rollback_impl(self) -> None: 

2920 self.connection._rollback_twophase_impl(self.xid, self._is_prepared) 

2921 

2922 def _connection_commit_impl(self) -> None: 

2923 self.connection._commit_twophase_impl(self.xid, self._is_prepared) 

2924 

2925 

2926class Engine( 

2927 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"] 

2928): 

2929 """ 

2930 Connects a :class:`~sqlalchemy.pool.Pool` and 

2931 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a 

2932 source of database connectivity and behavior. 

2933 

2934 An :class:`_engine.Engine` object is instantiated publicly using the 

2935 :func:`~sqlalchemy.create_engine` function. 

2936 

2937 .. seealso:: 

2938 

2939 :doc:`/core/engines` 

2940 

2941 :ref:`connections_toplevel` 

2942 

2943 """ 

2944 

2945 dispatch: dispatcher[ConnectionEventsTarget] 

2946 

2947 _compiled_cache: Optional[CompiledCacheType] 

2948 

2949 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS 

2950 _has_events: bool = False 

2951 _connection_cls: Type[Connection] = Connection 

2952 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine" 

2953 _is_future: bool = False 

2954 

2955 _schema_translate_map: Optional[SchemaTranslateMapType] = None 

2956 _option_cls: Type[OptionEngine] 

2957 

2958 dialect: Dialect 

2959 pool: Pool 

2960 url: URL 

2961 hide_parameters: bool 

2962 

2963 def __init__( 

2964 self, 

2965 pool: Pool, 

2966 dialect: Dialect, 

2967 url: URL, 

2968 logging_name: Optional[str] = None, 

2969 echo: Optional[_EchoFlagType] = None, 

2970 query_cache_size: int = 500, 

2971 execution_options: Optional[Mapping[str, Any]] = None, 

2972 hide_parameters: bool = False, 

2973 ): 

2974 self.pool = pool 

2975 self.url = url 

2976 self.dialect = dialect 

2977 if logging_name: 

2978 self.logging_name = logging_name 

2979 self.echo = echo 

2980 self.hide_parameters = hide_parameters 

2981 if query_cache_size != 0: 

2982 self._compiled_cache = util.LRUCache( 

2983 query_cache_size, size_alert=self._lru_size_alert 

2984 ) 

2985 else: 

2986 self._compiled_cache = None 

2987 log.instance_logger(self, echoflag=echo) 

2988 if execution_options: 

2989 self.update_execution_options(**execution_options) 

2990 

2991 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None: 

2992 if self._should_log_info(): 

2993 self.logger.info( 

2994 "Compiled cache size pruning from %d items to %d. " 

2995 "Increase cache size to reduce the frequency of pruning.", 

2996 len(cache), 

2997 cache.capacity, 

2998 ) 

2999 

3000 @property 

3001 def engine(self) -> Engine: 

3002 """Returns this :class:`.Engine`. 

3003 

3004 Used for legacy schemes that accept :class:`.Connection` / 

3005 :class:`.Engine` objects within the same variable. 

3006 

3007 """ 

3008 return self 

3009 

3010 def clear_compiled_cache(self) -> None: 

3011 """Clear the compiled cache associated with the dialect. 

3012 

3013 This applies **only** to the built-in cache that is established 

3014 via the :paramref:`_engine.create_engine.query_cache_size` parameter. 

3015 It will not impact any dictionary caches that were passed via the 

3016 :paramref:`.Connection.execution_options.compiled_cache` parameter. 

3017 

3018 .. versionadded:: 1.4 

3019 

3020 """ 

3021 if self._compiled_cache: 

3022 self._compiled_cache.clear() 

3023 

3024 def update_execution_options(self, **opt: Any) -> None: 

3025 r"""Update the default execution_options dictionary 

3026 of this :class:`_engine.Engine`. 

3027 

3028 The given keys/values in \**opt are added to the 

3029 default execution options that will be used for 

3030 all connections. The initial contents of this dictionary 

3031 can be sent via the ``execution_options`` parameter 

3032 to :func:`_sa.create_engine`. 

3033 

3034 .. seealso:: 

3035 

3036 :meth:`_engine.Connection.execution_options` 

3037 

3038 :meth:`_engine.Engine.execution_options` 

3039 

3040 """ 

3041 self.dispatch.set_engine_execution_options(self, opt) 

3042 self._execution_options = self._execution_options.union(opt) 

3043 self.dialect.set_engine_execution_options(self, opt) 

3044 

3045 @overload 

3046 def execution_options( 

3047 self, 

3048 *, 

3049 compiled_cache: Optional[CompiledCacheType] = ..., 

3050 logging_token: str = ..., 

3051 isolation_level: IsolationLevel = ..., 

3052 insertmanyvalues_page_size: int = ..., 

3053 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

3054 **opt: Any, 

3055 ) -> OptionEngine: ... 

3056 

3057 @overload 

3058 def execution_options(self, **opt: Any) -> OptionEngine: ... 

3059 

3060 def execution_options(self, **opt: Any) -> OptionEngine: 

3061 """Return a new :class:`_engine.Engine` that will provide 

3062 :class:`_engine.Connection` objects with the given execution options. 

3063 

3064 The returned :class:`_engine.Engine` remains related to the original 

3065 :class:`_engine.Engine` in that it shares the same connection pool and 

3066 other state: 

3067 

3068 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` 

3069 is the 

3070 same instance. The :meth:`_engine.Engine.dispose` 

3071 method will replace 

3072 the connection pool instance for the parent engine as well 

3073 as this one. 

3074 * Event listeners are "cascaded" - meaning, the new 

3075 :class:`_engine.Engine` 

3076 inherits the events of the parent, and new events can be associated 

3077 with the new :class:`_engine.Engine` individually. 

3078 * The logging configuration and logging_name is copied from the parent 

3079 :class:`_engine.Engine`. 

3080 

3081 The intent of the :meth:`_engine.Engine.execution_options` method is 

3082 to implement schemes where multiple :class:`_engine.Engine` 

3083 objects refer to the same connection pool, but are differentiated 

3084 by options that affect some execution-level behavior for each 

3085 engine. One such example is breaking into separate "reader" and 

3086 "writer" :class:`_engine.Engine` instances, where one 

3087 :class:`_engine.Engine` 

3088 has a lower :term:`isolation level` setting configured or is even 

3089 transaction-disabled using "autocommit". An example of this 

3090 configuration is at :ref:`dbapi_autocommit_multiple`. 

3091 

3092 Another example is one that 

3093 uses a custom option ``shard_id`` which is consumed by an event 

3094 to change the current schema on a database connection:: 

3095 

3096 from sqlalchemy import event 

3097 from sqlalchemy.engine import Engine 

3098 

3099 primary_engine = create_engine("mysql+mysqldb://") 

3100 shard1 = primary_engine.execution_options(shard_id="shard1") 

3101 shard2 = primary_engine.execution_options(shard_id="shard2") 

3102 

3103 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"} 

3104 

3105 

3106 @event.listens_for(Engine, "before_cursor_execute") 

3107 def _switch_shard(conn, cursor, stmt, params, context, executemany): 

3108 shard_id = conn.get_execution_options().get("shard_id", "default") 

3109 current_shard = conn.info.get("current_shard", None) 

3110 

3111 if current_shard != shard_id: 

3112 cursor.execute("use %s" % shards[shard_id]) 

3113 conn.info["current_shard"] = shard_id 

3114 

3115 The above recipe illustrates two :class:`_engine.Engine` objects that 

3116 will each serve as factories for :class:`_engine.Connection` objects 

3117 that have pre-established "shard_id" execution options present. A 

3118 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler 

3119 then interprets this execution option to emit a MySQL ``use`` statement 

3120 to switch databases before a statement execution, while at the same 

3121 time keeping track of which database we've established using the 

3122 :attr:`_engine.Connection.info` dictionary. 

3123 

3124 .. seealso:: 

3125 

3126 :meth:`_engine.Connection.execution_options` 

3127 - update execution options 

3128 on a :class:`_engine.Connection` object. 

3129 

3130 :meth:`_engine.Engine.update_execution_options` 

3131 - update the execution 

3132 options for a given :class:`_engine.Engine` in place. 

3133 

3134 :meth:`_engine.Engine.get_execution_options` 

3135 

3136 

3137 """ # noqa: E501 

3138 return self._option_cls(self, opt) 

3139 

3140 def get_execution_options(self) -> _ExecuteOptions: 

3141 """Get the non-SQL options which will take effect during execution. 

3142 

3143 .. versionadded: 1.3 

3144 

3145 .. seealso:: 

3146 

3147 :meth:`_engine.Engine.execution_options` 

3148 """ 

3149 return self._execution_options 

3150 

3151 @property 

3152 def name(self) -> str: 

3153 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3154 in use by this :class:`Engine`. 

3155 

3156 """ 

3157 

3158 return self.dialect.name 

3159 

3160 @property 

3161 def driver(self) -> str: 

3162 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3163 in use by this :class:`Engine`. 

3164 

3165 """ 

3166 

3167 return self.dialect.driver 

3168 

3169 echo = log.echo_property() 

3170 

3171 def __repr__(self) -> str: 

3172 return "Engine(%r)" % (self.url,) 

3173 

3174 def dispose(self, close: bool = True) -> None: 

3175 """Dispose of the connection pool used by this 

3176 :class:`_engine.Engine`. 

3177 

3178 A new connection pool is created immediately after the old one has been 

3179 disposed. The previous connection pool is disposed either actively, by 

3180 closing out all currently checked-in connections in that pool, or 

3181 passively, by losing references to it but otherwise not closing any 

3182 connections. The latter strategy is more appropriate for an initializer 

3183 in a forked Python process. 

3184 

3185 Event listeners associated with the old pool via :class:`.PoolEvents` 

3186 are **transferred to the new pool**; this is to support the pattern 

3187 by which :class:`.PoolEvents` are set up in terms of the owning 

3188 :class:`.Engine` without the need to refer to the :class:`.Pool` 

3189 directly. 

3190 

3191 :param close: if left at its default of ``True``, has the 

3192 effect of fully closing all **currently checked in** 

3193 database connections. Connections that are still checked out 

3194 will **not** be closed, however they will no longer be associated 

3195 with this :class:`_engine.Engine`, 

3196 so when they are closed individually, eventually the 

3197 :class:`_pool.Pool` which they are associated with will 

3198 be garbage collected and they will be closed out fully, if 

3199 not already closed on checkin. 

3200 

3201 If set to ``False``, the previous connection pool is de-referenced, 

3202 and otherwise not touched in any way. 

3203 

3204 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` 

3205 parameter to allow the replacement of a connection pool in a child 

3206 process without interfering with the connections used by the parent 

3207 process. 

3208 

3209 

3210 .. seealso:: 

3211 

3212 :ref:`engine_disposal` 

3213 

3214 :ref:`pooling_multiprocessing` 

3215 

3216 :meth:`.ConnectionEvents.engine_disposed` 

3217 

3218 """ 

3219 if close: 

3220 self.pool.dispose() 

3221 self.pool = self.pool.recreate() 

3222 self.dispatch.engine_disposed(self) 

3223 

3224 @contextlib.contextmanager 

3225 def _optional_conn_ctx_manager( 

3226 self, connection: Optional[Connection] = None 

3227 ) -> Iterator[Connection]: 

3228 if connection is None: 

3229 with self.connect() as conn: 

3230 yield conn 

3231 else: 

3232 yield connection 

3233 

3234 @contextlib.contextmanager 

3235 def begin(self) -> Iterator[Connection]: 

3236 """Return a context manager delivering a :class:`_engine.Connection` 

3237 with a :class:`.Transaction` established. 

3238 

3239 E.g.:: 

3240 

3241 with engine.begin() as conn: 

3242 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)")) 

3243 conn.execute(text("my_special_procedure(5)")) 

3244 

3245 Upon successful operation, the :class:`.Transaction` 

3246 is committed. If an error is raised, the :class:`.Transaction` 

3247 is rolled back. 

3248 

3249 .. seealso:: 

3250 

3251 :meth:`_engine.Engine.connect` - procure a 

3252 :class:`_engine.Connection` from 

3253 an :class:`_engine.Engine`. 

3254 

3255 :meth:`_engine.Connection.begin` - start a :class:`.Transaction` 

3256 for a particular :class:`_engine.Connection`. 

3257 

3258 """ # noqa: E501 

3259 with self.connect() as conn: 

3260 with conn.begin(): 

3261 yield conn 

3262 

3263 def _run_ddl_visitor( 

3264 self, 

3265 visitorcallable: Type[InvokeDDLBase], 

3266 element: SchemaVisitable, 

3267 **kwargs: Any, 

3268 ) -> None: 

3269 with self.begin() as conn: 

3270 conn._run_ddl_visitor(visitorcallable, element, **kwargs) 

3271 

3272 def connect(self) -> Connection: 

3273 """Return a new :class:`_engine.Connection` object. 

3274 

3275 The :class:`_engine.Connection` acts as a Python context manager, so 

3276 the typical use of this method looks like:: 

3277 

3278 with engine.connect() as connection: 

3279 connection.execute(text("insert into table values ('foo')")) 

3280 connection.commit() 

3281 

3282 Where above, after the block is completed, the connection is "closed" 

3283 and its underlying DBAPI resources are returned to the connection pool. 

3284 This also has the effect of rolling back any transaction that 

3285 was explicitly begun or was begun via autobegin, and will 

3286 emit the :meth:`_events.ConnectionEvents.rollback` event if one was 

3287 started and is still in progress. 

3288 

3289 .. seealso:: 

3290 

3291 :meth:`_engine.Engine.begin` 

3292 

3293 """ 

3294 

3295 return self._connection_cls(self) 

3296 

3297 def raw_connection(self) -> PoolProxiedConnection: 

3298 """Return a "raw" DBAPI connection from the connection pool. 

3299 

3300 The returned object is a proxied version of the DBAPI 

3301 connection object used by the underlying driver in use. 

3302 The object will have all the same behavior as the real DBAPI 

3303 connection, except that its ``close()`` method will result in the 

3304 connection being returned to the pool, rather than being closed 

3305 for real. 

3306 

3307 This method provides direct DBAPI connection access for 

3308 special situations when the API provided by 

3309 :class:`_engine.Connection` 

3310 is not needed. When a :class:`_engine.Connection` object is already 

3311 present, the DBAPI connection is available using 

3312 the :attr:`_engine.Connection.connection` accessor. 

3313 

3314 .. seealso:: 

3315 

3316 :ref:`dbapi_connections` 

3317 

3318 """ 

3319 return self.pool.connect() 

3320 

3321 

3322class OptionEngineMixin(log.Identified): 

3323 _sa_propagate_class_events = False 

3324 

3325 dispatch: dispatcher[ConnectionEventsTarget] 

3326 _compiled_cache: Optional[CompiledCacheType] 

3327 dialect: Dialect 

3328 pool: Pool 

3329 url: URL 

3330 hide_parameters: bool 

3331 echo: log.echo_property 

3332 

3333 def __init__( 

3334 self, proxied: Engine, execution_options: CoreExecuteOptionsParameter 

3335 ): 

3336 self._proxied = proxied 

3337 self.url = proxied.url 

3338 self.dialect = proxied.dialect 

3339 self.logging_name = proxied.logging_name 

3340 self.echo = proxied.echo 

3341 self._compiled_cache = proxied._compiled_cache 

3342 self.hide_parameters = proxied.hide_parameters 

3343 log.instance_logger(self, echoflag=self.echo) 

3344 

3345 # note: this will propagate events that are assigned to the parent 

3346 # engine after this OptionEngine is created. Since we share 

3347 # the events of the parent we also disallow class-level events 

3348 # to apply to the OptionEngine class directly. 

3349 # 

3350 # the other way this can work would be to transfer existing 

3351 # events only, using: 

3352 # self.dispatch._update(proxied.dispatch) 

3353 # 

3354 # that might be more appropriate however it would be a behavioral 

3355 # change for logic that assigns events to the parent engine and 

3356 # would like it to take effect for the already-created sub-engine. 

3357 self.dispatch = self.dispatch._join(proxied.dispatch) 

3358 

3359 self._execution_options = proxied._execution_options 

3360 self.update_execution_options(**execution_options) 

3361 

3362 def update_execution_options(self, **opt: Any) -> None: 

3363 raise NotImplementedError() 

3364 

3365 if not typing.TYPE_CHECKING: 

3366 # https://github.com/python/typing/discussions/1095 

3367 

3368 @property 

3369 def pool(self) -> Pool: 

3370 return self._proxied.pool 

3371 

3372 @pool.setter 

3373 def pool(self, pool: Pool) -> None: 

3374 self._proxied.pool = pool 

3375 

3376 @property 

3377 def _has_events(self) -> bool: 

3378 return self._proxied._has_events or self.__dict__.get( 

3379 "_has_events", False 

3380 ) 

3381 

3382 @_has_events.setter 

3383 def _has_events(self, value: bool) -> None: 

3384 self.__dict__["_has_events"] = value 

3385 

3386 

3387class OptionEngine(OptionEngineMixin, Engine): 

3388 def update_execution_options(self, **opt: Any) -> None: 

3389 Engine.update_execution_options(self, **opt) 

3390 

3391 

3392Engine._option_cls = OptionEngine