Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1005 statements  

1# engine/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 

8from __future__ import annotations 

9 

10import contextlib 

11import sys 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import cast 

16from typing import Iterable 

17from typing import Iterator 

18from typing import List 

19from typing import Mapping 

20from typing import NoReturn 

21from typing import Optional 

22from typing import overload 

23from typing import Tuple 

24from typing import Type 

25from typing import TypeVar 

26from typing import Union 

27 

28from .interfaces import BindTyping 

29from .interfaces import ConnectionEventsTarget 

30from .interfaces import DBAPICursor 

31from .interfaces import ExceptionContext 

32from .interfaces import ExecuteStyle 

33from .interfaces import ExecutionContext 

34from .interfaces import IsolationLevel 

35from .util import _distill_params_20 

36from .util import _distill_raw_params 

37from .util import TransactionalContext 

38from .. import exc 

39from .. import inspection 

40from .. import log 

41from .. import util 

42from ..sql import compiler 

43from ..sql import util as sql_util 

44 

45if typing.TYPE_CHECKING: 

46 from . import CursorResult 

47 from . import ScalarResult 

48 from .interfaces import _AnyExecuteParams 

49 from .interfaces import _AnyMultiExecuteParams 

50 from .interfaces import _CoreAnyExecuteParams 

51 from .interfaces import _CoreMultiExecuteParams 

52 from .interfaces import _CoreSingleExecuteParams 

53 from .interfaces import _DBAPIAnyExecuteParams 

54 from .interfaces import _DBAPISingleExecuteParams 

55 from .interfaces import _ExecuteOptions 

56 from .interfaces import CompiledCacheType 

57 from .interfaces import CoreExecuteOptionsParameter 

58 from .interfaces import Dialect 

59 from .interfaces import SchemaTranslateMapType 

60 from .reflection import Inspector # noqa 

61 from .url import URL 

62 from ..event import dispatcher 

63 from ..log import _EchoFlagType 

64 from ..pool import _ConnectionFairy 

65 from ..pool import Pool 

66 from ..pool import PoolProxiedConnection 

67 from ..sql import Executable 

68 from ..sql._typing import _InfoType 

69 from ..sql.compiler import Compiled 

70 from ..sql.ddl import ExecutableDDLElement 

71 from ..sql.ddl import InvokeDDLBase 

72 from ..sql.functions import FunctionElement 

73 from ..sql.schema import DefaultGenerator 

74 from ..sql.schema import HasSchemaAttr 

75 from ..sql.schema import SchemaVisitable 

76 from ..sql.selectable import TypedReturnsRows 

77 

78 

79_T = TypeVar("_T", bound=Any) 

80_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT 

81NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT 

82 

83 

84class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

85 """Provides high-level functionality for a wrapped DB-API connection. 

86 

87 The :class:`_engine.Connection` object is procured by calling the 

88 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

89 object, and provides services for execution of SQL statements as well 

90 as transaction control. 

91 

92 The Connection object is **not** thread-safe. While a Connection can be 

93 shared among threads using properly synchronized access, it is still 

94 possible that the underlying DBAPI connection may not support shared 

95 access between threads. Check the DBAPI documentation for details. 

96 

97 The Connection object represents a single DBAPI connection checked out 

98 from the connection pool. In this state, the connection pool has no 

99 affect upon the connection, including its expiration or timeout state. 

100 For the connection pool to properly manage connections, connections 

101 should be returned to the connection pool (i.e. ``connection.close()``) 

102 whenever the connection is not in use. 

103 

104 .. index:: 

105 single: thread safety; Connection 

106 

107 """ 

108 

109 dialect: Dialect 

110 dispatch: dispatcher[ConnectionEventsTarget] 

111 

112 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

113 

114 # used by sqlalchemy.engine.util.TransactionalContext 

115 _trans_context_manager: Optional[TransactionalContext] = None 

116 

117 # legacy as of 2.0, should be eventually deprecated and 

118 # removed. was used in the "pre_ping" recipe that's been in the docs 

119 # a long time 

120 should_close_with_result = False 

121 

122 _dbapi_connection: Optional[PoolProxiedConnection] 

123 

124 _execution_options: _ExecuteOptions 

125 

126 _transaction: Optional[RootTransaction] 

127 _nested_transaction: Optional[NestedTransaction] 

128 

129 def __init__( 

130 self, 

131 engine: Engine, 

132 connection: Optional[PoolProxiedConnection] = None, 

133 _has_events: Optional[bool] = None, 

134 _allow_revalidate: bool = True, 

135 _allow_autobegin: bool = True, 

136 ): 

137 """Construct a new Connection.""" 

138 self.engine = engine 

139 self.dialect = dialect = engine.dialect 

140 

141 if connection is None: 

142 try: 

143 self._dbapi_connection = engine.raw_connection() 

144 except dialect.loaded_dbapi.Error as err: 

145 Connection._handle_dbapi_exception_noconnection( 

146 err, dialect, engine 

147 ) 

148 raise 

149 else: 

150 self._dbapi_connection = connection 

151 

152 self._transaction = self._nested_transaction = None 

153 self.__savepoint_seq = 0 

154 self.__in_begin = False 

155 

156 self.__can_reconnect = _allow_revalidate 

157 self._allow_autobegin = _allow_autobegin 

158 self._echo = self.engine._should_log_info() 

159 

160 if _has_events is None: 

161 # if _has_events is sent explicitly as False, 

162 # then don't join the dispatch of the engine; we don't 

163 # want to handle any of the engine's events in that case. 

164 self.dispatch = self.dispatch._join(engine.dispatch) 

165 self._has_events = _has_events or ( 

166 _has_events is None and engine._has_events 

167 ) 

168 

169 self._execution_options = engine._execution_options 

170 

171 if self._has_events or self.engine._has_events: 

172 self.dispatch.engine_connect(self) 

173 

174 # this can be assigned differently via 

175 # characteristics.LoggingTokenCharacteristic 

176 _message_formatter: Any = None 

177 

178 def _log_info(self, message: str, *arg: Any, **kw: Any) -> None: 

179 fmt = self._message_formatter 

180 

181 if fmt: 

182 message = fmt(message) 

183 

184 if log.STACKLEVEL: 

185 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

186 

187 self.engine.logger.info(message, *arg, **kw) 

188 

189 def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None: 

190 fmt = self._message_formatter 

191 

192 if fmt: 

193 message = fmt(message) 

194 

195 if log.STACKLEVEL: 

196 kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 

197 

198 self.engine.logger.debug(message, *arg, **kw) 

199 

200 @property 

201 def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: 

202 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

203 self._execution_options.get("schema_translate_map", None) 

204 ) 

205 

206 return schema_translate_map 

207 

208 def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]: 

209 """Return the schema name for the given schema item taking into 

210 account current schema translate map. 

211 

212 """ 

213 

214 name = obj.schema 

215 schema_translate_map: Optional[SchemaTranslateMapType] = ( 

216 self._execution_options.get("schema_translate_map", None) 

217 ) 

218 

219 if ( 

220 schema_translate_map 

221 and name in schema_translate_map 

222 and obj._use_schema_map 

223 ): 

224 return schema_translate_map[name] 

225 else: 

226 return name 

227 

228 def __enter__(self) -> Connection: 

229 return self 

230 

231 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 

232 self.close() 

233 

234 @overload 

235 def execution_options( 

236 self, 

237 *, 

238 compiled_cache: Optional[CompiledCacheType] = ..., 

239 logging_token: str = ..., 

240 isolation_level: IsolationLevel = ..., 

241 no_parameters: bool = False, 

242 stream_results: bool = False, 

243 max_row_buffer: int = ..., 

244 yield_per: int = ..., 

245 insertmanyvalues_page_size: int = ..., 

246 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

247 preserve_rowcount: bool = False, 

248 **opt: Any, 

249 ) -> Connection: ... 

250 

251 @overload 

252 def execution_options(self, **opt: Any) -> Connection: ... 

253 

254 def execution_options(self, **opt: Any) -> Connection: 

255 r"""Set non-SQL options for the connection which take effect 

256 during execution. 

257 

258 This method modifies this :class:`_engine.Connection` **in-place**; 

259 the return value is the same :class:`_engine.Connection` object 

260 upon which the method is called. Note that this is in contrast 

261 to the behavior of the ``execution_options`` methods on other 

262 objects such as :meth:`_engine.Engine.execution_options` and 

263 :meth:`_sql.Executable.execution_options`. The rationale is that many 

264 such execution options necessarily modify the state of the base 

265 DBAPI connection in any case so there is no feasible means of 

266 keeping the effect of such an option localized to a "sub" connection. 

267 

268 .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options` 

269 method, in contrast to other objects with this method, modifies 

270 the connection in-place without creating copy of it. 

271 

272 As discussed elsewhere, the :meth:`_engine.Connection.execution_options` 

273 method accepts any arbitrary parameters including user defined names. 

274 All parameters given are consumable in a number of ways including 

275 by using the :meth:`_engine.Connection.get_execution_options` method. 

276 See the examples at :meth:`_sql.Executable.execution_options` 

277 and :meth:`_engine.Engine.execution_options`. 

278 

279 The keywords that are currently recognized by SQLAlchemy itself 

280 include all those listed under :meth:`.Executable.execution_options`, 

281 as well as others that are specific to :class:`_engine.Connection`. 

282 

283 :param compiled_cache: Available on: :class:`_engine.Connection`, 

284 :class:`_engine.Engine`. 

285 

286 A dictionary where :class:`.Compiled` objects 

287 will be cached when the :class:`_engine.Connection` 

288 compiles a clause 

289 expression into a :class:`.Compiled` object. This dictionary will 

290 supersede the statement cache that may be configured on the 

291 :class:`_engine.Engine` itself. If set to None, caching 

292 is disabled, even if the engine has a configured cache size. 

293 

294 Note that the ORM makes use of its own "compiled" caches for 

295 some operations, including flush operations. The caching 

296 used by the ORM internally supersedes a cache dictionary 

297 specified here. 

298 

299 :param logging_token: Available on: :class:`_engine.Connection`, 

300 :class:`_engine.Engine`, :class:`_sql.Executable`. 

301 

302 Adds the specified string token surrounded by brackets in log 

303 messages logged by the connection, i.e. the logging that's enabled 

304 either via the :paramref:`_sa.create_engine.echo` flag or via the 

305 ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a 

306 per-connection or per-sub-engine token to be available which is 

307 useful for debugging concurrent connection scenarios. 

308 

309 .. versionadded:: 1.4.0b2 

310 

311 .. seealso:: 

312 

313 :ref:`dbengine_logging_tokens` - usage example 

314 

315 :paramref:`_sa.create_engine.logging_name` - adds a name to the 

316 name used by the Python logger object itself. 

317 

318 :param isolation_level: Available on: :class:`_engine.Connection`, 

319 :class:`_engine.Engine`. 

320 

321 Set the transaction isolation level for the lifespan of this 

322 :class:`_engine.Connection` object. 

323 Valid values include those string 

324 values accepted by the :paramref:`_sa.create_engine.isolation_level` 

325 parameter passed to :func:`_sa.create_engine`. These levels are 

326 semi-database specific; see individual dialect documentation for 

327 valid levels. 

328 

329 The isolation level option applies the isolation level by emitting 

330 statements on the DBAPI connection, and **necessarily affects the 

331 original Connection object overall**. The isolation level will remain 

332 at the given setting until explicitly changed, or when the DBAPI 

333 connection itself is :term:`released` to the connection pool, i.e. the 

334 :meth:`_engine.Connection.close` method is called, at which time an 

335 event handler will emit additional statements on the DBAPI connection 

336 in order to revert the isolation level change. 

337 

338 .. note:: The ``isolation_level`` execution option may only be 

339 established before the :meth:`_engine.Connection.begin` method is 

340 called, as well as before any SQL statements are emitted which 

341 would otherwise trigger "autobegin", or directly after a call to 

342 :meth:`_engine.Connection.commit` or 

343 :meth:`_engine.Connection.rollback`. A database cannot change the 

344 isolation level on a transaction in progress. 

345 

346 .. note:: The ``isolation_level`` execution option is implicitly 

347 reset if the :class:`_engine.Connection` is invalidated, e.g. via 

348 the :meth:`_engine.Connection.invalidate` method, or if a 

349 disconnection error occurs. The new connection produced after the 

350 invalidation will **not** have the selected isolation level 

351 re-applied to it automatically. 

352 

353 .. seealso:: 

354 

355 :ref:`dbapi_autocommit` 

356 

357 :meth:`_engine.Connection.get_isolation_level` 

358 - view current actual level 

359 

360 :param no_parameters: Available on: :class:`_engine.Connection`, 

361 :class:`_sql.Executable`. 

362 

363 When ``True``, if the final parameter 

364 list or dictionary is totally empty, will invoke the 

365 statement on the cursor as ``cursor.execute(statement)``, 

366 not passing the parameter collection at all. 

367 Some DBAPIs such as psycopg2 and mysql-python consider 

368 percent signs as significant only when parameters are 

369 present; this option allows code to generate SQL 

370 containing percent signs (and possibly other characters) 

371 that is neutral regarding whether it's executed by the DBAPI 

372 or piped into a script that's later invoked by 

373 command line tools. 

374 

375 :param stream_results: Available on: :class:`_engine.Connection`, 

376 :class:`_sql.Executable`. 

377 

378 Indicate to the dialect that results should be "streamed" and not 

379 pre-buffered, if possible. For backends such as PostgreSQL, MySQL 

380 and MariaDB, this indicates the use of a "server side cursor" as 

381 opposed to a client side cursor. Other backends such as that of 

382 Oracle Database may already use server side cursors by default. 

383 

384 The usage of 

385 :paramref:`_engine.Connection.execution_options.stream_results` is 

386 usually combined with setting a fixed number of rows to to be fetched 

387 in batches, to allow for efficient iteration of database rows while 

388 at the same time not loading all result rows into memory at once; 

389 this can be configured on a :class:`_engine.Result` object using the 

390 :meth:`_engine.Result.yield_per` method, after execution has 

391 returned a new :class:`_engine.Result`. If 

392 :meth:`_engine.Result.yield_per` is not used, 

393 the :paramref:`_engine.Connection.execution_options.stream_results` 

394 mode of operation will instead use a dynamically sized buffer 

395 which buffers sets of rows at a time, growing on each batch 

396 based on a fixed growth size up until a limit which may 

397 be configured using the 

398 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

399 parameter. 

400 

401 When using the ORM to fetch ORM mapped objects from a result, 

402 :meth:`_engine.Result.yield_per` should always be used with 

403 :paramref:`_engine.Connection.execution_options.stream_results`, 

404 so that the ORM does not fetch all rows into new ORM objects at once. 

405 

406 For typical use, the 

407 :paramref:`_engine.Connection.execution_options.yield_per` execution 

408 option should be preferred, which sets up both 

409 :paramref:`_engine.Connection.execution_options.stream_results` and 

410 :meth:`_engine.Result.yield_per` at once. This option is supported 

411 both at a core level by :class:`_engine.Connection` as well as by the 

412 ORM :class:`_engine.Session`; the latter is described at 

413 :ref:`orm_queryguide_yield_per`. 

414 

415 .. seealso:: 

416 

417 :ref:`engine_stream_results` - background on 

418 :paramref:`_engine.Connection.execution_options.stream_results` 

419 

420 :paramref:`_engine.Connection.execution_options.max_row_buffer` 

421 

422 :paramref:`_engine.Connection.execution_options.yield_per` 

423 

424 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

425 describing the ORM version of ``yield_per`` 

426 

427 :param max_row_buffer: Available on: :class:`_engine.Connection`, 

428 :class:`_sql.Executable`. Sets a maximum 

429 buffer size to use when the 

430 :paramref:`_engine.Connection.execution_options.stream_results` 

431 execution option is used on a backend that supports server side 

432 cursors. The default value if not specified is 1000. 

433 

434 .. seealso:: 

435 

436 :paramref:`_engine.Connection.execution_options.stream_results` 

437 

438 :ref:`engine_stream_results` 

439 

440 

441 :param yield_per: Available on: :class:`_engine.Connection`, 

442 :class:`_sql.Executable`. Integer value applied which will 

443 set the :paramref:`_engine.Connection.execution_options.stream_results` 

444 execution option and invoke :meth:`_engine.Result.yield_per` 

445 automatically at once. Allows equivalent functionality as 

446 is present when using this parameter with the ORM. 

447 

448 .. versionadded:: 1.4.40 

449 

450 .. seealso:: 

451 

452 :ref:`engine_stream_results` - background and examples 

453 on using server side cursors with Core. 

454 

455 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 

456 describing the ORM version of ``yield_per`` 

457 

458 :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`, 

459 :class:`_engine.Engine`. Number of rows to format into an 

460 INSERT statement when the statement uses "insertmanyvalues" mode, 

461 which is a paged form of bulk insert that is used for many backends 

462 when using :term:`executemany` execution typically in conjunction 

463 with RETURNING. Defaults to 1000. May also be modified on a 

464 per-engine basis using the 

465 :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. 

466 

467 .. versionadded:: 2.0 

468 

469 .. seealso:: 

470 

471 :ref:`engine_insertmanyvalues` 

472 

473 :param schema_translate_map: Available on: :class:`_engine.Connection`, 

474 :class:`_engine.Engine`, :class:`_sql.Executable`. 

475 

476 A dictionary mapping schema names to schema names, that will be 

477 applied to the :paramref:`_schema.Table.schema` element of each 

478 :class:`_schema.Table` 

479 encountered when SQL or DDL expression elements 

480 are compiled into strings; the resulting schema name will be 

481 converted based on presence in the map of the original name. 

482 

483 .. seealso:: 

484 

485 :ref:`schema_translating` 

486 

487 :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount`` 

488 attribute will be unconditionally memoized within the result and 

489 made available via the :attr:`.CursorResult.rowcount` attribute. 

490 Normally, this attribute is only preserved for UPDATE and DELETE 

491 statements. Using this option, the DBAPIs rowcount value can 

492 be accessed for other kinds of statements such as INSERT and SELECT, 

493 to the degree that the DBAPI supports these statements. See 

494 :attr:`.CursorResult.rowcount` for notes regarding the behavior 

495 of this attribute. 

496 

497 .. versionadded:: 2.0.28 

498 

499 .. seealso:: 

500 

501 :meth:`_engine.Engine.execution_options` 

502 

503 :meth:`.Executable.execution_options` 

504 

505 :meth:`_engine.Connection.get_execution_options` 

506 

507 :ref:`orm_queryguide_execution_options` - documentation on all 

508 ORM-specific execution options 

509 

510 """ # noqa 

511 if self._has_events or self.engine._has_events: 

512 self.dispatch.set_connection_execution_options(self, opt) 

513 self._execution_options = self._execution_options.union(opt) 

514 self.dialect.set_connection_execution_options(self, opt) 

515 return self 

516 

517 def get_execution_options(self) -> _ExecuteOptions: 

518 """Get the non-SQL options which will take effect during execution. 

519 

520 .. versionadded:: 1.3 

521 

522 .. seealso:: 

523 

524 :meth:`_engine.Connection.execution_options` 

525 """ 

526 return self._execution_options 

527 

528 @property 

529 def _still_open_and_dbapi_connection_is_valid(self) -> bool: 

530 pool_proxied_connection = self._dbapi_connection 

531 return ( 

532 pool_proxied_connection is not None 

533 and pool_proxied_connection.is_valid 

534 ) 

535 

536 @property 

537 def closed(self) -> bool: 

538 """Return True if this connection is closed.""" 

539 

540 return self._dbapi_connection is None and not self.__can_reconnect 

541 

542 @property 

543 def invalidated(self) -> bool: 

544 """Return True if this connection was invalidated. 

545 

546 This does not indicate whether or not the connection was 

547 invalidated at the pool level, however 

548 

549 """ 

550 

551 # prior to 1.4, "invalid" was stored as a state independent of 

552 # "closed", meaning an invalidated connection could be "closed", 

553 # the _dbapi_connection would be None and closed=True, yet the 

554 # "invalid" flag would stay True. This meant that there were 

555 # three separate states (open/valid, closed/valid, closed/invalid) 

556 # when there is really no reason for that; a connection that's 

557 # "closed" does not need to be "invalid". So the state is now 

558 # represented by the two facts alone. 

559 

560 pool_proxied_connection = self._dbapi_connection 

561 return pool_proxied_connection is None and self.__can_reconnect 

562 

563 @property 

564 def connection(self) -> PoolProxiedConnection: 

565 """The underlying DB-API connection managed by this Connection. 

566 

567 This is a SQLAlchemy connection-pool proxied connection 

568 which then has the attribute 

569 :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the 

570 actual driver connection. 

571 

572 .. seealso:: 

573 

574 

575 :ref:`dbapi_connections` 

576 

577 """ 

578 

579 if self._dbapi_connection is None: 

580 try: 

581 return self._revalidate_connection() 

582 except (exc.PendingRollbackError, exc.ResourceClosedError): 

583 raise 

584 except BaseException as e: 

585 self._handle_dbapi_exception(e, None, None, None, None) 

586 else: 

587 return self._dbapi_connection 

588 

589 def get_isolation_level(self) -> IsolationLevel: 

590 """Return the current **actual** isolation level that's present on 

591 the database within the scope of this connection. 

592 

593 This attribute will perform a live SQL operation against the database 

594 in order to procure the current isolation level, so the value returned 

595 is the actual level on the underlying DBAPI connection regardless of 

596 how this state was set. This will be one of the four actual isolation 

597 modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

598 ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation 

599 level setting. Third party dialects may also feature additional 

600 isolation level settings. 

601 

602 .. note:: This method **will not report** on the ``AUTOCOMMIT`` 

603 isolation level, which is a separate :term:`dbapi` setting that's 

604 independent of **actual** isolation level. When ``AUTOCOMMIT`` is 

605 in use, the database connection still has a "traditional" isolation 

606 mode in effect, that is typically one of the four values 

607 ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 

608 ``SERIALIZABLE``. 

609 

610 Compare to the :attr:`_engine.Connection.default_isolation_level` 

611 accessor which returns the isolation level that is present on the 

612 database at initial connection time. 

613 

614 .. seealso:: 

615 

616 :attr:`_engine.Connection.default_isolation_level` 

617 - view default level 

618 

619 :paramref:`_sa.create_engine.isolation_level` 

620 - set per :class:`_engine.Engine` isolation level 

621 

622 :paramref:`.Connection.execution_options.isolation_level` 

623 - set per :class:`_engine.Connection` isolation level 

624 

625 """ 

626 dbapi_connection = self.connection.dbapi_connection 

627 assert dbapi_connection is not None 

628 try: 

629 return self.dialect.get_isolation_level(dbapi_connection) 

630 except BaseException as e: 

631 self._handle_dbapi_exception(e, None, None, None, None) 

632 

633 @property 

634 def default_isolation_level(self) -> Optional[IsolationLevel]: 

635 """The initial-connection time isolation level associated with the 

636 :class:`_engine.Dialect` in use. 

637 

638 This value is independent of the 

639 :paramref:`.Connection.execution_options.isolation_level` and 

640 :paramref:`.Engine.execution_options.isolation_level` execution 

641 options, and is determined by the :class:`_engine.Dialect` when the 

642 first connection is created, by performing a SQL query against the 

643 database for the current isolation level before any additional commands 

644 have been emitted. 

645 

646 Calling this accessor does not invoke any new SQL queries. 

647 

648 .. seealso:: 

649 

650 :meth:`_engine.Connection.get_isolation_level` 

651 - view current actual isolation level 

652 

653 :paramref:`_sa.create_engine.isolation_level` 

654 - set per :class:`_engine.Engine` isolation level 

655 

656 :paramref:`.Connection.execution_options.isolation_level` 

657 - set per :class:`_engine.Connection` isolation level 

658 

659 """ 

660 return self.dialect.default_isolation_level 

661 

662 def _invalid_transaction(self) -> NoReturn: 

663 raise exc.PendingRollbackError( 

664 "Can't reconnect until invalid %stransaction is rolled " 

665 "back. Please rollback() fully before proceeding" 

666 % ("savepoint " if self._nested_transaction is not None else ""), 

667 code="8s2b", 

668 ) 

669 

670 def _revalidate_connection(self) -> PoolProxiedConnection: 

671 if self.__can_reconnect and self.invalidated: 

672 if self._transaction is not None: 

673 self._invalid_transaction() 

674 self._dbapi_connection = self.engine.raw_connection() 

675 return self._dbapi_connection 

676 raise exc.ResourceClosedError("This Connection is closed") 

677 

678 @property 

679 def info(self) -> _InfoType: 

680 """Info dictionary associated with the underlying DBAPI connection 

681 referred to by this :class:`_engine.Connection`, allowing user-defined 

682 data to be associated with the connection. 

683 

684 The data here will follow along with the DBAPI connection including 

685 after it is returned to the connection pool and used again 

686 in subsequent instances of :class:`_engine.Connection`. 

687 

688 """ 

689 

690 return self.connection.info 

691 

692 def invalidate(self, exception: Optional[BaseException] = None) -> None: 

693 """Invalidate the underlying DBAPI connection associated with 

694 this :class:`_engine.Connection`. 

695 

696 An attempt will be made to close the underlying DBAPI connection 

697 immediately; however if this operation fails, the error is logged 

698 but not raised. The connection is then discarded whether or not 

699 close() succeeded. 

700 

701 Upon the next use (where "use" typically means using the 

702 :meth:`_engine.Connection.execute` method or similar), 

703 this :class:`_engine.Connection` will attempt to 

704 procure a new DBAPI connection using the services of the 

705 :class:`_pool.Pool` as a source of connectivity (e.g. 

706 a "reconnection"). 

707 

708 If a transaction was in progress (e.g. the 

709 :meth:`_engine.Connection.begin` method has been called) when 

710 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI 

711 level all state associated with this transaction is lost, as 

712 the DBAPI connection is closed. The :class:`_engine.Connection` 

713 will not allow a reconnection to proceed until the 

714 :class:`.Transaction` object is ended, by calling the 

715 :meth:`.Transaction.rollback` method; until that point, any attempt at 

716 continuing to use the :class:`_engine.Connection` will raise an 

717 :class:`~sqlalchemy.exc.InvalidRequestError`. 

718 This is to prevent applications from accidentally 

719 continuing an ongoing transactional operations despite the 

720 fact that the transaction has been lost due to an 

721 invalidation. 

722 

723 The :meth:`_engine.Connection.invalidate` method, 

724 just like auto-invalidation, 

725 will at the connection pool level invoke the 

726 :meth:`_events.PoolEvents.invalidate` event. 

727 

728 :param exception: an optional ``Exception`` instance that's the 

729 reason for the invalidation. is passed along to event handlers 

730 and logging functions. 

731 

732 .. seealso:: 

733 

734 :ref:`pool_connection_invalidation` 

735 

736 """ 

737 

738 if self.invalidated: 

739 return 

740 

741 if self.closed: 

742 raise exc.ResourceClosedError("This Connection is closed") 

743 

744 if self._still_open_and_dbapi_connection_is_valid: 

745 pool_proxied_connection = self._dbapi_connection 

746 assert pool_proxied_connection is not None 

747 pool_proxied_connection.invalidate(exception) 

748 

749 self._dbapi_connection = None 

750 

751 def detach(self) -> None: 

752 """Detach the underlying DB-API connection from its connection pool. 

753 

754 E.g.:: 

755 

756 with engine.connect() as conn: 

757 conn.detach() 

758 conn.execute(text("SET search_path TO schema1, schema2")) 

759 

760 # work with connection 

761 

762 # connection is fully closed (since we used "with:", can 

763 # also call .close()) 

764 

765 This :class:`_engine.Connection` instance will remain usable. 

766 When closed 

767 (or exited from a context manager context as above), 

768 the DB-API connection will be literally closed and not 

769 returned to its originating pool. 

770 

771 This method can be used to insulate the rest of an application 

772 from a modified state on a connection (such as a transaction 

773 isolation level or similar). 

774 

775 """ 

776 

777 if self.closed: 

778 raise exc.ResourceClosedError("This Connection is closed") 

779 

780 pool_proxied_connection = self._dbapi_connection 

781 if pool_proxied_connection is None: 

782 raise exc.InvalidRequestError( 

783 "Can't detach an invalidated Connection" 

784 ) 

785 pool_proxied_connection.detach() 

786 

787 def _autobegin(self) -> None: 

788 if self._allow_autobegin and not self.__in_begin: 

789 self.begin() 

790 

791 def begin(self) -> RootTransaction: 

792 """Begin a transaction prior to autobegin occurring. 

793 

794 E.g.:: 

795 

796 with engine.connect() as conn: 

797 with conn.begin() as trans: 

798 conn.execute(table.insert(), {"username": "sandy"}) 

799 

800 The returned object is an instance of :class:`_engine.RootTransaction`. 

801 This object represents the "scope" of the transaction, 

802 which completes when either the :meth:`_engine.Transaction.rollback` 

803 or :meth:`_engine.Transaction.commit` method is called; the object 

804 also works as a context manager as illustrated above. 

805 

806 The :meth:`_engine.Connection.begin` method begins a 

807 transaction that normally will be begun in any case when the connection 

808 is first used to execute a statement. The reason this method might be 

809 used would be to invoke the :meth:`_events.ConnectionEvents.begin` 

810 event at a specific time, or to organize code within the scope of a 

811 connection checkout in terms of context managed blocks, such as:: 

812 

813 with engine.connect() as conn: 

814 with conn.begin(): 

815 conn.execute(...) 

816 conn.execute(...) 

817 

818 with conn.begin(): 

819 conn.execute(...) 

820 conn.execute(...) 

821 

822 The above code is not fundamentally any different in its behavior than 

823 the following code which does not use 

824 :meth:`_engine.Connection.begin`; the below style is known 

825 as "commit as you go" style:: 

826 

827 with engine.connect() as conn: 

828 conn.execute(...) 

829 conn.execute(...) 

830 conn.commit() 

831 

832 conn.execute(...) 

833 conn.execute(...) 

834 conn.commit() 

835 

836 From a database point of view, the :meth:`_engine.Connection.begin` 

837 method does not emit any SQL or change the state of the underlying 

838 DBAPI connection in any way; the Python DBAPI does not have any 

839 concept of explicit transaction begin. 

840 

841 .. seealso:: 

842 

843 :ref:`tutorial_working_with_transactions` - in the 

844 :ref:`unified_tutorial` 

845 

846 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT 

847 

848 :meth:`_engine.Connection.begin_twophase` - 

849 use a two phase /XID transaction 

850 

851 :meth:`_engine.Engine.begin` - context manager available from 

852 :class:`_engine.Engine` 

853 

854 """ 

855 if self._transaction is None: 

856 self._transaction = RootTransaction(self) 

857 return self._transaction 

858 else: 

859 raise exc.InvalidRequestError( 

860 "This connection has already initialized a SQLAlchemy " 

861 "Transaction() object via begin() or autobegin; can't " 

862 "call begin() here unless rollback() or commit() " 

863 "is called first." 

864 ) 

865 

866 def begin_nested(self) -> NestedTransaction: 

867 """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction 

868 handle that controls the scope of the SAVEPOINT. 

869 

870 E.g.:: 

871 

872 with engine.begin() as connection: 

873 with connection.begin_nested(): 

874 connection.execute(table.insert(), {"username": "sandy"}) 

875 

876 The returned object is an instance of 

877 :class:`_engine.NestedTransaction`, which includes transactional 

878 methods :meth:`_engine.NestedTransaction.commit` and 

879 :meth:`_engine.NestedTransaction.rollback`; for a nested transaction, 

880 these methods correspond to the operations "RELEASE SAVEPOINT <name>" 

881 and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local 

882 to the :class:`_engine.NestedTransaction` object and is generated 

883 automatically. Like any other :class:`_engine.Transaction`, the 

884 :class:`_engine.NestedTransaction` may be used as a context manager as 

885 illustrated above which will "release" or "rollback" corresponding to 

886 if the operation within the block were successful or raised an 

887 exception. 

888 

889 Nested transactions require SAVEPOINT support in the underlying 

890 database, else the behavior is undefined. SAVEPOINT is commonly used to 

891 run operations within a transaction that may fail, while continuing the 

892 outer transaction. E.g.:: 

893 

894 from sqlalchemy import exc 

895 

896 with engine.begin() as connection: 

897 trans = connection.begin_nested() 

898 try: 

899 connection.execute(table.insert(), {"username": "sandy"}) 

900 trans.commit() 

901 except exc.IntegrityError: # catch for duplicate username 

902 trans.rollback() # rollback to savepoint 

903 

904 # outer transaction continues 

905 connection.execute(...) 

906 

907 If :meth:`_engine.Connection.begin_nested` is called without first 

908 calling :meth:`_engine.Connection.begin` or 

909 :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object 

910 will "autobegin" the outer transaction first. This outer transaction 

911 may be committed using "commit-as-you-go" style, e.g.:: 

912 

913 with engine.connect() as connection: # begin() wasn't called 

914 

915 with connection.begin_nested(): # will auto-"begin()" first 

916 connection.execute(...) 

917 # savepoint is released 

918 

919 connection.execute(...) 

920 

921 # explicitly commit outer transaction 

922 connection.commit() 

923 

924 # can continue working with connection here 

925 

926 .. versionchanged:: 2.0 

927 

928 :meth:`_engine.Connection.begin_nested` will now participate 

929 in the connection "autobegin" behavior that is new as of 

930 2.0 / "future" style connections in 1.4. 

931 

932 .. seealso:: 

933 

934 :meth:`_engine.Connection.begin` 

935 

936 :ref:`session_begin_nested` - ORM support for SAVEPOINT 

937 

938 """ 

939 if self._transaction is None: 

940 self._autobegin() 

941 

942 return NestedTransaction(self) 

943 

944 def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction: 

945 """Begin a two-phase or XA transaction and return a transaction 

946 handle. 

947 

948 The returned object is an instance of :class:`.TwoPhaseTransaction`, 

949 which in addition to the methods provided by 

950 :class:`.Transaction`, also provides a 

951 :meth:`~.TwoPhaseTransaction.prepare` method. 

952 

953 :param xid: the two phase transaction id. If not supplied, a 

954 random id will be generated. 

955 

956 .. seealso:: 

957 

958 :meth:`_engine.Connection.begin` 

959 

960 :meth:`_engine.Connection.begin_twophase` 

961 

962 """ 

963 

964 if self._transaction is not None: 

965 raise exc.InvalidRequestError( 

966 "Cannot start a two phase transaction when a transaction " 

967 "is already in progress." 

968 ) 

969 if xid is None: 

970 xid = self.engine.dialect.create_xid() 

971 return TwoPhaseTransaction(self, xid) 

972 

973 def commit(self) -> None: 

974 """Commit the transaction that is currently in progress. 

975 

976 This method commits the current transaction if one has been started. 

977 If no transaction was started, the method has no effect, assuming 

978 the connection is in a non-invalidated state. 

979 

980 A transaction is begun on a :class:`_engine.Connection` automatically 

981 whenever a statement is first executed, or when the 

982 :meth:`_engine.Connection.begin` method is called. 

983 

984 .. note:: The :meth:`_engine.Connection.commit` method only acts upon 

985 the primary database transaction that is linked to the 

986 :class:`_engine.Connection` object. It does not operate upon a 

987 SAVEPOINT that would have been invoked from the 

988 :meth:`_engine.Connection.begin_nested` method; for control of a 

989 SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the 

990 :class:`_engine.NestedTransaction` that is returned by the 

991 :meth:`_engine.Connection.begin_nested` method itself. 

992 

993 

994 """ 

995 if self._transaction: 

996 self._transaction.commit() 

997 

998 def rollback(self) -> None: 

999 """Roll back the transaction that is currently in progress. 

1000 

1001 This method rolls back the current transaction if one has been started. 

1002 If no transaction was started, the method has no effect. If a 

1003 transaction was started and the connection is in an invalidated state, 

1004 the transaction is cleared using this method. 

1005 

1006 A transaction is begun on a :class:`_engine.Connection` automatically 

1007 whenever a statement is first executed, or when the 

1008 :meth:`_engine.Connection.begin` method is called. 

1009 

1010 .. note:: The :meth:`_engine.Connection.rollback` method only acts 

1011 upon the primary database transaction that is linked to the 

1012 :class:`_engine.Connection` object. It does not operate upon a 

1013 SAVEPOINT that would have been invoked from the 

1014 :meth:`_engine.Connection.begin_nested` method; for control of a 

1015 SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the 

1016 :class:`_engine.NestedTransaction` that is returned by the 

1017 :meth:`_engine.Connection.begin_nested` method itself. 

1018 

1019 

1020 """ 

1021 if self._transaction: 

1022 self._transaction.rollback() 

1023 

1024 def recover_twophase(self) -> List[Any]: 

1025 return self.engine.dialect.do_recover_twophase(self) 

1026 

1027 def rollback_prepared(self, xid: Any, recover: bool = False) -> None: 

1028 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) 

1029 

1030 def commit_prepared(self, xid: Any, recover: bool = False) -> None: 

1031 self.engine.dialect.do_commit_twophase(self, xid, recover=recover) 

1032 

1033 def in_transaction(self) -> bool: 

1034 """Return True if a transaction is in progress.""" 

1035 return self._transaction is not None and self._transaction.is_active 

1036 

1037 def in_nested_transaction(self) -> bool: 

1038 """Return True if a transaction is in progress.""" 

1039 return ( 

1040 self._nested_transaction is not None 

1041 and self._nested_transaction.is_active 

1042 ) 

1043 

1044 def _is_autocommit_isolation(self) -> bool: 

1045 opt_iso = self._execution_options.get("isolation_level", None) 

1046 return bool( 

1047 opt_iso == "AUTOCOMMIT" 

1048 or ( 

1049 opt_iso is None 

1050 and self.engine.dialect._on_connect_isolation_level 

1051 == "AUTOCOMMIT" 

1052 ) 

1053 ) 

1054 

1055 def _get_required_transaction(self) -> RootTransaction: 

1056 trans = self._transaction 

1057 if trans is None: 

1058 raise exc.InvalidRequestError("connection is not in a transaction") 

1059 return trans 

1060 

1061 def _get_required_nested_transaction(self) -> NestedTransaction: 

1062 trans = self._nested_transaction 

1063 if trans is None: 

1064 raise exc.InvalidRequestError( 

1065 "connection is not in a nested transaction" 

1066 ) 

1067 return trans 

1068 

1069 def get_transaction(self) -> Optional[RootTransaction]: 

1070 """Return the current root transaction in progress, if any. 

1071 

1072 .. versionadded:: 1.4 

1073 

1074 """ 

1075 

1076 return self._transaction 

1077 

1078 def get_nested_transaction(self) -> Optional[NestedTransaction]: 

1079 """Return the current nested transaction in progress, if any. 

1080 

1081 .. versionadded:: 1.4 

1082 

1083 """ 

1084 return self._nested_transaction 

1085 

1086 def _begin_impl(self, transaction: RootTransaction) -> None: 

1087 if self._echo: 

1088 if self._is_autocommit_isolation(): 

1089 self._log_info( 

1090 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1091 "autocommit mode)" 

1092 ) 

1093 else: 

1094 self._log_info("BEGIN (implicit)") 

1095 

1096 self.__in_begin = True 

1097 

1098 if self._has_events or self.engine._has_events: 

1099 self.dispatch.begin(self) 

1100 

1101 try: 

1102 self.engine.dialect.do_begin(self.connection) 

1103 except BaseException as e: 

1104 self._handle_dbapi_exception(e, None, None, None, None) 

1105 finally: 

1106 self.__in_begin = False 

1107 

1108 def _rollback_impl(self) -> None: 

1109 if self._has_events or self.engine._has_events: 

1110 self.dispatch.rollback(self) 

1111 

1112 if self._still_open_and_dbapi_connection_is_valid: 

1113 if self._echo: 

1114 if self._is_autocommit_isolation(): 

1115 if self.dialect.skip_autocommit_rollback: 

1116 self._log_info( 

1117 "ROLLBACK will be skipped by " 

1118 "skip_autocommit_rollback" 

1119 ) 

1120 else: 

1121 self._log_info( 

1122 "ROLLBACK using DBAPI connection.rollback(); " 

1123 "set skip_autocommit_rollback to prevent fully" 

1124 ) 

1125 else: 

1126 self._log_info("ROLLBACK") 

1127 try: 

1128 self.engine.dialect.do_rollback(self.connection) 

1129 except BaseException as e: 

1130 self._handle_dbapi_exception(e, None, None, None, None) 

1131 

1132 def _commit_impl(self) -> None: 

1133 if self._has_events or self.engine._has_events: 

1134 self.dispatch.commit(self) 

1135 

1136 if self._echo: 

1137 if self._is_autocommit_isolation(): 

1138 self._log_info( 

1139 "COMMIT using DBAPI connection.commit(), " 

1140 "has no effect due to autocommit mode" 

1141 ) 

1142 else: 

1143 self._log_info("COMMIT") 

1144 try: 

1145 self.engine.dialect.do_commit(self.connection) 

1146 except BaseException as e: 

1147 self._handle_dbapi_exception(e, None, None, None, None) 

1148 

1149 def _savepoint_impl(self, name: Optional[str] = None) -> str: 

1150 if self._has_events or self.engine._has_events: 

1151 self.dispatch.savepoint(self, name) 

1152 

1153 if name is None: 

1154 self.__savepoint_seq += 1 

1155 name = "sa_savepoint_%s" % self.__savepoint_seq 

1156 self.engine.dialect.do_savepoint(self, name) 

1157 return name 

1158 

1159 def _rollback_to_savepoint_impl(self, name: str) -> None: 

1160 if self._has_events or self.engine._has_events: 

1161 self.dispatch.rollback_savepoint(self, name, None) 

1162 

1163 if self._still_open_and_dbapi_connection_is_valid: 

1164 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1165 

1166 def _release_savepoint_impl(self, name: str) -> None: 

1167 if self._has_events or self.engine._has_events: 

1168 self.dispatch.release_savepoint(self, name, None) 

1169 

1170 self.engine.dialect.do_release_savepoint(self, name) 

1171 

1172 def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 

1173 if self._echo: 

1174 self._log_info("BEGIN TWOPHASE (implicit)") 

1175 if self._has_events or self.engine._has_events: 

1176 self.dispatch.begin_twophase(self, transaction.xid) 

1177 

1178 self.__in_begin = True 

1179 try: 

1180 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1181 except BaseException as e: 

1182 self._handle_dbapi_exception(e, None, None, None, None) 

1183 finally: 

1184 self.__in_begin = False 

1185 

1186 def _prepare_twophase_impl(self, xid: Any) -> None: 

1187 if self._has_events or self.engine._has_events: 

1188 self.dispatch.prepare_twophase(self, xid) 

1189 

1190 assert isinstance(self._transaction, TwoPhaseTransaction) 

1191 try: 

1192 self.engine.dialect.do_prepare_twophase(self, xid) 

1193 except BaseException as e: 

1194 self._handle_dbapi_exception(e, None, None, None, None) 

1195 

1196 def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1197 if self._has_events or self.engine._has_events: 

1198 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1199 

1200 if self._still_open_and_dbapi_connection_is_valid: 

1201 assert isinstance(self._transaction, TwoPhaseTransaction) 

1202 try: 

1203 self.engine.dialect.do_rollback_twophase( 

1204 self, xid, is_prepared 

1205 ) 

1206 except BaseException as e: 

1207 self._handle_dbapi_exception(e, None, None, None, None) 

1208 

1209 def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 

1210 if self._has_events or self.engine._has_events: 

1211 self.dispatch.commit_twophase(self, xid, is_prepared) 

1212 

1213 assert isinstance(self._transaction, TwoPhaseTransaction) 

1214 try: 

1215 self.engine.dialect.do_commit_twophase(self, xid, is_prepared) 

1216 except BaseException as e: 

1217 self._handle_dbapi_exception(e, None, None, None, None) 

1218 

1219 def close(self) -> None: 

1220 """Close this :class:`_engine.Connection`. 

1221 

1222 This results in a release of the underlying database 

1223 resources, that is, the DBAPI connection referenced 

1224 internally. The DBAPI connection is typically restored 

1225 back to the connection-holding :class:`_pool.Pool` referenced 

1226 by the :class:`_engine.Engine` that produced this 

1227 :class:`_engine.Connection`. Any transactional state present on 

1228 the DBAPI connection is also unconditionally released via 

1229 the DBAPI connection's ``rollback()`` method, regardless 

1230 of any :class:`.Transaction` object that may be 

1231 outstanding with regards to this :class:`_engine.Connection`. 

1232 

1233 This has the effect of also calling :meth:`_engine.Connection.rollback` 

1234 if any transaction is in place. 

1235 

1236 After :meth:`_engine.Connection.close` is called, the 

1237 :class:`_engine.Connection` is permanently in a closed state, 

1238 and will allow no further operations. 

1239 

1240 """ 

1241 

1242 if self._transaction: 

1243 self._transaction.close() 

1244 skip_reset = True 

1245 else: 

1246 skip_reset = False 

1247 

1248 if self._dbapi_connection is not None: 

1249 conn = self._dbapi_connection 

1250 

1251 # as we just closed the transaction, close the connection 

1252 # pool connection without doing an additional reset 

1253 if skip_reset: 

1254 cast("_ConnectionFairy", conn)._close_special( 

1255 transaction_reset=True 

1256 ) 

1257 else: 

1258 conn.close() 

1259 

1260 # There is a slight chance that conn.close() may have 

1261 # triggered an invalidation here in which case 

1262 # _dbapi_connection would already be None, however usually 

1263 # it will be non-None here and in a "closed" state. 

1264 self._dbapi_connection = None 

1265 self.__can_reconnect = False 

1266 

1267 @overload 

1268 def scalar( 

1269 self, 

1270 statement: TypedReturnsRows[Tuple[_T]], 

1271 parameters: Optional[_CoreSingleExecuteParams] = None, 

1272 *, 

1273 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1274 ) -> Optional[_T]: ... 

1275 

1276 @overload 

1277 def scalar( 

1278 self, 

1279 statement: Executable, 

1280 parameters: Optional[_CoreSingleExecuteParams] = None, 

1281 *, 

1282 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1283 ) -> Any: ... 

1284 

1285 def scalar( 

1286 self, 

1287 statement: Executable, 

1288 parameters: Optional[_CoreSingleExecuteParams] = None, 

1289 *, 

1290 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1291 ) -> Any: 

1292 r"""Executes a SQL statement construct and returns a scalar object. 

1293 

1294 This method is shorthand for invoking the 

1295 :meth:`_engine.Result.scalar` method after invoking the 

1296 :meth:`_engine.Connection.execute` method. Parameters are equivalent. 

1297 

1298 :return: a scalar Python value representing the first column of the 

1299 first row returned. 

1300 

1301 """ 

1302 distilled_parameters = _distill_params_20(parameters) 

1303 try: 

1304 meth = statement._execute_on_scalar 

1305 except AttributeError as err: 

1306 raise exc.ObjectNotExecutableError(statement) from err 

1307 else: 

1308 return meth( 

1309 self, 

1310 distilled_parameters, 

1311 execution_options or NO_OPTIONS, 

1312 ) 

1313 

1314 @overload 

1315 def scalars( 

1316 self, 

1317 statement: TypedReturnsRows[Tuple[_T]], 

1318 parameters: Optional[_CoreAnyExecuteParams] = None, 

1319 *, 

1320 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1321 ) -> ScalarResult[_T]: ... 

1322 

1323 @overload 

1324 def scalars( 

1325 self, 

1326 statement: Executable, 

1327 parameters: Optional[_CoreAnyExecuteParams] = None, 

1328 *, 

1329 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1330 ) -> ScalarResult[Any]: ... 

1331 

1332 def scalars( 

1333 self, 

1334 statement: Executable, 

1335 parameters: Optional[_CoreAnyExecuteParams] = None, 

1336 *, 

1337 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1338 ) -> ScalarResult[Any]: 

1339 """Executes and returns a scalar result set, which yields scalar values 

1340 from the first column of each row. 

1341 

1342 This method is equivalent to calling :meth:`_engine.Connection.execute` 

1343 to receive a :class:`_result.Result` object, then invoking the 

1344 :meth:`_result.Result.scalars` method to produce a 

1345 :class:`_result.ScalarResult` instance. 

1346 

1347 :return: a :class:`_result.ScalarResult` 

1348 

1349 .. versionadded:: 1.4.24 

1350 

1351 """ 

1352 

1353 return self.execute( 

1354 statement, parameters, execution_options=execution_options 

1355 ).scalars() 

1356 

1357 @overload 

1358 def execute( 

1359 self, 

1360 statement: TypedReturnsRows[_T], 

1361 parameters: Optional[_CoreAnyExecuteParams] = None, 

1362 *, 

1363 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1364 ) -> CursorResult[_T]: ... 

1365 

1366 @overload 

1367 def execute( 

1368 self, 

1369 statement: Executable, 

1370 parameters: Optional[_CoreAnyExecuteParams] = None, 

1371 *, 

1372 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1373 ) -> CursorResult[Any]: ... 

1374 

1375 def execute( 

1376 self, 

1377 statement: Executable, 

1378 parameters: Optional[_CoreAnyExecuteParams] = None, 

1379 *, 

1380 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1381 ) -> CursorResult[Any]: 

1382 r"""Executes a SQL statement construct and returns a 

1383 :class:`_engine.CursorResult`. 

1384 

1385 :param statement: The statement to be executed. This is always 

1386 an object that is in both the :class:`_expression.ClauseElement` and 

1387 :class:`_expression.Executable` hierarchies, including: 

1388 

1389 * :class:`_expression.Select` 

1390 * :class:`_expression.Insert`, :class:`_expression.Update`, 

1391 :class:`_expression.Delete` 

1392 * :class:`_expression.TextClause` and 

1393 :class:`_expression.TextualSelect` 

1394 * :class:`_schema.DDL` and objects which inherit from 

1395 :class:`_schema.ExecutableDDLElement` 

1396 

1397 :param parameters: parameters which will be bound into the statement. 

1398 This may be either a dictionary of parameter names to values, 

1399 or a mutable sequence (e.g. a list) of dictionaries. When a 

1400 list of dictionaries is passed, the underlying statement execution 

1401 will make use of the DBAPI ``cursor.executemany()`` method. 

1402 When a single dictionary is passed, the DBAPI ``cursor.execute()`` 

1403 method will be used. 

1404 

1405 :param execution_options: optional dictionary of execution options, 

1406 which will be associated with the statement execution. This 

1407 dictionary can provide a subset of the options that are accepted 

1408 by :meth:`_engine.Connection.execution_options`. 

1409 

1410 :return: a :class:`_engine.Result` object. 

1411 

1412 """ 

1413 distilled_parameters = _distill_params_20(parameters) 

1414 try: 

1415 meth = statement._execute_on_connection 

1416 except AttributeError as err: 

1417 raise exc.ObjectNotExecutableError(statement) from err 

1418 else: 

1419 return meth( 

1420 self, 

1421 distilled_parameters, 

1422 execution_options or NO_OPTIONS, 

1423 ) 

1424 

1425 def _execute_function( 

1426 self, 

1427 func: FunctionElement[Any], 

1428 distilled_parameters: _CoreMultiExecuteParams, 

1429 execution_options: CoreExecuteOptionsParameter, 

1430 ) -> CursorResult[Any]: 

1431 """Execute a sql.FunctionElement object.""" 

1432 

1433 return self._execute_clauseelement( 

1434 func.select(), distilled_parameters, execution_options 

1435 ) 

1436 

1437 def _execute_default( 

1438 self, 

1439 default: DefaultGenerator, 

1440 distilled_parameters: _CoreMultiExecuteParams, 

1441 execution_options: CoreExecuteOptionsParameter, 

1442 ) -> Any: 

1443 """Execute a schema.ColumnDefault object.""" 

1444 

1445 execution_options = self._execution_options.merge_with( 

1446 execution_options 

1447 ) 

1448 

1449 event_multiparams: Optional[_CoreMultiExecuteParams] 

1450 event_params: Optional[_CoreAnyExecuteParams] 

1451 

1452 # note for event handlers, the "distilled parameters" which is always 

1453 # a list of dicts is broken out into separate "multiparams" and 

1454 # "params" collections, which allows the handler to distinguish 

1455 # between an executemany and execute style set of parameters. 

1456 if self._has_events or self.engine._has_events: 

1457 ( 

1458 default, 

1459 distilled_parameters, 

1460 event_multiparams, 

1461 event_params, 

1462 ) = self._invoke_before_exec_event( 

1463 default, distilled_parameters, execution_options 

1464 ) 

1465 else: 

1466 event_multiparams = event_params = None 

1467 

1468 try: 

1469 conn = self._dbapi_connection 

1470 if conn is None: 

1471 conn = self._revalidate_connection() 

1472 

1473 dialect = self.dialect 

1474 ctx = dialect.execution_ctx_cls._init_default( 

1475 dialect, self, conn, execution_options 

1476 ) 

1477 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1478 raise 

1479 except BaseException as e: 

1480 self._handle_dbapi_exception(e, None, None, None, None) 

1481 

1482 ret = ctx._exec_default(None, default, None) 

1483 

1484 if self._has_events or self.engine._has_events: 

1485 self.dispatch.after_execute( 

1486 self, 

1487 default, 

1488 event_multiparams, 

1489 event_params, 

1490 execution_options, 

1491 ret, 

1492 ) 

1493 

1494 return ret 

1495 

1496 def _execute_ddl( 

1497 self, 

1498 ddl: ExecutableDDLElement, 

1499 distilled_parameters: _CoreMultiExecuteParams, 

1500 execution_options: CoreExecuteOptionsParameter, 

1501 ) -> CursorResult[Any]: 

1502 """Execute a schema.DDL object.""" 

1503 

1504 exec_opts = ddl._execution_options.merge_with( 

1505 self._execution_options, execution_options 

1506 ) 

1507 

1508 event_multiparams: Optional[_CoreMultiExecuteParams] 

1509 event_params: Optional[_CoreSingleExecuteParams] 

1510 

1511 if self._has_events or self.engine._has_events: 

1512 ( 

1513 ddl, 

1514 distilled_parameters, 

1515 event_multiparams, 

1516 event_params, 

1517 ) = self._invoke_before_exec_event( 

1518 ddl, distilled_parameters, exec_opts 

1519 ) 

1520 else: 

1521 event_multiparams = event_params = None 

1522 

1523 schema_translate_map = exec_opts.get("schema_translate_map", None) 

1524 

1525 dialect = self.dialect 

1526 

1527 compiled = ddl.compile( 

1528 dialect=dialect, schema_translate_map=schema_translate_map 

1529 ) 

1530 ret = self._execute_context( 

1531 dialect, 

1532 dialect.execution_ctx_cls._init_ddl, 

1533 compiled, 

1534 None, 

1535 exec_opts, 

1536 compiled, 

1537 ) 

1538 if self._has_events or self.engine._has_events: 

1539 self.dispatch.after_execute( 

1540 self, 

1541 ddl, 

1542 event_multiparams, 

1543 event_params, 

1544 exec_opts, 

1545 ret, 

1546 ) 

1547 return ret 

1548 

1549 def _invoke_before_exec_event( 

1550 self, 

1551 elem: Any, 

1552 distilled_params: _CoreMultiExecuteParams, 

1553 execution_options: _ExecuteOptions, 

1554 ) -> Tuple[ 

1555 Any, 

1556 _CoreMultiExecuteParams, 

1557 _CoreMultiExecuteParams, 

1558 _CoreSingleExecuteParams, 

1559 ]: 

1560 event_multiparams: _CoreMultiExecuteParams 

1561 event_params: _CoreSingleExecuteParams 

1562 

1563 if len(distilled_params) == 1: 

1564 event_multiparams, event_params = [], distilled_params[0] 

1565 else: 

1566 event_multiparams, event_params = distilled_params, {} 

1567 

1568 for fn in self.dispatch.before_execute: 

1569 elem, event_multiparams, event_params = fn( 

1570 self, 

1571 elem, 

1572 event_multiparams, 

1573 event_params, 

1574 execution_options, 

1575 ) 

1576 

1577 if event_multiparams: 

1578 distilled_params = list(event_multiparams) 

1579 if event_params: 

1580 raise exc.InvalidRequestError( 

1581 "Event handler can't return non-empty multiparams " 

1582 "and params at the same time" 

1583 ) 

1584 elif event_params: 

1585 distilled_params = [event_params] 

1586 else: 

1587 distilled_params = [] 

1588 

1589 return elem, distilled_params, event_multiparams, event_params 

1590 

1591 def _execute_clauseelement( 

1592 self, 

1593 elem: Executable, 

1594 distilled_parameters: _CoreMultiExecuteParams, 

1595 execution_options: CoreExecuteOptionsParameter, 

1596 ) -> CursorResult[Any]: 

1597 """Execute a sql.ClauseElement object.""" 

1598 

1599 execution_options = elem._execution_options.merge_with( 

1600 self._execution_options, execution_options 

1601 ) 

1602 

1603 has_events = self._has_events or self.engine._has_events 

1604 if has_events: 

1605 ( 

1606 elem, 

1607 distilled_parameters, 

1608 event_multiparams, 

1609 event_params, 

1610 ) = self._invoke_before_exec_event( 

1611 elem, distilled_parameters, execution_options 

1612 ) 

1613 

1614 if distilled_parameters: 

1615 # ensure we don't retain a link to the view object for keys() 

1616 # which links to the values, which we don't want to cache 

1617 keys = sorted(distilled_parameters[0]) 

1618 for_executemany = len(distilled_parameters) > 1 

1619 else: 

1620 keys = [] 

1621 for_executemany = False 

1622 

1623 dialect = self.dialect 

1624 

1625 schema_translate_map = execution_options.get( 

1626 "schema_translate_map", None 

1627 ) 

1628 

1629 compiled_cache: Optional[CompiledCacheType] = execution_options.get( 

1630 "compiled_cache", self.engine._compiled_cache 

1631 ) 

1632 

1633 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( 

1634 dialect=dialect, 

1635 compiled_cache=compiled_cache, 

1636 column_keys=keys, 

1637 for_executemany=for_executemany, 

1638 schema_translate_map=schema_translate_map, 

1639 linting=self.dialect.compiler_linting | compiler.WARN_LINTING, 

1640 ) 

1641 ret = self._execute_context( 

1642 dialect, 

1643 dialect.execution_ctx_cls._init_compiled, 

1644 compiled_sql, 

1645 distilled_parameters, 

1646 execution_options, 

1647 compiled_sql, 

1648 distilled_parameters, 

1649 elem, 

1650 extracted_params, 

1651 cache_hit=cache_hit, 

1652 ) 

1653 if has_events: 

1654 self.dispatch.after_execute( 

1655 self, 

1656 elem, 

1657 event_multiparams, 

1658 event_params, 

1659 execution_options, 

1660 ret, 

1661 ) 

1662 return ret 

1663 

1664 def _execute_compiled( 

1665 self, 

1666 compiled: Compiled, 

1667 distilled_parameters: _CoreMultiExecuteParams, 

1668 execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS, 

1669 ) -> CursorResult[Any]: 

1670 """Execute a sql.Compiled object. 

1671 

1672 TODO: why do we have this? likely deprecate or remove 

1673 

1674 """ 

1675 

1676 execution_options = compiled.execution_options.merge_with( 

1677 self._execution_options, execution_options 

1678 ) 

1679 

1680 if self._has_events or self.engine._has_events: 

1681 ( 

1682 compiled, 

1683 distilled_parameters, 

1684 event_multiparams, 

1685 event_params, 

1686 ) = self._invoke_before_exec_event( 

1687 compiled, distilled_parameters, execution_options 

1688 ) 

1689 

1690 dialect = self.dialect 

1691 

1692 ret = self._execute_context( 

1693 dialect, 

1694 dialect.execution_ctx_cls._init_compiled, 

1695 compiled, 

1696 distilled_parameters, 

1697 execution_options, 

1698 compiled, 

1699 distilled_parameters, 

1700 None, 

1701 None, 

1702 ) 

1703 if self._has_events or self.engine._has_events: 

1704 self.dispatch.after_execute( 

1705 self, 

1706 compiled, 

1707 event_multiparams, 

1708 event_params, 

1709 execution_options, 

1710 ret, 

1711 ) 

1712 return ret 

1713 

1714 def exec_driver_sql( 

1715 self, 

1716 statement: str, 

1717 parameters: Optional[_DBAPIAnyExecuteParams] = None, 

1718 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

1719 ) -> CursorResult[Any]: 

1720 r"""Executes a string SQL statement on the DBAPI cursor directly, 

1721 without any SQL compilation steps. 

1722 

1723 This can be used to pass any string directly to the 

1724 ``cursor.execute()`` method of the DBAPI in use. 

1725 

1726 :param statement: The statement str to be executed. Bound parameters 

1727 must use the underlying DBAPI's paramstyle, such as "qmark", 

1728 "pyformat", "format", etc. 

1729 

1730 :param parameters: represent bound parameter values to be used in the 

1731 execution. The format is one of: a dictionary of named parameters, 

1732 a tuple of positional parameters, or a list containing either 

1733 dictionaries or tuples for multiple-execute support. 

1734 

1735 :return: a :class:`_engine.CursorResult`. 

1736 

1737 E.g. multiple dictionaries:: 

1738 

1739 

1740 conn.exec_driver_sql( 

1741 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1742 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}], 

1743 ) 

1744 

1745 Single dictionary:: 

1746 

1747 conn.exec_driver_sql( 

1748 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 

1749 dict(id=1, value="v1"), 

1750 ) 

1751 

1752 Single tuple:: 

1753 

1754 conn.exec_driver_sql( 

1755 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1") 

1756 ) 

1757 

1758 .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does 

1759 not participate in the 

1760 :meth:`_events.ConnectionEvents.before_execute` and 

1761 :meth:`_events.ConnectionEvents.after_execute` events. To 

1762 intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use 

1763 :meth:`_events.ConnectionEvents.before_cursor_execute` and 

1764 :meth:`_events.ConnectionEvents.after_cursor_execute`. 

1765 

1766 .. seealso:: 

1767 

1768 :pep:`249` 

1769 

1770 """ 

1771 

1772 distilled_parameters = _distill_raw_params(parameters) 

1773 

1774 execution_options = self._execution_options.merge_with( 

1775 execution_options 

1776 ) 

1777 

1778 dialect = self.dialect 

1779 ret = self._execute_context( 

1780 dialect, 

1781 dialect.execution_ctx_cls._init_statement, 

1782 statement, 

1783 None, 

1784 execution_options, 

1785 statement, 

1786 distilled_parameters, 

1787 ) 

1788 

1789 return ret 

1790 

1791 def _execute_context( 

1792 self, 

1793 dialect: Dialect, 

1794 constructor: Callable[..., ExecutionContext], 

1795 statement: Union[str, Compiled], 

1796 parameters: Optional[_AnyMultiExecuteParams], 

1797 execution_options: _ExecuteOptions, 

1798 *args: Any, 

1799 **kw: Any, 

1800 ) -> CursorResult[Any]: 

1801 """Create an :class:`.ExecutionContext` and execute, returning 

1802 a :class:`_engine.CursorResult`.""" 

1803 

1804 if execution_options: 

1805 yp = execution_options.get("yield_per", None) 

1806 if yp: 

1807 execution_options = execution_options.union( 

1808 {"stream_results": True, "max_row_buffer": yp} 

1809 ) 

1810 try: 

1811 conn = self._dbapi_connection 

1812 if conn is None: 

1813 conn = self._revalidate_connection() 

1814 

1815 context = constructor( 

1816 dialect, self, conn, execution_options, *args, **kw 

1817 ) 

1818 except (exc.PendingRollbackError, exc.ResourceClosedError): 

1819 raise 

1820 except BaseException as e: 

1821 self._handle_dbapi_exception( 

1822 e, str(statement), parameters, None, None 

1823 ) 

1824 

1825 if ( 

1826 self._transaction 

1827 and not self._transaction.is_active 

1828 or ( 

1829 self._nested_transaction 

1830 and not self._nested_transaction.is_active 

1831 ) 

1832 ): 

1833 self._invalid_transaction() 

1834 

1835 elif self._trans_context_manager: 

1836 TransactionalContext._trans_ctx_check(self) 

1837 

1838 if self._transaction is None: 

1839 self._autobegin() 

1840 

1841 context.pre_exec() 

1842 

1843 if context.execute_style is ExecuteStyle.INSERTMANYVALUES: 

1844 return self._exec_insertmany_context(dialect, context) 

1845 else: 

1846 return self._exec_single_context( 

1847 dialect, context, statement, parameters 

1848 ) 

1849 

1850 def _exec_single_context( 

1851 self, 

1852 dialect: Dialect, 

1853 context: ExecutionContext, 

1854 statement: Union[str, Compiled], 

1855 parameters: Optional[_AnyMultiExecuteParams], 

1856 ) -> CursorResult[Any]: 

1857 """continue the _execute_context() method for a single DBAPI 

1858 cursor.execute() or cursor.executemany() call. 

1859 

1860 """ 

1861 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

1862 generic_setinputsizes = context._prepare_set_input_sizes() 

1863 

1864 if generic_setinputsizes: 

1865 try: 

1866 dialect.do_set_input_sizes( 

1867 context.cursor, generic_setinputsizes, context 

1868 ) 

1869 except BaseException as e: 

1870 self._handle_dbapi_exception( 

1871 e, str(statement), parameters, None, context 

1872 ) 

1873 

1874 cursor, str_statement, parameters = ( 

1875 context.cursor, 

1876 context.statement, 

1877 context.parameters, 

1878 ) 

1879 

1880 effective_parameters: Optional[_AnyExecuteParams] 

1881 

1882 if not context.executemany: 

1883 effective_parameters = parameters[0] 

1884 else: 

1885 effective_parameters = parameters 

1886 

1887 if self._has_events or self.engine._has_events: 

1888 for fn in self.dispatch.before_cursor_execute: 

1889 str_statement, effective_parameters = fn( 

1890 self, 

1891 cursor, 

1892 str_statement, 

1893 effective_parameters, 

1894 context, 

1895 context.executemany, 

1896 ) 

1897 

1898 if self._echo: 

1899 self._log_info(str_statement) 

1900 

1901 stats = context._get_cache_stats() 

1902 

1903 if not self.engine.hide_parameters: 

1904 self._log_info( 

1905 "[%s] %r", 

1906 stats, 

1907 sql_util._repr_params( 

1908 effective_parameters, 

1909 batches=10, 

1910 ismulti=context.executemany, 

1911 ), 

1912 ) 

1913 else: 

1914 self._log_info( 

1915 "[%s] [SQL parameters hidden due to hide_parameters=True]", 

1916 stats, 

1917 ) 

1918 

1919 evt_handled: bool = False 

1920 try: 

1921 if context.execute_style is ExecuteStyle.EXECUTEMANY: 

1922 effective_parameters = cast( 

1923 "_CoreMultiExecuteParams", effective_parameters 

1924 ) 

1925 if self.dialect._has_events: 

1926 for fn in self.dialect.dispatch.do_executemany: 

1927 if fn( 

1928 cursor, 

1929 str_statement, 

1930 effective_parameters, 

1931 context, 

1932 ): 

1933 evt_handled = True 

1934 break 

1935 if not evt_handled: 

1936 self.dialect.do_executemany( 

1937 cursor, 

1938 str_statement, 

1939 effective_parameters, 

1940 context, 

1941 ) 

1942 elif not effective_parameters and context.no_parameters: 

1943 if self.dialect._has_events: 

1944 for fn in self.dialect.dispatch.do_execute_no_params: 

1945 if fn(cursor, str_statement, context): 

1946 evt_handled = True 

1947 break 

1948 if not evt_handled: 

1949 self.dialect.do_execute_no_params( 

1950 cursor, str_statement, context 

1951 ) 

1952 else: 

1953 effective_parameters = cast( 

1954 "_CoreSingleExecuteParams", effective_parameters 

1955 ) 

1956 if self.dialect._has_events: 

1957 for fn in self.dialect.dispatch.do_execute: 

1958 if fn( 

1959 cursor, 

1960 str_statement, 

1961 effective_parameters, 

1962 context, 

1963 ): 

1964 evt_handled = True 

1965 break 

1966 if not evt_handled: 

1967 self.dialect.do_execute( 

1968 cursor, str_statement, effective_parameters, context 

1969 ) 

1970 

1971 if self._has_events or self.engine._has_events: 

1972 self.dispatch.after_cursor_execute( 

1973 self, 

1974 cursor, 

1975 str_statement, 

1976 effective_parameters, 

1977 context, 

1978 context.executemany, 

1979 ) 

1980 

1981 context.post_exec() 

1982 

1983 result = context._setup_result_proxy() 

1984 

1985 except BaseException as e: 

1986 self._handle_dbapi_exception( 

1987 e, str_statement, effective_parameters, cursor, context 

1988 ) 

1989 

1990 return result 

1991 

1992 def _exec_insertmany_context( 

1993 self, 

1994 dialect: Dialect, 

1995 context: ExecutionContext, 

1996 ) -> CursorResult[Any]: 

1997 """continue the _execute_context() method for an "insertmanyvalues" 

1998 operation, which will invoke DBAPI 

1999 cursor.execute() one or more times with individual log and 

2000 event hook calls. 

2001 

2002 """ 

2003 

2004 if dialect.bind_typing is BindTyping.SETINPUTSIZES: 

2005 generic_setinputsizes = context._prepare_set_input_sizes() 

2006 else: 

2007 generic_setinputsizes = None 

2008 

2009 cursor, str_statement, parameters = ( 

2010 context.cursor, 

2011 context.statement, 

2012 context.parameters, 

2013 ) 

2014 

2015 effective_parameters = parameters 

2016 

2017 engine_events = self._has_events or self.engine._has_events 

2018 if self.dialect._has_events: 

2019 do_execute_dispatch: Iterable[Any] = ( 

2020 self.dialect.dispatch.do_execute 

2021 ) 

2022 else: 

2023 do_execute_dispatch = () 

2024 

2025 if engine_events: 

2026 _WORKAROUND_ISSUE_13018 = getattr( 

2027 self, "_WORKAROUND_ISSUE_13018", False 

2028 ) 

2029 else: 

2030 _WORKAROUND_ISSUE_13018 = False 

2031 

2032 if self._echo: 

2033 stats = context._get_cache_stats() + " (insertmanyvalues)" 

2034 

2035 preserve_rowcount = context.execution_options.get( 

2036 "preserve_rowcount", False 

2037 ) 

2038 rowcount = 0 

2039 

2040 for imv_batch in dialect._deliver_insertmanyvalues_batches( 

2041 self, 

2042 cursor, 

2043 str_statement, 

2044 effective_parameters, 

2045 generic_setinputsizes, 

2046 context, 

2047 ): 

2048 if imv_batch.processed_setinputsizes: 

2049 try: 

2050 dialect.do_set_input_sizes( 

2051 context.cursor, 

2052 imv_batch.processed_setinputsizes, 

2053 context, 

2054 ) 

2055 except BaseException as e: 

2056 self._handle_dbapi_exception( 

2057 e, 

2058 sql_util._long_statement(imv_batch.replaced_statement), 

2059 imv_batch.replaced_parameters, 

2060 None, 

2061 context, 

2062 is_sub_exec=True, 

2063 ) 

2064 

2065 sub_stmt = imv_batch.replaced_statement 

2066 sub_params = imv_batch.replaced_parameters 

2067 

2068 if engine_events: 

2069 for fn in self.dispatch.before_cursor_execute: 

2070 sub_stmt, sub_params = fn( 

2071 self, 

2072 cursor, 

2073 sub_stmt, 

2074 sub_params, 

2075 context, 

2076 True, 

2077 ) 

2078 

2079 if self._echo: 

2080 self._log_info(sql_util._long_statement(sub_stmt)) 

2081 

2082 imv_stats = f""" {imv_batch.batchnum}/{ 

2083 imv_batch.total_batches 

2084 } ({ 

2085 'ordered' 

2086 if imv_batch.rows_sorted else 'unordered' 

2087 }{ 

2088 '; batch not supported' 

2089 if imv_batch.is_downgraded 

2090 else '' 

2091 })""" 

2092 

2093 if imv_batch.batchnum == 1: 

2094 stats += imv_stats 

2095 else: 

2096 stats = f"insertmanyvalues{imv_stats}" 

2097 

2098 if not self.engine.hide_parameters: 

2099 self._log_info( 

2100 "[%s] %r", 

2101 stats, 

2102 sql_util._repr_params( 

2103 sub_params, 

2104 batches=10, 

2105 ismulti=False, 

2106 ), 

2107 ) 

2108 else: 

2109 self._log_info( 

2110 "[%s] [SQL parameters hidden due to " 

2111 "hide_parameters=True]", 

2112 stats, 

2113 ) 

2114 

2115 try: 

2116 for fn in do_execute_dispatch: 

2117 if fn( 

2118 cursor, 

2119 sub_stmt, 

2120 sub_params, 

2121 context, 

2122 ): 

2123 break 

2124 else: 

2125 dialect.do_execute( 

2126 cursor, 

2127 sub_stmt, 

2128 sub_params, 

2129 context, 

2130 ) 

2131 

2132 except BaseException as e: 

2133 self._handle_dbapi_exception( 

2134 e, 

2135 sql_util._long_statement(sub_stmt), 

2136 sub_params, 

2137 cursor, 

2138 context, 

2139 is_sub_exec=True, 

2140 ) 

2141 

2142 if engine_events: 

2143 self.dispatch.after_cursor_execute( 

2144 self, 

2145 cursor, 

2146 # TODO: this will be fixed by #13018 

2147 sub_stmt if _WORKAROUND_ISSUE_13018 else str_statement, 

2148 sub_params if _WORKAROUND_ISSUE_13018 else parameters, 

2149 context, 

2150 context.executemany, 

2151 ) 

2152 

2153 if preserve_rowcount: 

2154 rowcount += imv_batch.current_batch_size 

2155 

2156 try: 

2157 context.post_exec() 

2158 

2159 if preserve_rowcount: 

2160 context._rowcount = rowcount # type: ignore[attr-defined] 

2161 

2162 result = context._setup_result_proxy() 

2163 

2164 except BaseException as e: 

2165 self._handle_dbapi_exception( 

2166 e, str_statement, effective_parameters, cursor, context 

2167 ) 

2168 

2169 return result 

2170 

2171 def _cursor_execute( 

2172 self, 

2173 cursor: DBAPICursor, 

2174 statement: str, 

2175 parameters: _DBAPISingleExecuteParams, 

2176 context: Optional[ExecutionContext] = None, 

2177 ) -> None: 

2178 """Execute a statement + params on the given cursor. 

2179 

2180 Adds appropriate logging and exception handling. 

2181 

2182 This method is used by DefaultDialect for special-case 

2183 executions, such as for sequences and column defaults. 

2184 The path of statement execution in the majority of cases 

2185 terminates at _execute_context(). 

2186 

2187 """ 

2188 if self._has_events or self.engine._has_events: 

2189 for fn in self.dispatch.before_cursor_execute: 

2190 statement, parameters = fn( 

2191 self, cursor, statement, parameters, context, False 

2192 ) 

2193 

2194 if self._echo: 

2195 self._log_info(statement) 

2196 self._log_info("[raw sql] %r", parameters) 

2197 try: 

2198 for fn in ( 

2199 () 

2200 if not self.dialect._has_events 

2201 else self.dialect.dispatch.do_execute 

2202 ): 

2203 if fn(cursor, statement, parameters, context): 

2204 break 

2205 else: 

2206 self.dialect.do_execute(cursor, statement, parameters, context) 

2207 except BaseException as e: 

2208 self._handle_dbapi_exception( 

2209 e, statement, parameters, cursor, context 

2210 ) 

2211 

2212 if self._has_events or self.engine._has_events: 

2213 self.dispatch.after_cursor_execute( 

2214 self, cursor, statement, parameters, context, False 

2215 ) 

2216 

2217 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2218 """Close the given cursor, catching exceptions 

2219 and turning into log warnings. 

2220 

2221 """ 

2222 try: 

2223 cursor.close() 

2224 except Exception: 

2225 # log the error through the connection pool's logger. 

2226 self.engine.pool.logger.error( 

2227 "Error closing cursor", exc_info=True 

2228 ) 

2229 

2230 _reentrant_error = False 

2231 _is_disconnect = False 

2232 

2233 def _handle_dbapi_exception( 

2234 self, 

2235 e: BaseException, 

2236 statement: Optional[str], 

2237 parameters: Optional[_AnyExecuteParams], 

2238 cursor: Optional[DBAPICursor], 

2239 context: Optional[ExecutionContext], 

2240 is_sub_exec: bool = False, 

2241 ) -> NoReturn: 

2242 exc_info = sys.exc_info() 

2243 

2244 is_exit_exception = util.is_exit_exception(e) 

2245 

2246 if not self._is_disconnect: 

2247 self._is_disconnect = ( 

2248 isinstance(e, self.dialect.loaded_dbapi.Error) 

2249 and not self.closed 

2250 and self.dialect.is_disconnect( 

2251 e, 

2252 self._dbapi_connection if not self.invalidated else None, 

2253 cursor, 

2254 ) 

2255 ) or (is_exit_exception and not self.closed) 

2256 

2257 invalidate_pool_on_disconnect = not is_exit_exception 

2258 

2259 ismulti: bool = ( 

2260 not is_sub_exec and context.executemany 

2261 if context is not None 

2262 else False 

2263 ) 

2264 if self._reentrant_error: 

2265 raise exc.DBAPIError.instance( 

2266 statement, 

2267 parameters, 

2268 e, 

2269 self.dialect.loaded_dbapi.Error, 

2270 hide_parameters=self.engine.hide_parameters, 

2271 dialect=self.dialect, 

2272 ismulti=ismulti, 

2273 ).with_traceback(exc_info[2]) from e 

2274 self._reentrant_error = True 

2275 try: 

2276 # non-DBAPI error - if we already got a context, 

2277 # or there's no string statement, don't wrap it 

2278 should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or ( 

2279 statement is not None 

2280 and context is None 

2281 and not is_exit_exception 

2282 ) 

2283 

2284 if should_wrap: 

2285 sqlalchemy_exception = exc.DBAPIError.instance( 

2286 statement, 

2287 parameters, 

2288 cast(Exception, e), 

2289 self.dialect.loaded_dbapi.Error, 

2290 hide_parameters=self.engine.hide_parameters, 

2291 connection_invalidated=self._is_disconnect, 

2292 dialect=self.dialect, 

2293 ismulti=ismulti, 

2294 ) 

2295 else: 

2296 sqlalchemy_exception = None 

2297 

2298 newraise = None 

2299 

2300 if (self.dialect._has_events) and not self._execution_options.get( 

2301 "skip_user_error_events", False 

2302 ): 

2303 ctx = ExceptionContextImpl( 

2304 e, 

2305 sqlalchemy_exception, 

2306 self.engine, 

2307 self.dialect, 

2308 self, 

2309 cursor, 

2310 statement, 

2311 parameters, 

2312 context, 

2313 self._is_disconnect, 

2314 invalidate_pool_on_disconnect, 

2315 False, 

2316 ) 

2317 

2318 for fn in self.dialect.dispatch.handle_error: 

2319 try: 

2320 # handler returns an exception; 

2321 # call next handler in a chain 

2322 per_fn = fn(ctx) 

2323 if per_fn is not None: 

2324 ctx.chained_exception = newraise = per_fn 

2325 except Exception as _raised: 

2326 # handler raises an exception - stop processing 

2327 newraise = _raised 

2328 break 

2329 

2330 if self._is_disconnect != ctx.is_disconnect: 

2331 self._is_disconnect = ctx.is_disconnect 

2332 if sqlalchemy_exception: 

2333 sqlalchemy_exception.connection_invalidated = ( 

2334 ctx.is_disconnect 

2335 ) 

2336 

2337 # set up potentially user-defined value for 

2338 # invalidate pool. 

2339 invalidate_pool_on_disconnect = ( 

2340 ctx.invalidate_pool_on_disconnect 

2341 ) 

2342 

2343 if should_wrap and context: 

2344 context.handle_dbapi_exception(e) 

2345 

2346 if not self._is_disconnect: 

2347 if cursor: 

2348 self._safe_close_cursor(cursor) 

2349 # "autorollback" was mostly relevant in 1.x series. 

2350 # It's very unlikely to reach here, as the connection 

2351 # does autobegin so when we are here, we are usually 

2352 # in an explicit / semi-explicit transaction. 

2353 # however we have a test which manufactures this 

2354 # scenario in any case using an event handler. 

2355 # test/engine/test_execute.py-> test_actual_autorollback 

2356 if not self.in_transaction(): 

2357 self._rollback_impl() 

2358 

2359 if newraise: 

2360 raise newraise.with_traceback(exc_info[2]) from e 

2361 elif should_wrap: 

2362 assert sqlalchemy_exception is not None 

2363 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2364 else: 

2365 assert exc_info[1] is not None 

2366 raise exc_info[1].with_traceback(exc_info[2]) 

2367 finally: 

2368 del self._reentrant_error 

2369 if self._is_disconnect: 

2370 del self._is_disconnect 

2371 if not self.invalidated: 

2372 dbapi_conn_wrapper = self._dbapi_connection 

2373 assert dbapi_conn_wrapper is not None 

2374 if invalidate_pool_on_disconnect: 

2375 self.engine.pool._invalidate(dbapi_conn_wrapper, e) 

2376 self.invalidate(e) 

2377 

2378 @classmethod 

2379 def _handle_dbapi_exception_noconnection( 

2380 cls, 

2381 e: BaseException, 

2382 dialect: Dialect, 

2383 engine: Optional[Engine] = None, 

2384 is_disconnect: Optional[bool] = None, 

2385 invalidate_pool_on_disconnect: bool = True, 

2386 is_pre_ping: bool = False, 

2387 ) -> NoReturn: 

2388 exc_info = sys.exc_info() 

2389 

2390 if is_disconnect is None: 

2391 is_disconnect = isinstance( 

2392 e, dialect.loaded_dbapi.Error 

2393 ) and dialect.is_disconnect(e, None, None) 

2394 

2395 should_wrap = isinstance(e, dialect.loaded_dbapi.Error) 

2396 

2397 if should_wrap: 

2398 sqlalchemy_exception = exc.DBAPIError.instance( 

2399 None, 

2400 None, 

2401 cast(Exception, e), 

2402 dialect.loaded_dbapi.Error, 

2403 hide_parameters=( 

2404 engine.hide_parameters if engine is not None else False 

2405 ), 

2406 connection_invalidated=is_disconnect, 

2407 dialect=dialect, 

2408 ) 

2409 else: 

2410 sqlalchemy_exception = None 

2411 

2412 newraise = None 

2413 

2414 if dialect._has_events: 

2415 ctx = ExceptionContextImpl( 

2416 e, 

2417 sqlalchemy_exception, 

2418 engine, 

2419 dialect, 

2420 None, 

2421 None, 

2422 None, 

2423 None, 

2424 None, 

2425 is_disconnect, 

2426 invalidate_pool_on_disconnect, 

2427 is_pre_ping, 

2428 ) 

2429 for fn in dialect.dispatch.handle_error: 

2430 try: 

2431 # handler returns an exception; 

2432 # call next handler in a chain 

2433 per_fn = fn(ctx) 

2434 if per_fn is not None: 

2435 ctx.chained_exception = newraise = per_fn 

2436 except Exception as _raised: 

2437 # handler raises an exception - stop processing 

2438 newraise = _raised 

2439 break 

2440 

2441 if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: 

2442 sqlalchemy_exception.connection_invalidated = ctx.is_disconnect 

2443 

2444 if newraise: 

2445 raise newraise.with_traceback(exc_info[2]) from e 

2446 elif should_wrap: 

2447 assert sqlalchemy_exception is not None 

2448 raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 

2449 else: 

2450 assert exc_info[1] is not None 

2451 raise exc_info[1].with_traceback(exc_info[2]) 

2452 

2453 def _run_ddl_visitor( 

2454 self, 

2455 visitorcallable: Type[InvokeDDLBase], 

2456 element: SchemaVisitable, 

2457 **kwargs: Any, 

2458 ) -> None: 

2459 """run a DDL visitor. 

2460 

2461 This method is only here so that the MockConnection can change the 

2462 options given to the visitor so that "checkfirst" is skipped. 

2463 

2464 """ 

2465 visitorcallable( 

2466 dialect=self.dialect, connection=self, **kwargs 

2467 ).traverse_single(element) 

2468 

2469 

2470class ExceptionContextImpl(ExceptionContext): 

2471 """Implement the :class:`.ExceptionContext` interface.""" 

2472 

2473 __slots__ = ( 

2474 "connection", 

2475 "engine", 

2476 "dialect", 

2477 "cursor", 

2478 "statement", 

2479 "parameters", 

2480 "original_exception", 

2481 "sqlalchemy_exception", 

2482 "chained_exception", 

2483 "execution_context", 

2484 "is_disconnect", 

2485 "invalidate_pool_on_disconnect", 

2486 "is_pre_ping", 

2487 ) 

2488 

2489 def __init__( 

2490 self, 

2491 exception: BaseException, 

2492 sqlalchemy_exception: Optional[exc.StatementError], 

2493 engine: Optional[Engine], 

2494 dialect: Dialect, 

2495 connection: Optional[Connection], 

2496 cursor: Optional[DBAPICursor], 

2497 statement: Optional[str], 

2498 parameters: Optional[_DBAPIAnyExecuteParams], 

2499 context: Optional[ExecutionContext], 

2500 is_disconnect: bool, 

2501 invalidate_pool_on_disconnect: bool, 

2502 is_pre_ping: bool, 

2503 ): 

2504 self.engine = engine 

2505 self.dialect = dialect 

2506 self.connection = connection 

2507 self.sqlalchemy_exception = sqlalchemy_exception 

2508 self.original_exception = exception 

2509 self.execution_context = context 

2510 self.statement = statement 

2511 self.parameters = parameters 

2512 self.is_disconnect = is_disconnect 

2513 self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect 

2514 self.is_pre_ping = is_pre_ping 

2515 

2516 

2517class Transaction(TransactionalContext): 

2518 """Represent a database transaction in progress. 

2519 

2520 The :class:`.Transaction` object is procured by 

2521 calling the :meth:`_engine.Connection.begin` method of 

2522 :class:`_engine.Connection`:: 

2523 

2524 from sqlalchemy import create_engine 

2525 

2526 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

2527 connection = engine.connect() 

2528 trans = connection.begin() 

2529 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2530 trans.commit() 

2531 

2532 The object provides :meth:`.rollback` and :meth:`.commit` 

2533 methods in order to control transaction boundaries. It 

2534 also implements a context manager interface so that 

2535 the Python ``with`` statement can be used with the 

2536 :meth:`_engine.Connection.begin` method:: 

2537 

2538 with connection.begin(): 

2539 connection.execute(text("insert into x (a, b) values (1, 2)")) 

2540 

2541 The Transaction object is **not** threadsafe. 

2542 

2543 .. seealso:: 

2544 

2545 :meth:`_engine.Connection.begin` 

2546 

2547 :meth:`_engine.Connection.begin_twophase` 

2548 

2549 :meth:`_engine.Connection.begin_nested` 

2550 

2551 .. index:: 

2552 single: thread safety; Transaction 

2553 """ # noqa 

2554 

2555 __slots__ = () 

2556 

2557 _is_root: bool = False 

2558 is_active: bool 

2559 connection: Connection 

2560 

2561 def __init__(self, connection: Connection): 

2562 raise NotImplementedError() 

2563 

2564 @property 

2565 def _deactivated_from_connection(self) -> bool: 

2566 """True if this transaction is totally deactivated from the connection 

2567 and therefore can no longer affect its state. 

2568 

2569 """ 

2570 raise NotImplementedError() 

2571 

2572 def _do_close(self) -> None: 

2573 raise NotImplementedError() 

2574 

2575 def _do_rollback(self) -> None: 

2576 raise NotImplementedError() 

2577 

2578 def _do_commit(self) -> None: 

2579 raise NotImplementedError() 

2580 

2581 @property 

2582 def is_valid(self) -> bool: 

2583 return self.is_active and not self.connection.invalidated 

2584 

2585 def close(self) -> None: 

2586 """Close this :class:`.Transaction`. 

2587 

2588 If this transaction is the base transaction in a begin/commit 

2589 nesting, the transaction will rollback(). Otherwise, the 

2590 method returns. 

2591 

2592 This is used to cancel a Transaction without affecting the scope of 

2593 an enclosing transaction. 

2594 

2595 """ 

2596 try: 

2597 self._do_close() 

2598 finally: 

2599 assert not self.is_active 

2600 

2601 def rollback(self) -> None: 

2602 """Roll back this :class:`.Transaction`. 

2603 

2604 The implementation of this may vary based on the type of transaction in 

2605 use: 

2606 

2607 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2608 it corresponds to a ROLLBACK. 

2609 

2610 * For a :class:`.NestedTransaction`, it corresponds to a 

2611 "ROLLBACK TO SAVEPOINT" operation. 

2612 

2613 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2614 phase transactions may be used. 

2615 

2616 

2617 """ 

2618 try: 

2619 self._do_rollback() 

2620 finally: 

2621 assert not self.is_active 

2622 

2623 def commit(self) -> None: 

2624 """Commit this :class:`.Transaction`. 

2625 

2626 The implementation of this may vary based on the type of transaction in 

2627 use: 

2628 

2629 * For a simple database transaction (e.g. :class:`.RootTransaction`), 

2630 it corresponds to a COMMIT. 

2631 

2632 * For a :class:`.NestedTransaction`, it corresponds to a 

2633 "RELEASE SAVEPOINT" operation. 

2634 

2635 * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 

2636 phase transactions may be used. 

2637 

2638 """ 

2639 try: 

2640 self._do_commit() 

2641 finally: 

2642 assert not self.is_active 

2643 

2644 def _get_subject(self) -> Connection: 

2645 return self.connection 

2646 

2647 def _transaction_is_active(self) -> bool: 

2648 return self.is_active 

2649 

2650 def _transaction_is_closed(self) -> bool: 

2651 return not self._deactivated_from_connection 

2652 

2653 def _rollback_can_be_called(self) -> bool: 

2654 # for RootTransaction / NestedTransaction, it's safe to call 

2655 # rollback() even if the transaction is deactive and no warnings 

2656 # will be emitted. tested in 

2657 # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)? 

2658 return True 

2659 

2660 

2661class RootTransaction(Transaction): 

2662 """Represent the "root" transaction on a :class:`_engine.Connection`. 

2663 

2664 This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring 

2665 for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction` 

2666 is created by calling upon the :meth:`_engine.Connection.begin` method, and 

2667 remains associated with the :class:`_engine.Connection` throughout its 

2668 active span. The current :class:`_engine.RootTransaction` in use is 

2669 accessible via the :attr:`_engine.Connection.get_transaction` method of 

2670 :class:`_engine.Connection`. 

2671 

2672 In :term:`2.0 style` use, the :class:`_engine.Connection` also employs 

2673 "autobegin" behavior that will create a new 

2674 :class:`_engine.RootTransaction` whenever a connection in a 

2675 non-transactional state is used to emit commands on the DBAPI connection. 

2676 The scope of the :class:`_engine.RootTransaction` in 2.0 style 

2677 use can be controlled using the :meth:`_engine.Connection.commit` and 

2678 :meth:`_engine.Connection.rollback` methods. 

2679 

2680 

2681 """ 

2682 

2683 _is_root = True 

2684 

2685 __slots__ = ("connection", "is_active") 

2686 

2687 def __init__(self, connection: Connection): 

2688 assert connection._transaction is None 

2689 if connection._trans_context_manager: 

2690 TransactionalContext._trans_ctx_check(connection) 

2691 self.connection = connection 

2692 self._connection_begin_impl() 

2693 connection._transaction = self 

2694 

2695 self.is_active = True 

2696 

2697 def _deactivate_from_connection(self) -> None: 

2698 if self.is_active: 

2699 assert self.connection._transaction is self 

2700 self.is_active = False 

2701 

2702 elif self.connection._transaction is not self: 

2703 util.warn("transaction already deassociated from connection") 

2704 

2705 @property 

2706 def _deactivated_from_connection(self) -> bool: 

2707 return self.connection._transaction is not self 

2708 

2709 def _connection_begin_impl(self) -> None: 

2710 self.connection._begin_impl(self) 

2711 

2712 def _connection_rollback_impl(self) -> None: 

2713 self.connection._rollback_impl() 

2714 

2715 def _connection_commit_impl(self) -> None: 

2716 self.connection._commit_impl() 

2717 

2718 def _close_impl(self, try_deactivate: bool = False) -> None: 

2719 try: 

2720 if self.is_active: 

2721 self._connection_rollback_impl() 

2722 

2723 if self.connection._nested_transaction: 

2724 self.connection._nested_transaction._cancel() 

2725 finally: 

2726 if self.is_active or try_deactivate: 

2727 self._deactivate_from_connection() 

2728 if self.connection._transaction is self: 

2729 self.connection._transaction = None 

2730 

2731 assert not self.is_active 

2732 assert self.connection._transaction is not self 

2733 

2734 def _do_close(self) -> None: 

2735 self._close_impl() 

2736 

2737 def _do_rollback(self) -> None: 

2738 self._close_impl(try_deactivate=True) 

2739 

2740 def _do_commit(self) -> None: 

2741 if self.is_active: 

2742 assert self.connection._transaction is self 

2743 

2744 try: 

2745 self._connection_commit_impl() 

2746 finally: 

2747 # whether or not commit succeeds, cancel any 

2748 # nested transactions, make this transaction "inactive" 

2749 # and remove it as a reset agent 

2750 if self.connection._nested_transaction: 

2751 self.connection._nested_transaction._cancel() 

2752 

2753 self._deactivate_from_connection() 

2754 

2755 # ...however only remove as the connection's current transaction 

2756 # if commit succeeded. otherwise it stays on so that a rollback 

2757 # needs to occur. 

2758 self.connection._transaction = None 

2759 else: 

2760 if self.connection._transaction is self: 

2761 self.connection._invalid_transaction() 

2762 else: 

2763 raise exc.InvalidRequestError("This transaction is inactive") 

2764 

2765 assert not self.is_active 

2766 assert self.connection._transaction is not self 

2767 

2768 

2769class NestedTransaction(Transaction): 

2770 """Represent a 'nested', or SAVEPOINT transaction. 

2771 

2772 The :class:`.NestedTransaction` object is created by calling the 

2773 :meth:`_engine.Connection.begin_nested` method of 

2774 :class:`_engine.Connection`. 

2775 

2776 When using :class:`.NestedTransaction`, the semantics of "begin" / 

2777 "commit" / "rollback" are as follows: 

2778 

2779 * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where 

2780 the savepoint is given an explicit name that is part of the state 

2781 of this object. 

2782 

2783 * The :meth:`.NestedTransaction.commit` method corresponds to a 

2784 "RELEASE SAVEPOINT" operation, using the savepoint identifier associated 

2785 with this :class:`.NestedTransaction`. 

2786 

2787 * The :meth:`.NestedTransaction.rollback` method corresponds to a 

2788 "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier 

2789 associated with this :class:`.NestedTransaction`. 

2790 

2791 The rationale for mimicking the semantics of an outer transaction in 

2792 terms of savepoints so that code may deal with a "savepoint" transaction 

2793 and an "outer" transaction in an agnostic way. 

2794 

2795 .. seealso:: 

2796 

2797 :ref:`session_begin_nested` - ORM version of the SAVEPOINT API. 

2798 

2799 """ 

2800 

2801 __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") 

2802 

2803 _savepoint: str 

2804 

2805 def __init__(self, connection: Connection): 

2806 assert connection._transaction is not None 

2807 if connection._trans_context_manager: 

2808 TransactionalContext._trans_ctx_check(connection) 

2809 self.connection = connection 

2810 self._savepoint = self.connection._savepoint_impl() 

2811 self.is_active = True 

2812 self._previous_nested = connection._nested_transaction 

2813 connection._nested_transaction = self 

2814 

2815 def _deactivate_from_connection(self, warn: bool = True) -> None: 

2816 if self.connection._nested_transaction is self: 

2817 self.connection._nested_transaction = self._previous_nested 

2818 elif warn: 

2819 util.warn( 

2820 "nested transaction already deassociated from connection" 

2821 ) 

2822 

2823 @property 

2824 def _deactivated_from_connection(self) -> bool: 

2825 return self.connection._nested_transaction is not self 

2826 

2827 def _cancel(self) -> None: 

2828 # called by RootTransaction when the outer transaction is 

2829 # committed, rolled back, or closed to cancel all savepoints 

2830 # without any action being taken 

2831 self.is_active = False 

2832 self._deactivate_from_connection() 

2833 if self._previous_nested: 

2834 self._previous_nested._cancel() 

2835 

2836 def _close_impl( 

2837 self, deactivate_from_connection: bool, warn_already_deactive: bool 

2838 ) -> None: 

2839 try: 

2840 if ( 

2841 self.is_active 

2842 and self.connection._transaction 

2843 and self.connection._transaction.is_active 

2844 ): 

2845 self.connection._rollback_to_savepoint_impl(self._savepoint) 

2846 finally: 

2847 self.is_active = False 

2848 

2849 if deactivate_from_connection: 

2850 self._deactivate_from_connection(warn=warn_already_deactive) 

2851 

2852 assert not self.is_active 

2853 if deactivate_from_connection: 

2854 assert self.connection._nested_transaction is not self 

2855 

2856 def _do_close(self) -> None: 

2857 self._close_impl(True, False) 

2858 

2859 def _do_rollback(self) -> None: 

2860 self._close_impl(True, True) 

2861 

2862 def _do_commit(self) -> None: 

2863 if self.is_active: 

2864 try: 

2865 self.connection._release_savepoint_impl(self._savepoint) 

2866 finally: 

2867 # nested trans becomes inactive on failed release 

2868 # unconditionally. this prevents it from trying to 

2869 # emit SQL when it rolls back. 

2870 self.is_active = False 

2871 

2872 # but only de-associate from connection if it succeeded 

2873 self._deactivate_from_connection() 

2874 else: 

2875 if self.connection._nested_transaction is self: 

2876 self.connection._invalid_transaction() 

2877 else: 

2878 raise exc.InvalidRequestError( 

2879 "This nested transaction is inactive" 

2880 ) 

2881 

2882 

2883class TwoPhaseTransaction(RootTransaction): 

2884 """Represent a two-phase transaction. 

2885 

2886 A new :class:`.TwoPhaseTransaction` object may be procured 

2887 using the :meth:`_engine.Connection.begin_twophase` method. 

2888 

2889 The interface is the same as that of :class:`.Transaction` 

2890 with the addition of the :meth:`prepare` method. 

2891 

2892 """ 

2893 

2894 __slots__ = ("xid", "_is_prepared") 

2895 

2896 xid: Any 

2897 

2898 def __init__(self, connection: Connection, xid: Any): 

2899 self._is_prepared = False 

2900 self.xid = xid 

2901 super().__init__(connection) 

2902 

2903 def prepare(self) -> None: 

2904 """Prepare this :class:`.TwoPhaseTransaction`. 

2905 

2906 After a PREPARE, the transaction can be committed. 

2907 

2908 """ 

2909 if not self.is_active: 

2910 raise exc.InvalidRequestError("This transaction is inactive") 

2911 self.connection._prepare_twophase_impl(self.xid) 

2912 self._is_prepared = True 

2913 

2914 def _connection_begin_impl(self) -> None: 

2915 self.connection._begin_twophase_impl(self) 

2916 

2917 def _connection_rollback_impl(self) -> None: 

2918 self.connection._rollback_twophase_impl(self.xid, self._is_prepared) 

2919 

2920 def _connection_commit_impl(self) -> None: 

2921 self.connection._commit_twophase_impl(self.xid, self._is_prepared) 

2922 

2923 

2924class Engine( 

2925 ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"] 

2926): 

2927 """ 

2928 Connects a :class:`~sqlalchemy.pool.Pool` and 

2929 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a 

2930 source of database connectivity and behavior. 

2931 

2932 An :class:`_engine.Engine` object is instantiated publicly using the 

2933 :func:`~sqlalchemy.create_engine` function. 

2934 

2935 .. seealso:: 

2936 

2937 :doc:`/core/engines` 

2938 

2939 :ref:`connections_toplevel` 

2940 

2941 """ 

2942 

2943 dispatch: dispatcher[ConnectionEventsTarget] 

2944 

2945 _compiled_cache: Optional[CompiledCacheType] 

2946 

2947 _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS 

2948 _has_events: bool = False 

2949 _connection_cls: Type[Connection] = Connection 

2950 _sqla_logger_namespace: str = "sqlalchemy.engine.Engine" 

2951 _is_future: bool = False 

2952 

2953 _schema_translate_map: Optional[SchemaTranslateMapType] = None 

2954 _option_cls: Type[OptionEngine] 

2955 

2956 dialect: Dialect 

2957 pool: Pool 

2958 url: URL 

2959 hide_parameters: bool 

2960 

2961 def __init__( 

2962 self, 

2963 pool: Pool, 

2964 dialect: Dialect, 

2965 url: URL, 

2966 logging_name: Optional[str] = None, 

2967 echo: Optional[_EchoFlagType] = None, 

2968 query_cache_size: int = 500, 

2969 execution_options: Optional[Mapping[str, Any]] = None, 

2970 hide_parameters: bool = False, 

2971 ): 

2972 self.pool = pool 

2973 self.url = url 

2974 self.dialect = dialect 

2975 if logging_name: 

2976 self.logging_name = logging_name 

2977 self.echo = echo 

2978 self.hide_parameters = hide_parameters 

2979 if query_cache_size != 0: 

2980 self._compiled_cache = util.LRUCache( 

2981 query_cache_size, size_alert=self._lru_size_alert 

2982 ) 

2983 else: 

2984 self._compiled_cache = None 

2985 log.instance_logger(self, echoflag=echo) 

2986 if execution_options: 

2987 self.update_execution_options(**execution_options) 

2988 

2989 def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None: 

2990 if self._should_log_info(): 

2991 self.logger.info( 

2992 "Compiled cache size pruning from %d items to %d. " 

2993 "Increase cache size to reduce the frequency of pruning.", 

2994 len(cache), 

2995 cache.capacity, 

2996 ) 

2997 

2998 @property 

2999 def engine(self) -> Engine: 

3000 """Returns this :class:`.Engine`. 

3001 

3002 Used for legacy schemes that accept :class:`.Connection` / 

3003 :class:`.Engine` objects within the same variable. 

3004 

3005 """ 

3006 return self 

3007 

3008 def clear_compiled_cache(self) -> None: 

3009 """Clear the compiled cache associated with the dialect. 

3010 

3011 This applies **only** to the built-in cache that is established 

3012 via the :paramref:`_engine.create_engine.query_cache_size` parameter. 

3013 It will not impact any dictionary caches that were passed via the 

3014 :paramref:`.Connection.execution_options.compiled_cache` parameter. 

3015 

3016 .. versionadded:: 1.4 

3017 

3018 """ 

3019 if self._compiled_cache: 

3020 self._compiled_cache.clear() 

3021 

3022 def update_execution_options(self, **opt: Any) -> None: 

3023 r"""Update the default execution_options dictionary 

3024 of this :class:`_engine.Engine`. 

3025 

3026 The given keys/values in \**opt are added to the 

3027 default execution options that will be used for 

3028 all connections. The initial contents of this dictionary 

3029 can be sent via the ``execution_options`` parameter 

3030 to :func:`_sa.create_engine`. 

3031 

3032 .. seealso:: 

3033 

3034 :meth:`_engine.Connection.execution_options` 

3035 

3036 :meth:`_engine.Engine.execution_options` 

3037 

3038 """ 

3039 self.dispatch.set_engine_execution_options(self, opt) 

3040 self._execution_options = self._execution_options.union(opt) 

3041 self.dialect.set_engine_execution_options(self, opt) 

3042 

3043 @overload 

3044 def execution_options( 

3045 self, 

3046 *, 

3047 compiled_cache: Optional[CompiledCacheType] = ..., 

3048 logging_token: str = ..., 

3049 isolation_level: IsolationLevel = ..., 

3050 insertmanyvalues_page_size: int = ..., 

3051 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

3052 **opt: Any, 

3053 ) -> OptionEngine: ... 

3054 

3055 @overload 

3056 def execution_options(self, **opt: Any) -> OptionEngine: ... 

3057 

3058 def execution_options(self, **opt: Any) -> OptionEngine: 

3059 """Return a new :class:`_engine.Engine` that will provide 

3060 :class:`_engine.Connection` objects with the given execution options. 

3061 

3062 The returned :class:`_engine.Engine` remains related to the original 

3063 :class:`_engine.Engine` in that it shares the same connection pool and 

3064 other state: 

3065 

3066 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` 

3067 is the 

3068 same instance. The :meth:`_engine.Engine.dispose` 

3069 method will replace 

3070 the connection pool instance for the parent engine as well 

3071 as this one. 

3072 * Event listeners are "cascaded" - meaning, the new 

3073 :class:`_engine.Engine` 

3074 inherits the events of the parent, and new events can be associated 

3075 with the new :class:`_engine.Engine` individually. 

3076 * The logging configuration and logging_name is copied from the parent 

3077 :class:`_engine.Engine`. 

3078 

3079 The intent of the :meth:`_engine.Engine.execution_options` method is 

3080 to implement schemes where multiple :class:`_engine.Engine` 

3081 objects refer to the same connection pool, but are differentiated 

3082 by options that affect some execution-level behavior for each 

3083 engine. One such example is breaking into separate "reader" and 

3084 "writer" :class:`_engine.Engine` instances, where one 

3085 :class:`_engine.Engine` 

3086 has a lower :term:`isolation level` setting configured or is even 

3087 transaction-disabled using "autocommit". An example of this 

3088 configuration is at :ref:`dbapi_autocommit_multiple`. 

3089 

3090 Another example is one that 

3091 uses a custom option ``shard_id`` which is consumed by an event 

3092 to change the current schema on a database connection:: 

3093 

3094 from sqlalchemy import event 

3095 from sqlalchemy.engine import Engine 

3096 

3097 primary_engine = create_engine("mysql+mysqldb://") 

3098 shard1 = primary_engine.execution_options(shard_id="shard1") 

3099 shard2 = primary_engine.execution_options(shard_id="shard2") 

3100 

3101 shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"} 

3102 

3103 

3104 @event.listens_for(Engine, "before_cursor_execute") 

3105 def _switch_shard(conn, cursor, stmt, params, context, executemany): 

3106 shard_id = conn.get_execution_options().get("shard_id", "default") 

3107 current_shard = conn.info.get("current_shard", None) 

3108 

3109 if current_shard != shard_id: 

3110 cursor.execute("use %s" % shards[shard_id]) 

3111 conn.info["current_shard"] = shard_id 

3112 

3113 The above recipe illustrates two :class:`_engine.Engine` objects that 

3114 will each serve as factories for :class:`_engine.Connection` objects 

3115 that have pre-established "shard_id" execution options present. A 

3116 :meth:`_events.ConnectionEvents.before_cursor_execute` event handler 

3117 then interprets this execution option to emit a MySQL ``use`` statement 

3118 to switch databases before a statement execution, while at the same 

3119 time keeping track of which database we've established using the 

3120 :attr:`_engine.Connection.info` dictionary. 

3121 

3122 .. seealso:: 

3123 

3124 :meth:`_engine.Connection.execution_options` 

3125 - update execution options 

3126 on a :class:`_engine.Connection` object. 

3127 

3128 :meth:`_engine.Engine.update_execution_options` 

3129 - update the execution 

3130 options for a given :class:`_engine.Engine` in place. 

3131 

3132 :meth:`_engine.Engine.get_execution_options` 

3133 

3134 

3135 """ # noqa: E501 

3136 return self._option_cls(self, opt) 

3137 

3138 def get_execution_options(self) -> _ExecuteOptions: 

3139 """Get the non-SQL options which will take effect during execution. 

3140 

3141 .. versionadded: 1.3 

3142 

3143 .. seealso:: 

3144 

3145 :meth:`_engine.Engine.execution_options` 

3146 """ 

3147 return self._execution_options 

3148 

3149 @property 

3150 def name(self) -> str: 

3151 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3152 in use by this :class:`Engine`. 

3153 

3154 """ 

3155 

3156 return self.dialect.name 

3157 

3158 @property 

3159 def driver(self) -> str: 

3160 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

3161 in use by this :class:`Engine`. 

3162 

3163 """ 

3164 

3165 return self.dialect.driver 

3166 

3167 echo = log.echo_property() 

3168 

3169 def __repr__(self) -> str: 

3170 return "Engine(%r)" % (self.url,) 

3171 

3172 def dispose(self, close: bool = True) -> None: 

3173 """Dispose of the connection pool used by this 

3174 :class:`_engine.Engine`. 

3175 

3176 A new connection pool is created immediately after the old one has been 

3177 disposed. The previous connection pool is disposed either actively, by 

3178 closing out all currently checked-in connections in that pool, or 

3179 passively, by losing references to it but otherwise not closing any 

3180 connections. The latter strategy is more appropriate for an initializer 

3181 in a forked Python process. 

3182 

3183 :param close: if left at its default of ``True``, has the 

3184 effect of fully closing all **currently checked in** 

3185 database connections. Connections that are still checked out 

3186 will **not** be closed, however they will no longer be associated 

3187 with this :class:`_engine.Engine`, 

3188 so when they are closed individually, eventually the 

3189 :class:`_pool.Pool` which they are associated with will 

3190 be garbage collected and they will be closed out fully, if 

3191 not already closed on checkin. 

3192 

3193 If set to ``False``, the previous connection pool is de-referenced, 

3194 and otherwise not touched in any way. 

3195 

3196 .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` 

3197 parameter to allow the replacement of a connection pool in a child 

3198 process without interfering with the connections used by the parent 

3199 process. 

3200 

3201 

3202 .. seealso:: 

3203 

3204 :ref:`engine_disposal` 

3205 

3206 :ref:`pooling_multiprocessing` 

3207 

3208 """ 

3209 if close: 

3210 self.pool.dispose() 

3211 self.pool = self.pool.recreate() 

3212 self.dispatch.engine_disposed(self) 

3213 

3214 @contextlib.contextmanager 

3215 def _optional_conn_ctx_manager( 

3216 self, connection: Optional[Connection] = None 

3217 ) -> Iterator[Connection]: 

3218 if connection is None: 

3219 with self.connect() as conn: 

3220 yield conn 

3221 else: 

3222 yield connection 

3223 

3224 @contextlib.contextmanager 

3225 def begin(self) -> Iterator[Connection]: 

3226 """Return a context manager delivering a :class:`_engine.Connection` 

3227 with a :class:`.Transaction` established. 

3228 

3229 E.g.:: 

3230 

3231 with engine.begin() as conn: 

3232 conn.execute(text("insert into table (x, y, z) values (1, 2, 3)")) 

3233 conn.execute(text("my_special_procedure(5)")) 

3234 

3235 Upon successful operation, the :class:`.Transaction` 

3236 is committed. If an error is raised, the :class:`.Transaction` 

3237 is rolled back. 

3238 

3239 .. seealso:: 

3240 

3241 :meth:`_engine.Engine.connect` - procure a 

3242 :class:`_engine.Connection` from 

3243 an :class:`_engine.Engine`. 

3244 

3245 :meth:`_engine.Connection.begin` - start a :class:`.Transaction` 

3246 for a particular :class:`_engine.Connection`. 

3247 

3248 """ # noqa: E501 

3249 with self.connect() as conn: 

3250 with conn.begin(): 

3251 yield conn 

3252 

3253 def _run_ddl_visitor( 

3254 self, 

3255 visitorcallable: Type[InvokeDDLBase], 

3256 element: SchemaVisitable, 

3257 **kwargs: Any, 

3258 ) -> None: 

3259 with self.begin() as conn: 

3260 conn._run_ddl_visitor(visitorcallable, element, **kwargs) 

3261 

3262 def connect(self) -> Connection: 

3263 """Return a new :class:`_engine.Connection` object. 

3264 

3265 The :class:`_engine.Connection` acts as a Python context manager, so 

3266 the typical use of this method looks like:: 

3267 

3268 with engine.connect() as connection: 

3269 connection.execute(text("insert into table values ('foo')")) 

3270 connection.commit() 

3271 

3272 Where above, after the block is completed, the connection is "closed" 

3273 and its underlying DBAPI resources are returned to the connection pool. 

3274 This also has the effect of rolling back any transaction that 

3275 was explicitly begun or was begun via autobegin, and will 

3276 emit the :meth:`_events.ConnectionEvents.rollback` event if one was 

3277 started and is still in progress. 

3278 

3279 .. seealso:: 

3280 

3281 :meth:`_engine.Engine.begin` 

3282 

3283 """ 

3284 

3285 return self._connection_cls(self) 

3286 

3287 def raw_connection(self) -> PoolProxiedConnection: 

3288 """Return a "raw" DBAPI connection from the connection pool. 

3289 

3290 The returned object is a proxied version of the DBAPI 

3291 connection object used by the underlying driver in use. 

3292 The object will have all the same behavior as the real DBAPI 

3293 connection, except that its ``close()`` method will result in the 

3294 connection being returned to the pool, rather than being closed 

3295 for real. 

3296 

3297 This method provides direct DBAPI connection access for 

3298 special situations when the API provided by 

3299 :class:`_engine.Connection` 

3300 is not needed. When a :class:`_engine.Connection` object is already 

3301 present, the DBAPI connection is available using 

3302 the :attr:`_engine.Connection.connection` accessor. 

3303 

3304 .. seealso:: 

3305 

3306 :ref:`dbapi_connections` 

3307 

3308 """ 

3309 return self.pool.connect() 

3310 

3311 

3312class OptionEngineMixin(log.Identified): 

3313 _sa_propagate_class_events = False 

3314 

3315 dispatch: dispatcher[ConnectionEventsTarget] 

3316 _compiled_cache: Optional[CompiledCacheType] 

3317 dialect: Dialect 

3318 pool: Pool 

3319 url: URL 

3320 hide_parameters: bool 

3321 echo: log.echo_property 

3322 

3323 def __init__( 

3324 self, proxied: Engine, execution_options: CoreExecuteOptionsParameter 

3325 ): 

3326 self._proxied = proxied 

3327 self.url = proxied.url 

3328 self.dialect = proxied.dialect 

3329 self.logging_name = proxied.logging_name 

3330 self.echo = proxied.echo 

3331 self._compiled_cache = proxied._compiled_cache 

3332 self.hide_parameters = proxied.hide_parameters 

3333 log.instance_logger(self, echoflag=self.echo) 

3334 

3335 # note: this will propagate events that are assigned to the parent 

3336 # engine after this OptionEngine is created. Since we share 

3337 # the events of the parent we also disallow class-level events 

3338 # to apply to the OptionEngine class directly. 

3339 # 

3340 # the other way this can work would be to transfer existing 

3341 # events only, using: 

3342 # self.dispatch._update(proxied.dispatch) 

3343 # 

3344 # that might be more appropriate however it would be a behavioral 

3345 # change for logic that assigns events to the parent engine and 

3346 # would like it to take effect for the already-created sub-engine. 

3347 self.dispatch = self.dispatch._join(proxied.dispatch) 

3348 

3349 self._execution_options = proxied._execution_options 

3350 self.update_execution_options(**execution_options) 

3351 

3352 def update_execution_options(self, **opt: Any) -> None: 

3353 raise NotImplementedError() 

3354 

3355 if not typing.TYPE_CHECKING: 

3356 # https://github.com/python/typing/discussions/1095 

3357 

3358 @property 

3359 def pool(self) -> Pool: 

3360 return self._proxied.pool 

3361 

3362 @pool.setter 

3363 def pool(self, pool: Pool) -> None: 

3364 self._proxied.pool = pool 

3365 

3366 @property 

3367 def _has_events(self) -> bool: 

3368 return self._proxied._has_events or self.__dict__.get( 

3369 "_has_events", False 

3370 ) 

3371 

3372 @_has_events.setter 

3373 def _has_events(self, value: bool) -> None: 

3374 self.__dict__["_has_events"] = value 

3375 

3376 

3377class OptionEngine(OptionEngineMixin, Engine): 

3378 def update_execution_options(self, **opt: Any) -> None: 

3379 Engine.update_execution_options(self, **opt) 

3380 

3381 

3382Engine._option_cls = OptionEngine