Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1004 statements  

1# engine/base.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`. 

8 

9""" 

10from __future__ import annotations 

11 

12import contextlib 

13import sys 

14import typing 

15from typing import Any 

16from typing import Callable 

17from typing import cast 

18from typing import Iterable 

19from typing import Iterator 

20from typing import List 

21from typing import Mapping 

22from typing import NoReturn 

23from typing import Optional 

24from typing import overload 

25from typing import Tuple 

26from typing import Type 

27from typing import TypeVar 

28from typing import Union 

29 

30from .interfaces import BindTyping 

31from .interfaces import ConnectionEventsTarget 

32from .interfaces import DBAPICursor 

33from .interfaces import ExceptionContext 

34from .interfaces import ExecuteStyle 

35from .interfaces import ExecutionContext 

36from .interfaces import IsolationLevel 

37from .util import _distill_params_20 

38from .util import _distill_raw_params 

39from .util import TransactionalContext 

40from .. import exc 

41from .. import inspection 

42from .. import log 

43from .. import util 

44from ..sql import compiler 

45from ..sql import util as sql_util 

46from ..util.typing import TupleAny 

47from ..util.typing import TypeVarTuple 

48from ..util.typing import Unpack 

49 

50if typing.TYPE_CHECKING: 

51 from . import CursorResult 

52 from . import ScalarResult 

53 from .interfaces import _AnyExecuteParams 

54 from .interfaces import _AnyMultiExecuteParams 

55 from .interfaces import _CoreAnyExecuteParams 

56 from .interfaces import _CoreMultiExecuteParams 

57 from .interfaces import _CoreSingleExecuteParams 

58 from .interfaces import _DBAPIAnyExecuteParams 

59 from .interfaces import _DBAPISingleExecuteParams 

60 from .interfaces import _ExecuteOptions 

61 from .interfaces import CompiledCacheType 

62 from .interfaces import CoreExecuteOptionsParameter 

63 from .interfaces import Dialect 

64 from .interfaces import SchemaTranslateMapType 

65 from .reflection import Inspector # noqa 

66 from .url import URL 

67 from ..event import dispatcher 

68 from ..log import _EchoFlagType 

69 from ..pool import _ConnectionFairy 

70 from ..pool import Pool 

71 from ..pool import PoolProxiedConnection 

72 from ..sql import Executable 

73 from ..sql._typing import _InfoType 

74 from ..sql.compiler import Compiled 

75 from ..sql.ddl import ExecutableDDLElement 

76 from ..sql.ddl import SchemaDropper 

77 from ..sql.ddl import SchemaGenerator 

78 from ..sql.functions import FunctionElement 

79 from ..sql.schema import DefaultGenerator 

80 from ..sql.schema import HasSchemaAttr 

81 from ..sql.schema import SchemaItem 

82 from ..sql.selectable import TypedReturnsRows 

83 

84 

# Generic type variable for a single type; explicitly bound to ``Any``.
_T = TypeVar("_T", bound=Any)
# Variadic type variable (a ``TypeVarTuple``) standing for a run of types.
_Ts = TypeVarTuple("_Ts")
# Both names below refer to the same shared ``util.EMPTY_DICT`` object.
# NOTE(review): presumably these are the "no options supplied" defaults;
# confirm against the call sites, which are outside this excerpt.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT

89 

90 

91class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

92 """Provides high-level functionality for a wrapped DB-API connection. 

93 

94 The :class:`_engine.Connection` object is procured by calling the 

95 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

96 object, and provides services for execution of SQL statements as well 

97 as transaction control. 

98 

99 The Connection object is **not** thread-safe. While a Connection can be 

100 shared among threads using properly synchronized access, it is still 

101 possible that the underlying DBAPI connection may not support shared 

102 access between threads. Check the DBAPI documentation for details. 

103 

104 The Connection object represents a single DBAPI connection checked out 

105 from the connection pool. In this state, the connection pool has no 

106 affect upon the connection, including its expiration or timeout state. 

107 For the connection pool to properly manage connections, connections 

108 should be returned to the connection pool (i.e. ``connection.close()``) 

109 whenever the connection is not in use. 

110 

111 .. index:: 

112 single: thread safety; Connection 

113 

114 """ 

115 

116 dialect: Dialect 

117 dispatch: dispatcher[ConnectionEventsTarget] 

118 

119 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

120 

121 # used by sqlalchemy.engine.util.TransactionalContext 

122 _trans_context_manager: Optional[TransactionalContext] = None 

123 

124 # legacy as of 2.0, should be eventually deprecated and 

125 # removed. was used in the "pre_ping" recipe that's been in the docs 

126 # a long time 

127 should_close_with_result = False 

128 

129 _dbapi_connection: Optional[PoolProxiedConnection] 

130 

131 _execution_options: _ExecuteOptions 

132 

133 _transaction: Optional[RootTransaction] 

134 _nested_transaction: Optional[NestedTransaction] 

135 

def __init__(
    self,
    engine: Engine,
    connection: Optional[PoolProxiedConnection] = None,
    _has_events: Optional[bool] = None,
    _allow_revalidate: bool = True,
    _allow_autobegin: bool = True,
):
    """Construct a new Connection.

    :param engine: the :class:`_engine.Engine` this connection belongs to;
     its dialect and execution options are adopted by the new object.
    :param connection: an already checked-out pool-proxied connection to
     wrap.  When ``None``, one is obtained via ``engine.raw_connection()``.
    :param _has_events: when ``None``, this connection's dispatcher is
     joined to the engine's and the engine's ``_has_events`` flag is
     inherited.  When passed explicitly, no join takes place.
    :param _allow_revalidate: stored as the private "can reconnect" flag
     consulted by the ``closed`` / ``invalidated`` accessors.
    :param _allow_autobegin: stored as ``_allow_autobegin`` and consulted
     by ``_autobegin()``.
    """
    self.engine = engine
    dialect = engine.dialect
    self.dialect = dialect

    if connection is not None:
        self._dbapi_connection = connection
    else:
        try:
            self._dbapi_connection = engine.raw_connection()
        except dialect.loaded_dbapi.Error as err:
            # hand the DBAPI error to the no-connection handler; the
            # bare ``raise`` re-raises the original if that returns
            Connection._handle_dbapi_exception_noconnection(
                err, dialect, engine
            )
            raise

    # no transaction of either kind exists yet
    self._transaction = None
    self._nested_transaction = None
    self.__savepoint_seq = 0
    self.__in_begin = False

    self.__can_reconnect = _allow_revalidate
    self._allow_autobegin = _allow_autobegin
    self._echo = self.engine._should_log_info()

    inherit_engine_events = _has_events is None
    if inherit_engine_events:
        # an explicit _has_events=False means "don't join the engine's
        # dispatch"; none of the engine's events should be handled then.
        self.dispatch = self.dispatch._join(engine.dispatch)
    self._has_events = _has_events or (
        inherit_engine_events and engine._has_events
    )

    self._execution_options = engine._execution_options

    if self._has_events or self.engine._has_events:
        self.dispatch.engine_connect(self)

180 

# this can be assigned differently via
# characteristics.LoggingTokenCharacteristic
#
# Defaults to None.  When set to a truthy callable, ``_log_info()`` and
# ``_log_debug()`` pass each message through it before logging.
_message_formatter: Any = None

184 

def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at INFO level through the engine's logger.

    The message is first passed through ``_message_formatter`` when one
    is set.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel`` keyword
    of ``1 + log.STACKLEVEL_OFFSET`` is supplied to the logger call.
    Remaining positional and keyword arguments are forwarded unchanged.
    """
    formatter = self._message_formatter
    message = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw.update(stacklevel=1 + log.STACKLEVEL_OFFSET)

    self.engine.logger.info(message, *arg, **kw)

195 

def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Log ``message`` at DEBUG level through the engine's logger.

    The message is first passed through ``_message_formatter`` when one
    is set.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel`` keyword
    of ``1 + log.STACKLEVEL_OFFSET`` is supplied to the logger call.
    Remaining positional and keyword arguments are forwarded unchanged.
    """
    formatter = self._message_formatter
    message = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw.update(stacklevel=1 + log.STACKLEVEL_OFFSET)

    self.engine.logger.debug(message, *arg, **kw)

206 

@property
def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
    """The ``schema_translate_map`` execution option currently in effect,
    or ``None`` when that option has not been set.
    """
    return self._execution_options.get("schema_translate_map", None)

214 

def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the effective schema name for ``obj``.

    The object's own ``schema`` is returned unless a non-empty
    ``schema_translate_map`` execution option is present, that map
    contains the schema name, and the object has ``_use_schema_map``
    set; in that case the translated name from the map is returned.
    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    # guard clauses, evaluated in the same order as the original
    # short-circuiting condition
    if not translate_map:
        return schema_name
    if schema_name not in translate_map:
        return schema_name
    if not obj._use_schema_map:
        return schema_name

    return translate_map[schema_name]

234 

def __enter__(self) -> Connection:
    """Enter a ``with`` block; the connection itself is the bound value."""
    return self

237 

def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Leave a ``with`` block by calling :meth:`close`.

    The exception details are ignored and ``None`` is returned, so an
    exception raised inside the block is not suppressed.
    """
    self.close()

240 

@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    **opt: Any,
) -> Connection: ...

@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called.   Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be
      "streamed" and not pre-buffered, if possible.  For backends
      such as PostgreSQL, MySQL and MariaDB, this indicates the use of
      a "server side cursor" as opposed to a client side cursor.
      Other backends such as that of Oracle may already use server
      side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`. Number of rows to format into an
        INSERT statement when the statement uses "insertmanyvalues" mode,
        which is a paged form of bulk insert that is used for many backends
        when using :term:`executemany` execution typically in conjunction
        with RETURNING. Defaults to 1000. May also be modified on a
        per-engine basis using the
        :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

        .. versionadded:: 2.0

        .. seealso::

            :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPIs rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements.  See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    """  # noqa
    if self._has_events or self.engine._has_events:
        # listeners are notified first and receive the same ``opt`` dict
        # that is merged below
        self.dispatch.set_connection_execution_options(self, opt)
    # the merged mapping returned by ``union`` replaces the current options
    self._execution_options = self._execution_options.union(opt)
    # hand the new options to the dialect as well
    # NOTE(review): presumably this is where options such as
    # ``isolation_level`` reach the DBAPI connection; confirm in the dialect
    self.dialect.set_connection_execution_options(self, opt)
    return self

524 

def get_execution_options(self) -> _ExecuteOptions:
    """Return the non-SQL execution options currently in effect on this
    connection.

    .. versionadded:: 1.3

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options

535 

@property
def _still_open_and_dbapi_connection_is_valid(self) -> bool:
    """``False`` when no pool-proxied connection is held; otherwise that
    connection's ``is_valid`` value.
    """
    proxied = self._dbapi_connection
    if proxied is None:
        return False
    return proxied.is_valid

543 

@property
def closed(self) -> bool:
    """Return True if this connection is closed.

    A connection counts as closed when it holds no DBAPI connection
    and is not permitted to reconnect.
    """
    if self._dbapi_connection is not None:
        return False
    return not self.__can_reconnect

549 

@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    This does not, however, indicate whether or not the connection was
    invalidated at the pool level.

    """

    # Before 1.4 "invalid" was tracked as its own flag, separate from
    # "closed": an invalidated connection could be closed, leaving
    # _dbapi_connection as None and closed=True while the flag stayed
    # True.  That gave three states (open/valid, closed/valid,
    # closed/invalid) for no good reason, since a closed connection
    # has no need to also be "invalid".  The state is now derived from
    # two facts only: whether a DBAPI connection is held, and whether
    # reconnecting is allowed.
    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect

570 

@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    This is a SQLAlchemy connection-pool proxied connection
    which then has the attribute
    :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
    actual driver connection.

    When no DBAPI connection is currently held, a revalidation is
    attempted first.  ``PendingRollbackError`` and
    ``ResourceClosedError`` from that attempt propagate unchanged; any
    other exception is passed to ``_handle_dbapi_exception()``.

    .. seealso::


        :ref:`dbapi_connections`

    """
    held = self._dbapi_connection
    if held is not None:
        return held

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        raise
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

596 

def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    A live SQL operation is performed against the database to obtain
    the level, so the value reflects what is really in effect on the
    underlying DBAPI connection, however that state came to be set.
    The result is one of the four actual isolation modes
    ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
    ``SERIALIZABLE``; it will **not** include the ``AUTOCOMMIT``
    isolation level setting.  Third party dialects may also feature
    additional isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None
    try:
        return self.dialect.get_isolation_level(driver_connection)
    except BaseException as e:
        # any failure is passed to the common DBAPI exception handler
        self._handle_dbapi_exception(e, None, None, None, None)

640 

@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    The value does not depend on the
    :paramref:`.Connection.execution_options.isolation_level` or
    :paramref:`.Engine.execution_options.isolation_level` execution
    options.  It is established by the :class:`_engine.Dialect` when
    the first connection is created, through a SQL query for the
    current isolation level made before any other commands are emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect_default = self.dialect.default_isolation_level
    return dialect_default

669 

def _invalid_transaction(self) -> NoReturn:
    """Raise ``PendingRollbackError`` (code ``8s2b``) stating that no
    reconnect can occur until the invalid transaction is rolled back.

    The message says "savepoint transaction" when a nested transaction
    is present, and plain "transaction" otherwise.
    """
    qualifier = "savepoint " if self._nested_transaction is not None else ""
    # NOTE(review): the text this block was recovered from may have
    # collapsed runs of spaces; confirm the spacing after "back." against
    # the upstream file before applying.
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back. Please rollback() fully before proceeding" % (qualifier,),
        code="8s2b",
    )

677 

def _revalidate_connection(self) -> PoolProxiedConnection:
    """Obtain a fresh pool-proxied connection for an invalidated
    Connection and return it.

    :raises ResourceClosedError: when reconnecting is not permitted or
     the connection is not in the invalidated state.

    When a transaction is still present, ``_invalid_transaction()`` is
    called before any reconnect is attempted.
    """
    if not (self.__can_reconnect and self.invalidated):
        raise exc.ResourceClosedError("This Connection is closed")

    if self._transaction is not None:
        self._invalid_transaction()

    replacement = self.engine.raw_connection()
    self._dbapi_connection = replacement
    return replacement

685 

@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data travels with the DBAPI connection, including after it has
    gone back to the connection pool and is picked up again by later
    instances of :class:`_engine.Connection`.

    """
    proxied = self.connection
    return proxied.info

699 

def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt will be made to close the underlying DBAPI connection
    immediately; however if this operation fails, the error is logged
    but not raised.  The connection is then discarded whether or not
    close() succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar),
    this :class:`_engine.Connection` will attempt to
    procure a new DBAPI connection using the services of the
    :class:`_pool.Pool` as a source of connectivity (e.g.
    a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    the :meth:`_engine.Connection.invalidate` method is called, at the
    DBAPI level all state associated with this transaction is lost, as
    the DBAPI connection is closed.  The :class:`_engine.Connection`
    will not allow a reconnection to proceed until the
    :class:`.Transaction` object is ended, by calling the
    :meth:`.Transaction.rollback` method; until that point, any attempt at
    continuing to use the :class:`_engine.Connection` will raise an
    :class:`~sqlalchemy.exc.InvalidRequestError`.
    This is to prevent applications from accidentally
    continuing ongoing transactional operations despite the
    fact that the transaction has been lost due to an
    invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on an already-invalidated connection is a no-op.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    :raises ResourceClosedError: if the connection is closed.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """
    if self.invalidated:
        # nothing left to invalidate
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        proxied = self._dbapi_connection
        assert proxied is not None
        proxied.invalidate(exception)

    # drop the reference whether or not the pool-level invalidation ran
    self._dbapi_connection = None

758 

def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    The :class:`_engine.Connection` stays usable after detaching.
    Once it is closed (or the context manager block above is exited),
    the DB-API connection is literally closed rather than being
    returned to the pool it came from.

    Use this to keep modified connection state (a changed transaction
    isolation level, for example) from leaking into the rest of the
    application.

    :raises ResourceClosedError: if the connection is closed.
    :raises InvalidRequestError: if the connection has been invalidated
     and therefore holds no DB-API connection to detach.

    """
    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    proxied = self._dbapi_connection
    if proxied is not None:
        proxied.detach()
        return

    raise exc.InvalidRequestError("Can't detach an invalidated Connection")

794 

def _autobegin(self) -> None:
    """Call :meth:`begin` unless autobegin is disallowed for this
    connection or a begin is already in progress.
    """
    if not self._allow_autobegin or self.__in_begin:
        return
    self.begin()

798 

def begin(self) -> RootTransaction:
    """Start a transaction explicitly, before autobegin would occur.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    The return value is a :class:`_engine.RootTransaction`.  That object
    stands for the "scope" of the transaction, which ends once
    :meth:`_engine.Transaction.rollback` or
    :meth:`_engine.Transaction.commit` is called; as shown above, it
    may also be used as a context manager.

    A transaction would normally be begun anyway the first time the
    connection executes a statement, so calling
    :meth:`_engine.Connection.begin` is optional.  Reasons to call it
    include firing the :meth:`_events.ConnectionEvents.begin` event at a
    specific moment, or organizing the code inside one connection
    checkout as a series of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    In terms of behavior, that is not fundamentally different from the
    following code, which never calls :meth:`_engine.Connection.begin`
    and is written in what is known as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, :meth:`_engine.Connection.begin`
    emits no SQL and does not change the state of the underlying DBAPI
    connection in any way; the Python DBAPI has no concept of an
    explicit transaction begin.

    :raises InvalidRequestError: if this connection already has a
     transaction object in place via ``begin()`` or autobegin.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction

874 

def begin_nested(self) -> NestedTransaction:
    """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
    handle that controls the scope of the SAVEPOINT.

    E.g.::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of
    :class:`_engine.NestedTransaction`, which includes transactional
    methods :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
    these methods correspond to the operations "RELEASE SAVEPOINT <name>"
    and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
    to the :class:`_engine.NestedTransaction` object and is generated
    automatically. Like any other :class:`_engine.Transaction`, the
    :class:`_engine.NestedTransaction` may be used as a context manager as
    illustrated above which will "release" or "rollback" corresponding to
    if the operation within the block were successful or raised an
    exception.

    Nested transactions require SAVEPOINT support in the underlying
    database, else the behavior is undefined. SAVEPOINT is commonly used to
    run operations within a transaction that may fail, while continuing the
    outer transaction. E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute( ... )

    If :meth:`_engine.Connection.begin_nested` is called without first
    calling :meth:`_engine.Connection.begin` or
    :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
    will "autobegin" the outer transaction first. This outer transaction
    may be committed using "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute( ... )
            # savepoint is released

            connection.execute( ... )

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    # no outer transaction yet: let autobegin establish one first so the
    # SAVEPOINT has a transaction to live in
    if self._transaction is None:
        self._autobegin()

    return NestedTransaction(self)

952 

def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Start a two-phase (XA) transaction and return its handle.

    The handle is a :class:`.TwoPhaseTransaction`; besides everything
    offered by :class:`.Transaction` it also has a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id.  When omitted, one is
     obtained from the dialect's ``create_xid()``.

    :raises InvalidRequestError: if a transaction is already in progress
     on this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    txn_id = self.engine.dialect.create_xid() if xid is None else xid
    return TwoPhaseTransaction(self, txn_id)

981 

def commit(self) -> None:
    """Commit whatever transaction is currently in progress.

    If a transaction has been started it is committed.  If none was
    started this call does nothing, assuming the connection is in a
    non-invalidated state.

    A :class:`_engine.Connection` begins a transaction automatically the
    first time a statement is executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.commit` acts only upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from
      :meth:`_engine.Connection.begin_nested`; to control a SAVEPOINT,
      call :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.

    """
    current = self._transaction
    if current:
        current.commit()

1006 

def rollback(self) -> None:
    """Roll back whatever transaction is currently in progress.

    If a transaction has been started it is rolled back.  If none was
    started this call does nothing.  If a transaction was started and
    the connection is in an invalidated state, the transaction is
    cleared using this method.

    A :class:`_engine.Connection` begins a transaction automatically the
    first time a statement is executed, or when
    :meth:`_engine.Connection.begin` is called.

    .. note:: :meth:`_engine.Connection.rollback` acts only upon the
      primary database transaction linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from
      :meth:`_engine.Connection.begin_nested`; to control a SAVEPOINT,
      call :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` returned by
      :meth:`_engine.Connection.begin_nested` itself.

    """
    current = self._transaction
    if current:
        current.rollback()

1032 

def recover_twophase(self) -> List[Any]:
    """Return what the dialect's ``do_recover_twophase()`` reports for
    this connection.

    NOTE(review): presumably the ids of prepared two-phase transactions
    that can still be recovered -- confirm against the dialect.

    """
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)

1035 

def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_rollback_twophase()``.

    :param xid: the two phase transaction id to roll back.
    :param recover: forwarded unchanged as the dialect's ``recover``
     keyword argument.

    """
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)

1038 

def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Hand ``xid`` to the dialect's ``do_commit_twophase()``.

    :param xid: the two phase transaction id to commit.
    :param recover: forwarded unchanged as the dialect's ``recover``
     keyword argument.

    """
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)

1041 

def in_transaction(self) -> bool:
    """Return True if a transaction is in progress."""
    current = self._transaction
    if current is None:
        return False
    return current.is_active

1045 

def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction is in progress.

    That is, a nested transaction object is present on this connection
    and its ``is_active`` flag is set.

    """
    nested = self._nested_transaction
    if nested is None:
        return False
    return nested.is_active

1052 

def _is_autocommit_isolation(self) -> bool:
    """Return True if the effective isolation level is "AUTOCOMMIT".

    An ``isolation_level`` execution option on this connection decides
    when it is present; otherwise the dialect's
    ``_on_connect_isolation_level`` is consulted.

    """
    opt_iso = self._execution_options.get("isolation_level", None)
    if opt_iso is not None:
        return bool(opt_iso == "AUTOCOMMIT")

    return bool(
        self.engine.dialect._on_connect_isolation_level == "AUTOCOMMIT"
    )

1063 

def _get_required_transaction(self) -> RootTransaction:
    """Return the current root transaction.

    :raises InvalidRequestError: if no transaction is present.

    """
    root = self._transaction
    if root is not None:
        return root
    raise exc.InvalidRequestError("connection is not in a transaction")

1069 

def _get_required_nested_transaction(self) -> NestedTransaction:
    """Return the current nested transaction.

    :raises InvalidRequestError: if no nested transaction is present.

    """
    nested = self._nested_transaction
    if nested is not None:
        return nested
    raise exc.InvalidRequestError("connection is not in a nested transaction")

1077 

def get_transaction(self) -> Optional[RootTransaction]:
    """Return the root transaction currently in progress, or ``None``
    when there is none.

    .. versionadded:: 1.4

    """
    current = self._transaction
    return current

1086 

def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the nested transaction currently in progress, or ``None``
    when there is none.

    .. versionadded:: 1.4

    """
    current = self._nested_transaction
    return current

1094 

def _begin_impl(self, transaction: RootTransaction) -> None:
    """Perform the "begin" step for a root transaction.

    Logs the BEGIN when echo is enabled, fires the ``begin`` event when
    listeners are present, then calls the dialect's ``do_begin()`` on
    the DBAPI connection.  An exception raised by ``do_begin()`` is
    routed to ``_handle_dbapi_exception()``; an exception raised by an
    event listener propagates as is.

    :param transaction: the transaction being begun; not otherwise used
     by this method.

    """
    if self._echo:
        if self._is_autocommit_isolation():
            self._log_info(
                "BEGIN (implicit; DBAPI should not BEGIN due to "
                "autocommit mode)"
            )
        else:
            self._log_info("BEGIN (implicit)")

    # _autobegin() checks this flag, so anything that runs on this
    # connection while the begin is underway does not start another
    # transaction.  The whole section sits inside try/finally so that the
    # flag is reset even if an event listener raises; otherwise autobegin
    # would remain disabled for the life of the connection.
    self.__in_begin = True
    try:
        if self._has_events or self.engine._has_events:
            self.dispatch.begin(self)

        try:
            self.engine.dialect.do_begin(self.connection)
        except BaseException as e:
            self._handle_dbapi_exception(e, None, None, None, None)
    finally:
        self.__in_begin = False

1116 

def _rollback_impl(self) -> None:
    """Perform the "rollback" step for a root transaction.

    The ``rollback`` event fires first when listeners are present.  The
    dialect's ``do_rollback()`` is then called only while the connection
    is still open with a valid DBAPI connection; an exception it raises
    is routed to ``_handle_dbapi_exception()``.

    """
    if self._has_events or self.engine._has_events:
        self.dispatch.rollback(self)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    if self._echo:
        if self._is_autocommit_isolation():
            message = (
                "ROLLBACK using DBAPI connection.rollback(), "
                "DBAPI should ignore due to autocommit mode"
            )
        else:
            message = "ROLLBACK"
        self._log_info(message)

    try:
        self.engine.dialect.do_rollback(self.connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1134 

def _commit_impl(self) -> None:
    """Perform the "commit" step for a root transaction.

    The ``commit`` event fires first when listeners are present, the
    COMMIT is logged when echo is enabled, and the dialect's
    ``do_commit()`` is called on the DBAPI connection; an exception it
    raises is routed to ``_handle_dbapi_exception()``.

    """
    if self._has_events or self.engine._has_events:
        self.dispatch.commit(self)

    if self._echo:
        if self._is_autocommit_isolation():
            message = (
                "COMMIT using DBAPI connection.commit(), "
                "DBAPI should ignore due to autocommit mode"
            )
        else:
            message = "COMMIT"
        self._log_info(message)

    try:
        self.engine.dialect.do_commit(self.connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1151 

def _savepoint_impl(self, name: Optional[str] = None) -> str:
    """Establish a savepoint and return its name.

    :param name: savepoint name to use.  When ``None``, a name of the
     form ``sa_savepoint_<n>`` is generated from a per-connection
     counter.
    :return: the name that was passed to the dialect's
     ``do_savepoint()``.

    """
    # listeners are given the name exactly as the caller supplied it,
    # i.e. possibly None; the generated name is only known afterwards
    if self._has_events or self.engine._has_events:
        self.dispatch.savepoint(self, name)

    savepoint_name = name
    if savepoint_name is None:
        seq = self.__savepoint_seq + 1
        self.__savepoint_seq = seq
        savepoint_name = "sa_savepoint_%s" % seq

    self.engine.dialect.do_savepoint(self, savepoint_name)
    return savepoint_name

1161 

def _rollback_to_savepoint_impl(self, name: str) -> None:
    """Roll back to the savepoint called ``name``.

    The ``rollback_savepoint`` event fires when listeners are present;
    the dialect's ``do_rollback_to_savepoint()`` is called only while
    the connection is still open with a valid DBAPI connection.

    """
    if self._has_events or self.engine._has_events:
        self.dispatch.rollback_savepoint(self, name, None)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    self.engine.dialect.do_rollback_to_savepoint(self, name)

1168 

def _release_savepoint_impl(self, name: str) -> None:
    """Release the savepoint called ``name``.

    The ``release_savepoint`` event fires when listeners are present,
    then the dialect's ``do_release_savepoint()`` is called
    unconditionally.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.release_savepoint(self, name, None)

    dialect = self.engine.dialect
    dialect.do_release_savepoint(self, name)

1174 

def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
    """Perform the "begin" step for a two-phase transaction.

    Logs when echo is enabled, fires the ``begin_twophase`` event when
    listeners are present, then calls the dialect's
    ``do_begin_twophase()`` with the transaction's ``xid``; an exception
    it raises is routed to ``_handle_dbapi_exception()``.

    """
    if self._echo:
        self._log_info("BEGIN TWOPHASE (implicit)")

    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.begin_twophase(self, transaction.xid)

    # flag consulted by _autobegin() while the begin is underway
    self.__in_begin = True
    try:
        self.engine.dialect.do_begin_twophase(self, transaction.xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
    finally:
        self.__in_begin = False

1188 

def _prepare_twophase_impl(self, xid: Any) -> None:
    """Perform the "prepare" step of a two-phase transaction.

    Fires the ``prepare_twophase`` event when listeners are present,
    then calls the dialect's ``do_prepare_twophase()``; an exception it
    raises is routed to ``_handle_dbapi_exception()``.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.prepare_twophase(self, xid)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1198 

def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Perform the "rollback" step of a two-phase transaction.

    Fires the ``rollback_twophase`` event when listeners are present.
    The dialect's ``do_rollback_twophase()`` is called only while the
    connection is still open with a valid DBAPI connection; an exception
    it raises is routed to ``_handle_dbapi_exception()``.

    """
    if self._has_events or self.engine._has_events:
        self.dispatch.rollback_twophase(self, xid, is_prepared)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1211 

def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Perform the "commit" step of a two-phase transaction.

    Fires the ``commit_twophase`` event when listeners are present, then
    calls the dialect's ``do_commit_twophase()``; an exception it raises
    is routed to ``_handle_dbapi_exception()``.

    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1221 

def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    Closing releases the underlying database resources, that is, the
    DBAPI connection referenced internally.  That DBAPI connection is
    typically handed back to the connection-holding :class:`_pool.Pool`
    referenced by the :class:`_engine.Engine` that produced this
    :class:`_engine.Connection`.  Any transactional state present on
    the DBAPI connection is also unconditionally released via the DBAPI
    connection's ``rollback()`` method, regardless of any
    :class:`.Transaction` object that may be outstanding with regards
    to this :class:`_engine.Connection`.

    This has the effect of also calling
    :meth:`_engine.Connection.rollback` if any transaction is in place.

    Once :meth:`_engine.Connection.close` has been called, the
    :class:`_engine.Connection` is permanently in a closed state and
    allows no further operations.

    """
    had_transaction = bool(self._transaction)
    if had_transaction:
        self._transaction.close()

    dbapi_conn = self._dbapi_connection
    if dbapi_conn is not None:
        if had_transaction:
            # the transaction was just closed, so close the pooled
            # connection without having it perform an additional reset
            cast("_ConnectionFairy", dbapi_conn)._close_special(
                transaction_reset=True
            )
        else:
            dbapi_conn.close()

    # Closing the pooled connection may, in rare cases, have triggered
    # an invalidation that already cleared _dbapi_connection; usually it
    # is still set here and in a "closed" state.
    self._dbapi_connection = None
    self.__can_reconnect = False

1269 

@overload
def scalar(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Execute a SQL statement construct and return a scalar object.

    Shorthand for calling :meth:`_engine.Connection.execute` and then
    :meth:`_engine.Result.scalar` on the result.  Parameters are
    equivalent.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises ObjectNotExecutableError: if ``statement`` has no
     ``_execute_on_scalar`` attribute.

    """
    distilled_parameters = _distill_params_20(parameters)
    try:
        run_scalar = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_scalar(self, distilled_parameters, options)

1316 

@overload
def scalars(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Execute and return a scalar result set, which yields scalar
    values from the first column of each row.

    Equivalent to calling :meth:`_engine.Connection.execute` to obtain a
    :class:`_result.Result` object and then calling
    :meth:`_result.Result.scalars` on it to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """
    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()

1359 

@overload
def execute(
    self,
    statement: TypedReturnsRows[Unpack[_Ts]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[_Ts]]: ...

@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]: ...

def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Execute a SQL statement construct and return a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     This may be either a dictionary of parameter names to values,
     or a mutable sequence (e.g. a list) of dictionaries.  When a
     list of dictionaries is passed, the underlying statement execution
     will make use of the DBAPI ``cursor.executemany()`` method.
     When a single dictionary is passed, the DBAPI ``cursor.execute()``
     method will be used.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.CursorResult`.

    :raises ObjectNotExecutableError: if ``statement`` has no
     ``_execute_on_connection`` attribute.

    """
    distilled_parameters = _distill_params_20(parameters)
    try:
        run_on_connection = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_on_connection(self, distilled_parameters, options)

1427 

def _execute_function(
    self,
    func: FunctionElement[Any],
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.FunctionElement object.

    The function is wrapped via its ``select()`` method and the
    resulting construct is handed to ``_execute_clauseelement()``.

    """
    select_stmt = func.select()
    return self._execute_clauseelement(
        select_stmt, distilled_parameters, execution_options
    )

1439 

def _execute_default(
    self,
    default: DefaultGenerator,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> Any:
    """Execute a schema.ColumnDefault object.

    The connection's execution options are merged with
    ``execution_options``, the ``before_execute`` / ``after_execute``
    event hooks are invoked when listeners are present, and the value
    produced by the execution context's ``_exec_default()`` is
    returned.

    """

    exec_opts = self._execution_options.merge_with(execution_options)

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreAnyExecuteParams]

    # note for event handlers, the "distilled parameters" which is always
    # a list of dicts is broken out into separate "multiparams" and
    # "params" collections, which allows the handler to distinguish
    # between an executemany and execute style set of parameters.
    if self._has_events or self.engine._has_events:
        (
            default,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, exec_opts
        )
    else:
        event_multiparams = event_params = None

    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection is currently held; obtain one again
            conn = self._revalidate_connection()

        dialect = self.dialect
        ctx = dialect.execution_ctx_cls._init_default(
            dialect, self, conn, exec_opts
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two are re-raised as is rather than being routed through
        # _handle_dbapi_exception()
        raise
    except BaseException as e:
        # NOTE(review): ctx is used below without a fallback, so this
        # presumably never returns normally -- confirm
        self._handle_dbapi_exception(e, None, None, None, None)

    ret = ctx._exec_default(None, default, None)

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )

    return ret

1496 

def _execute_ddl(
    self,
    ddl: ExecutableDDLElement,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a schema.DDL object.

    The DDL element's execution options are merged with those of the
    connection and with ``execution_options``; the element is compiled
    against this connection's dialect and run via
    ``_execute_context()``.  The ``before_execute`` / ``after_execute``
    event hooks are invoked when listeners are present.

    """

    exec_opts = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreSingleExecuteParams]

    if self._has_events or self.engine._has_events:
        # listeners may replace the element and the parameters
        (
            ddl,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            ddl, distilled_parameters, exec_opts
        )
    else:
        event_multiparams = event_params = None

    schema_translate_map = exec_opts.get("schema_translate_map", None)

    dialect = self.dialect

    compiled = ddl.compile(
        dialect=dialect, schema_translate_map=schema_translate_map
    )
    # distilled_parameters is not forwarded here; None is passed in the
    # position where _execute_clauseelement() passes its parameters
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_ddl,
        compiled,
        None,
        exec_opts,
        compiled,
    )
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            ddl,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1549 

def _invoke_before_exec_event(
    self,
    elem: Any,
    distilled_params: _CoreMultiExecuteParams,
    execution_options: _ExecuteOptions,
) -> Tuple[
    Any,
    _CoreMultiExecuteParams,
    _CoreMultiExecuteParams,
    _CoreSingleExecuteParams,
]:
    """Run every ``before_execute`` listener and return what they yield.

    Each listener receives, and returns, the element together with
    "multiparams" and "params" collections; the output of one listener
    is the input of the next.

    :return: a 4-tuple of the (possibly replaced) element, the
     parameter list rebuilt from the listeners' output, and the final
     "multiparams" and "params" values.

    :raises InvalidRequestError: if the listeners end up with both
     "multiparams" and "params" non-empty.

    """
    event_multiparams: _CoreMultiExecuteParams
    event_params: _CoreSingleExecuteParams

    # exactly one parameter set is presented to listeners as "params";
    # zero or several sets are presented as "multiparams"
    if len(distilled_params) != 1:
        event_multiparams = distilled_params
        event_params = {}
    else:
        event_multiparams = []
        event_params = distilled_params[0]

    for listener in self.dispatch.before_execute:
        elem, event_multiparams, event_params = listener(
            self,
            elem,
            event_multiparams,
            event_params,
            execution_options,
        )

    if event_multiparams:
        rebuilt = list(event_multiparams)
        if event_params:
            raise exc.InvalidRequestError(
                "Event handler can't return non-empty multiparams "
                "and params at the same time"
            )
    else:
        rebuilt = [event_params] if event_params else []

    return elem, rebuilt, event_multiparams, event_params

1591 

def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.ClauseElement object.

    The element's execution options are merged with those of the
    connection and with ``execution_options``; the element is compiled
    through ``_compile_w_cache()`` and run via ``_execute_context()``.
    The ``before_execute`` / ``after_execute`` event hooks are invoked
    when listeners are present.

    """

    exec_opts = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    # evaluated once, so the before/after hooks below are either both
    # invoked or both skipped
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, exec_opts
        )

    if distilled_parameters:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        keys = sorted(distilled_parameters[0])
        # more than one parameter set means executemany style
        for_executemany = len(distilled_parameters) > 1
    else:
        keys = []
        for_executemany = False

    dialect = self.dialect

    schema_translate_map = exec_opts.get("schema_translate_map", None)

    # a "compiled_cache" execution option takes precedence over the
    # engine-level cache
    compiled_cache: Optional[CompiledCacheType] = exec_opts.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
        dialect=dialect,
        compiled_cache=compiled_cache,
        column_keys=keys,
        for_executemany=for_executemany,
        schema_translate_map=schema_translate_map,
        linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
    )
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        exec_opts,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1662 

def _execute_compiled(
    self,
    compiled: Compiled,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
) -> CursorResult[Unpack[TupleAny]]:
    """Execute a sql.Compiled object.

    The compiled object's execution options are merged with those of
    the connection and with ``execution_options``, and the object is
    run via ``_execute_context()``.  The ``before_execute`` /
    ``after_execute`` event hooks are invoked when listeners are
    present.

    TODO: why do we have this?  likely deprecate or remove

    """

    exec_opts = compiled.execution_options.merge_with(
        self._execution_options, execution_options
    )

    # Evaluate the listener check exactly once, as
    # _execute_clauseelement() does.  event_multiparams / event_params
    # are only bound when the "before" hook ran, so re-evaluating the
    # check for the "after" hook could reference unbound locals if its
    # outcome changed in between.
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            compiled,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            compiled, distilled_parameters, exec_opts
        )

    dialect = self.dialect

    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled,
        distilled_parameters,
        exec_opts,
        compiled,
        distilled_parameters,
        None,
        None,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            compiled,
            event_multiparams,
            event_params,
            exec_opts,
            ret,
        )
    return ret

1712 

def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Unpack[TupleAny]]:
    r"""Run a plain SQL string directly on the DBAPI cursor, skipping
    every SQL compilation step.

    Any string may be handed straight to the ``cursor.execute()``
    method of the DBAPI in use with this method.

    :param statement: The statement str to be executed.   Bound
     parameters must be written in the paramstyle of the underlying
     DBAPI, such as "qmark", "pyformat", "format", etc.

    :param parameters: the bound parameter values for the execution.
     Accepted formats are: a dictionary of named parameters, a tuple of
     positional parameters, or a list of dictionaries or tuples for
     multiple-execute support.

    :return: a :class:`_engine.CursorResult`.

     A list of dictionaries::


         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}]
         )

     A single dictionary::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             dict(id=1, value="v1")
         )

     A single tuple::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (?, ?)",
             (1, 'v1')
         )

     .. note:: The :meth:`_engine.Connection.exec_driver_sql` method
         takes no part in the
         :meth:`_events.ConnectionEvents.before_execute` and
         :meth:`_events.ConnectionEvents.after_execute` events.   Calls
         to :meth:`_engine.Connection.exec_driver_sql` can instead be
         intercepted with
         :meth:`_events.ConnectionEvents.before_cursor_execute` and
         :meth:`_events.ConnectionEvents.after_cursor_execute`.

     .. seealso::

        :pep:`249`

    """

    # normalize the raw DBAPI-style parameters first, then layer the
    # per-call options over the connection-level ones
    raw_params = _distill_raw_params(parameters)
    merged_opts = self._execution_options.merge_with(execution_options)

    dialect = self.dialect
    return self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        merged_opts,
        statement,
        raw_params,
    )

1788 

def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Unpack[TupleAny]]:
    """Build an :class:`.ExecutionContext` via ``constructor`` and run
    it, returning a :class:`_engine.CursorResult`."""

    if execution_options:
        yield_per = execution_options.get("yield_per", None)
        if yield_per:
            # a truthy "yield_per" also turns on result streaming,
            # with the row buffer capped at the same size
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yield_per}
            )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        context = constructor(
            dialect, self, dbapi_conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two pass through without the DBAPI error handling
        raise
    except BaseException as err:
        self._handle_dbapi_exception(
            err, str(statement), parameters, None, None
        )

    if (self._transaction and not self._transaction.is_active) or (
        self._nested_transaction
        and not self._nested_transaction.is_active
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    if context.execute_style is not ExecuteStyle.INSERTMANYVALUES:
        return self._exec_single_context(
            dialect, context, statement, parameters
        )
    return self._exec_insertmany_context(dialect, context)

1847 

def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for a single DBAPI
    cursor.execute() or cursor.executemany() call.

    Steps, in order: optional ``setinputsizes`` handling, the
    ``before_cursor_execute`` hooks, statement / parameter logging when
    echo is on, the actual execution (dialect-level ``do_*`` hooks
    first, then the dialect's own ``do_*`` method if no hook claimed
    the call), ``after_cursor_execute``, ``context.post_exec()`` and
    finally construction of the result.

    :param dialect: dialect whose ``bind_typing`` decides whether
     ``do_set_input_sizes()`` is invoked.
    :param context: the execution context; its cursor, statement and
     parameters are what is actually executed.
    :param statement: only used, stringified, for error reporting if
     ``do_set_input_sizes()`` fails.
    :param parameters: only used for error reporting in that same
     failure case; rebound to ``context.parameters`` afterwards.
    :return: the object produced by ``context._setup_result_proxy()``.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()

        if generic_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, generic_setinputsizes, context
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e, str(statement), parameters, None, context
                )

    # from here on, work with what the context holds rather than with
    # the arguments passed in
    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters: Optional[_AnyExecuteParams]

    # a non-executemany call uses only the first parameter set
    if not context.executemany:
        effective_parameters = parameters[0]
    else:
        effective_parameters = parameters

    if self._has_events or self.engine._has_events:
        # each hook returns a (statement, parameters) pair which
        # replaces the values passed on to the next hook
        for fn in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = fn(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if not self.engine.hide_parameters:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )
        else:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )

    # set once a dialect-level hook returns a true value, in which case
    # the dialect's own do_* method is skipped
    evt_handled: bool = False
    try:
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_executemany:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            # no parameters at all and the context asks for the
            # parameter-less execution form
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute_no_params:
                    if fn(cursor, str_statement, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as e:
        # _handle_dbapi_exception() is annotated NoReturn, so "result"
        # below is only reached when the try block completed
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

1989 

def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Unpack[TupleAny]]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    The batches come from
    ``dialect._deliver_insertmanyvalues_batches()``; each batch carries
    its own replaced statement and parameters, which is what gets
    logged and executed.

    :param dialect: dialect that delivers the batches and executes
     them.
    :param context: the execution context supplying cursor, statement,
     parameters and execution options.
    :return: the object produced by ``context._setup_result_proxy()``.

    """

    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # both event flags are read once, before the batch loop
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    # "stats" is only bound, and only read, when echo is enabled
    if self._echo:
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            # hooks see the per-batch statement / parameters and may
            # replace them; the executemany flag is passed as True
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            # " <batchnum>/<total> (ordered|unordered[; batch not
            # supported])"
            imv_stats = f""" {imv_batch.batchnum}/{
                imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch extends the cache stats; later batches
            # log only the insertmanyvalues portion
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # for/else: the dialect's do_execute() runs only when no
            # hook returned a true value
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            # NOTE(review): unlike before_cursor_execute above, this
            # passes the overall statement / parameters and
            # context.executemany rather than the per-batch values --
            # confirm this asymmetry is intended
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            # total of current_batch_size over all batches
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        # _handle_dbapi_exception() is annotated NoReturn, so "result"
        # below is only reached when the try block completed
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

2160 

2161 def _cursor_execute( 

2162 self, 

2163 cursor: DBAPICursor, 

2164 statement: str, 

2165 parameters: _DBAPISingleExecuteParams, 

2166 context: Optional[ExecutionContext] = None, 

2167 ) -> None: 

2168 """Execute a statement + params on the given cursor. 

2169 

2170 Adds appropriate logging and exception handling. 

2171 

2172 This method is used by DefaultDialect for special-case 

2173 executions, such as for sequences and column defaults. 

2174 The path of statement execution in the majority of cases 

2175 terminates at _execute_context(). 

2176 

2177 """ 

2178 if self._has_events or self.engine._has_events: 

2179 for fn in self.dispatch.before_cursor_execute: 

2180 statement, parameters = fn( 

2181 self, cursor, statement, parameters, context, False 

2182 ) 

2183 

2184 if self._echo: 

2185 self._log_info(statement) 

2186 self._log_info("[raw sql] %r", parameters) 

2187 try: 

2188 for fn in ( 

2189 () 

2190 if not self.dialect._has_events 

2191 else self.dialect.dispatch.do_execute 

2192 ): 

2193 if fn(cursor, statement, parameters, context): 

2194 break 

2195 else: 

2196 self.dialect.do_execute(cursor, statement, parameters, context) 

2197 except BaseException as e: 

2198 self._handle_dbapi_exception( 

2199 e, statement, parameters, cursor, context 

2200 ) 

2201 

2202 if self._has_events or self.engine._has_events: 

2203 self.dispatch.after_cursor_execute( 

2204 self, cursor, statement, parameters, context, False 

2205 ) 

2206 

2207 def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 

2208 """Close the given cursor, catching exceptions 

2209 and turning into log warnings. 

2210 

2211 """ 

2212 try: 

2213 cursor.close() 

2214 except Exception: 

2215 # log the error through the connection pool's logger. 

2216 self.engine.pool.logger.error( 

2217 "Error closing cursor", exc_info=True 

2218 ) 

2219 

# Default values for two flags used by _handle_dbapi_exception(); that
# method sets them on the instance while it runs and removes them again
# with ``del`` in its ``finally`` block (presumably falling back to
# these class-level defaults -- the enclosing class is not visible
# here).
_reentrant_error = False
_is_disconnect = False

def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised during execution; always raises.

    Decides whether the error counts as a disconnect, optionally wraps
    it in a ``DBAPIError``, runs the dialect's ``handle_error`` hooks,
    closes the cursor / rolls back where applicable, raises the
    resulting exception and finally invalidates the connection (and,
    unless disabled, the pool entry) on disconnect.

    :param e: the exception being handled.
    :param statement: statement string passed to the wrapping
     exception and to the error hooks, if any.
    :param parameters: parameters passed to the wrapping exception and
     to the error hooks, if any.
    :param cursor: cursor in use, if any; closed here when the error
     is not a disconnect.
    :param context: the execution context, if one was already
     established.
    :param is_sub_exec: when true, ``ismulti`` is reported as false
     regardless of ``context.executemany``.
    :raises: the exception returned or raised by a ``handle_error``
     hook, else the wrapping ``DBAPIError``, else the original
     exception.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    # once flagged as a disconnect, the flag is not recomputed
    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    # exit exceptions never invalidate the whole pool by default
    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    # an error raised while already handling an error: wrap and raise
    # immediately, without running hooks or cleanup again
    if self._reentrant_error:
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        # replacement exception supplied by a handle_error hook, if any
        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # a hook may have changed the disconnect decision; keep the
            # connection flag and the wrapping exception in sync
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        # precedence: hook-supplied exception, then the wrapping
        # exception, then the original exception unchanged
        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # drop the per-handling flags set above, then perform the
        # invalidation for a disconnect
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)

2367 

@classmethod
def _handle_dbapi_exception_noconnection(
    cls,
    e: BaseException,
    dialect: Dialect,
    engine: Optional[Engine] = None,
    is_disconnect: Optional[bool] = None,
    invalidate_pool_on_disconnect: bool = True,
    is_pre_ping: bool = False,
) -> NoReturn:
    """Process an exception for which no connection object exists;
    always raises.

    The error is wrapped in a ``DBAPIError`` when it is an instance of
    the dialect's DBAPI ``Error`` class, the dialect's ``handle_error``
    hooks are run, and then the hook-supplied exception, the wrapping
    exception or the original exception is raised, in that order of
    preference.

    """
    exc_info = sys.exc_info()

    if is_disconnect is None:
        # caller did not decide; ask the dialect, DBAPI errors only
        is_disconnect = isinstance(
            e, dialect.loaded_dbapi.Error
        ) and dialect.is_disconnect(e, None, None)

    should_wrap = isinstance(e, dialect.loaded_dbapi.Error)

    sqlalchemy_exception = (
        exc.DBAPIError.instance(
            None,
            None,
            cast(Exception, e),
            dialect.loaded_dbapi.Error,
            hide_parameters=(
                engine.hide_parameters if engine is not None else False
            ),
            connection_invalidated=is_disconnect,
            dialect=dialect,
        )
        if should_wrap
        else None
    )

    newraise = None

    if dialect._has_events:
        ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            dialect,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            invalidate_pool_on_disconnect,
            is_pre_ping,
        )
        for handler in dialect.dispatch.handle_error:
            try:
                # a returned exception becomes the chained exception
                # seen by the next handler
                replacement = handler(ctx)
                if replacement is not None:
                    ctx.chained_exception = newraise = replacement
            except Exception as _raised:
                # a handler that raises ends the chain
                newraise = _raised
                break

        if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
            sqlalchemy_exception.connection_invalidated = is_disconnect = (
                ctx.is_disconnect
            )

    if newraise:
        raise newraise.with_traceback(exc_info[2]) from e

    if should_wrap:
        assert sqlalchemy_exception is not None
        raise sqlalchemy_exception.with_traceback(exc_info[2]) from e

    assert exc_info[1] is not None
    raise exc_info[1].with_traceback(exc_info[2])

2444 

2445 def _run_ddl_visitor( 

2446 self, 

2447 visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]], 

2448 element: SchemaItem, 

2449 **kwargs: Any, 

2450 ) -> None: 

2451 """run a DDL visitor. 

2452 

2453 This method is only here so that the MockConnection can change the 

2454 options given to the visitor so that "checkfirst" is skipped. 

2455 

2456 """ 

2457 visitorcallable(self.dialect, self, **kwargs).traverse_single(element) 

2458 

2459 

class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    A plain value holder: every constructor argument is stored on the
    slot of the corresponding name (``exception`` is stored as
    ``original_exception`` and ``context`` as ``execution_context``).

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the details of one error occurrence.

        :param exception: the exception that was caught.
        :param sqlalchemy_exception: the wrapping exception, if the
         error is being wrapped.
        :param engine: the engine in use, if any.
        :param dialect: the dialect in use.
        :param connection: the connection in use, if any.
        :param cursor: the DBAPI cursor in use, if any.
        :param statement: the statement string, if any.
        :param parameters: the statement parameters, if any.
        :param context: the execution context, if any.
        :param is_disconnect: whether the error is treated as a
         disconnect.
        :param invalidate_pool_on_disconnect: whether a disconnect
         should invalidate the pool.
        :param is_pre_ping: whether the error occurred during a
         pre-ping.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously the cursor argument was accepted but dropped,
        # leaving the declared "cursor" slot unset
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # stays None until an error handler returns a replacement
        # exception, so the first handler can read it safely
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping

2505 

2506 

class Transaction(TransactionalContext):
    """A database transaction that is in progress.

    A :class:`.Transaction` is obtained by
    calling the :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine
        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled through the
    :meth:`.rollback` and :meth:`.commit` methods.  The object
    is also a context manager, so the Python ``with`` statement
    may be combined with the
    :meth:`_engine.Connection.begin` method::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # this base class cannot be constructed directly
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """True when the transaction has been completely detached from
        the connection, so that it can no longer influence the
        connection's state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        raise NotImplementedError()

    def _do_commit(self) -> None:
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        # active, and the connection has not been invalidated
        return self.is_active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        When this is the base transaction of a begin/commit nesting,
        the transaction will rollback().  Otherwise the method simply
        returns.

        Use this to cancel a Transaction while leaving the scope of an
        enclosing transaction untouched.

        """
        try:
            self._do_close()
        finally:
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` emits a
          "ROLLBACK TO SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific
          methods for two phase transactions.


        """
        try:
            self._do_rollback()
        finally:
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` emits a
          "RELEASE SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may use DBAPI-specific
          methods for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            assert not self.is_active

    def _get_subject(self) -> Connection:
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        deactivated = self._deactivated_from_connection
        return not deactivated

    def _rollback_can_be_called(self) -> bool:
        # RootTransaction / NestedTransaction allow rollback() to be
        # called on a deactive transaction without emitting warnings.
        # tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True

2648 

2649 

class RootTransaction(Transaction):
    """The "root" transaction of a :class:`_engine.Connection`.

    It stands for the "BEGIN/COMMIT/ROLLBACK" currently taking place
    on the :class:`_engine.Connection`.  A
    :class:`_engine.RootTransaction` is created through the
    :meth:`_engine.Connection.begin` method and stays associated with
    the :class:`_engine.Connection` for as long as it is active.  The
    :class:`_engine.RootTransaction` currently in use can be retrieved
    with the :attr:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_engine.Connection` also has
    "autobegin" behavior, which creates a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI
    connection.  In 2.0 style use, the scope of the
    :class:`_engine.RootTransaction` is controlled with the
    :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._connection_begin_impl()
        # registered on the connection only after begin succeeded
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        if not self.is_active:
            if self.connection._transaction is not self:
                util.warn("transaction already deassociated from connection")
            return

        assert self.connection._transaction is self
        self.is_active = False

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        try:
            if self.is_active:
                self._connection_rollback_impl()

            nested = self.connection._nested_transaction
            if nested:
                nested._cancel()
        finally:
            # runs whether or not the rollback succeeded
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        if not self.is_active:
            if self.connection._transaction is not self:
                raise exc.InvalidRequestError("This transaction is inactive")
            self.connection._invalid_transaction()
        else:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # regardless of the commit's outcome, cancel any
                # nested transactions, mark this transaction
                # "inactive" and remove it as a reset agent
                nested = self.connection._nested_transaction
                if nested:
                    nested._cancel()

                self._deactivate_from_connection()

            # ...but it is only removed as the connection's current
            # transaction when the commit succeeded.  otherwise it
            # stays in place so that a rollback needs to occur.
            self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

2756 

2757 

class NestedTransaction(Transaction):
    """A transaction that represents a SAVEPOINT inside of an enclosing
    transaction.

    A :class:`.NestedTransaction` is obtained from the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    The "begin" / "commit" / "rollback" vocabulary maps onto savepoint
    operations in the following way:

    * "begin" corresponds to the "BEGIN SAVEPOINT" command; the savepoint
      receives an explicit name, which is kept as part of the state of
      this object.

    * :meth:`.NestedTransaction.commit` corresponds to a
      "RELEASE SAVEPOINT" operation for the savepoint identifier held by
      this :class:`.NestedTransaction`.

    * :meth:`.NestedTransaction.rollback` corresponds to a
      "ROLLBACK TO SAVEPOINT" operation for the savepoint identifier held
      by this :class:`.NestedTransaction`.

    Savepoints mimic the semantics of an outer transaction so that code
    may deal with a "savepoint" transaction and an "outer" transaction
    in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    _savepoint: str

    def __init__(self, connection: Connection):
        # a savepoint requires that a transaction is already present
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)

        self.connection = connection
        self._savepoint = connection._savepoint_impl()
        self.is_active = True

        # remember whichever nested transaction was current, then take
        # its place as the connection's current nested transaction
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        """Restore the previously current nested transaction on the
        connection.

        :param warn: when this object is not the connection's current
         nested transaction, emit a warning if True; otherwise do nothing.

        """
        conn = self.connection
        if conn._nested_transaction is not self:
            if warn:
                util.warn(
                    "nested transaction already deassociated from connection"
                )
            return

        conn._nested_transaction = self._previous_nested

    @property
    def _deactivated_from_connection(self) -> bool:
        """True if this is not the connection's current nested
        transaction."""
        current = self.connection._nested_transaction
        return current is not self

    def _cancel(self) -> None:
        """Mark this savepoint, and each one before it, as inactive.

        RootTransaction calls this when the outer transaction is
        committed, rolled back, or closed, so that all savepoints are
        cancelled without any action being taken on them.

        """
        self.is_active = False
        self._deactivate_from_connection()

        outer = self._previous_nested
        if outer:
            outer._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        """Roll back to the savepoint if still applicable, then go
        inactive.

        :param deactivate_from_connection: if True, also de-associate
         this object from the connection.
        :param warn_already_deactive: passed along as the ``warn`` flag
         of :meth:`_deactivate_from_connection`.

        """
        try:
            # the rollback is only attempted while both this savepoint
            # and the connection's transaction are active
            if self.is_active:
                enclosing = self.connection._transaction
                if enclosing and enclosing.is_active:
                    self.connection._rollback_to_savepoint_impl(
                        self._savepoint
                    )
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        # close: de-associate, without warning if that already happened
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=False
        )

    def _do_rollback(self) -> None:
        # rollback: de-associate, warning if that already happened
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=True
        )

    def _do_commit(self) -> None:
        """Release the savepoint and de-associate from the connection.

        :raises sqlalchemy.exc.InvalidRequestError: if this nested
         transaction is inactive and is not the connection's current
         nested transaction.

        """
        if not self.is_active:
            if self.connection._nested_transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
            return

        try:
            self.connection._release_savepoint_impl(self._savepoint)
        finally:
            # the nested transaction goes inactive unconditionally, even
            # when the release fails.  this prevents it from trying to
            # emit SQL when it rolls back.
            self.is_active = False

        # de-association happens only if the release succeeded
        self._deactivate_from_connection()

2870 

2871 

class TwoPhaseTransaction(RootTransaction):
    """A root transaction that is carried out in two phases.

    Use the :meth:`_engine.Connection.begin_twophase` method to procure
    a new :class:`.TwoPhaseTransaction`.

    It has the same interface as :class:`.Transaction`, and adds the
    :meth:`prepare` method.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        # both attributes are in place before the base constructor runs.
        # NOTE(review): presumably the base constructor begins the
        # transaction, which passes this object along - confirm.
        self.xid = xid
        self._is_prepared = False
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        Once a PREPARE has taken place, the transaction can be committed.

        :raises sqlalchemy.exc.InvalidRequestError: if this transaction
         is inactive.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            # recorded only after the prepare call returned normally
            self._is_prepared = True
            return

        raise exc.InvalidRequestError("This transaction is inactive")

    def _connection_begin_impl(self) -> None:
        conn = self.connection
        conn._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        xid, prepared = self.xid, self._is_prepared
        self.connection._rollback_twophase_impl(xid, prepared)

    def _connection_commit_impl(self) -> None:
        xid, prepared = self.xid, self._is_prepared
        self.connection._commit_twophase_impl(xid, prepared)

2911 

2912 

class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    # LRU cache created in __init__, or None when query_cache_size is 0
    _compiled_cache: Optional[CompiledCacheType]

    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    # class instantiated by connect()
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    # class instantiated by execution_options(); assigned at module level
    # once OptionEngine has been defined
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Construct a new :class:`_engine.Engine`.

        :param pool: connection pool, stored as ``self.pool``.
        :param dialect: dialect, stored as ``self.dialect``.
        :param url: database URL, stored as ``self.url``.
        :param logging_name: stored as ``self.logging_name`` only when
         a non-empty value is given.
        :param echo: echo flag, assigned to ``self.echo`` and passed to
         ``log.instance_logger()``.
        :param query_cache_size: size of the compiled cache; ``0``
         means that no compiled cache is created.
        :param execution_options: if non-empty, applied through
         :meth:`update_execution_options`.
        :param hide_parameters: stored as ``self.hide_parameters``.

        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        # passed as the ``size_alert`` callback of the compiled cache;
        # reports a pruning of the cache when info logging is enabled
        if self._should_log_info():
            self.logger.info(
                "Compiled cache size pruning from %d items to %d.  "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self) -> Engine:
        """Returns this :class:`.Engine`.

        Used for legacy schemes that accept :class:`.Connection` /
        :class:`.Engine` objects within the same variable.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Clear the compiled cache associated with this
        :class:`_engine.Engine`.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter.

        .. versionadded:: 1.4

        """
        # nothing to do when the cache is disabled (None) or falsy
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections.  The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        # event listeners are notified before the new options are
        # merged in; the dialect is notified afterwards
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the
          same instance.  The :meth:`_engine.Engine.dispose`
          method will replace
          the connection pool instance for the parent engine as well
          as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine`
          inherits the events of the parent, and new events can be associated
          with the new :class:`_engine.Engine` individually.
        * The logging configuration and logging_name is copied from the parent
          :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that affect some execution-level behavior for each
        engine.    One such example is breaking into separate "reader" and
        "writer" :class:`_engine.Engine` instances, where one
        :class:`_engine.Engine`
        has a lower :term:`isolation level` setting configured or is even
        transaction-disabled using "autocommit".  An example of this
        configuration is at :ref:`dbapi_autocommit_multiple`.

        Another example is one that
        uses a custom option ``shard_id`` which is consumed by an event
        to change the current schema on a database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}

            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt,
                    params, context, executemany):
                shard_id = conn.get_execution_options().get('shard_id', "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        The above recipe illustrates two :class:`_engine.Engine` objects that
        will each serve as factories for :class:`_engine.Connection` objects
        that have pre-established "shard_id" execution options present. A
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        then interprets this execution option to emit a MySQL ``use`` statement
        to switch databases before a statement execution, while at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.driver

    # descriptor; __init__ assigns the echo flag through it
    echo = log.echo_property()

    def __repr__(self) -> str:
        # the representation shows the engine's URL
        return "Engine(%r)" % (self.url,)

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        :param close: if left at its default of ``True``, has the
         effect of fully closing all **currently checked in**
         database connections. Connections that are still checked out
         will **not** be closed, however they will no longer be associated
         with this :class:`_engine.Engine`,
         so when they are closed individually, eventually the
         :class:`_pool.Pool` which they are associated with will
         be garbage collected and they will be closed out fully, if
         not already closed on checkin.

         If set to ``False``, the previous connection pool is de-referenced,
         and otherwise not touched in any way.

        .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

        """
        # the old pool is actively disposed only when ``close`` is set
        if close:
            self.pool.dispose()
        # in either case a recreated pool replaces it, and listeners
        # are then notified
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Yield the given connection, or a new one if none was given.

        A connection created here is used within a ``with`` block, so
        its context manager exit runs when this context manager
        finishes.  A connection that was passed in is yielded as-is.

        """
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            yield connection

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(
                    text("insert into table (x, y, z) values (1, 2, 3)")
                )
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed.  If an error is raised, the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """
        with self.connect() as conn:
            with conn.begin():
                yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]],
        element: SchemaItem,
        **kwargs: Any,
    ) -> None:
        # delegates to the same-named Connection method, using a
        # connection obtained from begin()
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` acts as a Python context manager, so
        the typical use of this method looks like::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Where above, after the block is completed, the connection is "closed"
        and its underlying DBAPI resources are returned to the connection pool.
        This also has the effect of rolling back any transaction that
        was explicitly begun or was begun via autobegin, and will
        emit the :meth:`_events.ConnectionEvents.rollback` event if one was
        started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed.   When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()

3301 

3302 

class OptionEngineMixin(log.Identified):
    """Mixin for an engine that proxies a parent :class:`_engine.Engine`
    while carrying its own execution options.

    The URL, dialect, compiled cache and logging settings are copied from
    the proxied engine at construction time, while ``pool`` is read from
    and written to the proxied engine on every access.

    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        self._proxied = proxied

        # plain state taken over from the parent engine
        self.url = proxied.url
        self.dialect = proxied.dialect
        self.hide_parameters = proxied.hide_parameters
        self._compiled_cache = proxied._compiled_cache

        # logging configuration is copied from the parent as well
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        log.instance_logger(self, echoflag=self.echo)

        # joining the dispatchers means that events assigned to the
        # parent engine *after* this object was created still propagate
        # to it.  because the parent's events are shared, class-level
        # events are not allowed to apply to the OptionEngine class
        # directly.
        #
        # the alternative would be a one-time transfer of the events
        # which exist right now:
        #     self.dispatch._update(proxied.dispatch)
        #
        # that may be the more appropriate approach, but it would change
        # behavior for logic that assigns events to the parent engine
        # and expects them to take effect for an already-created
        # sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # begin with the parent's options, then layer the given ones on top
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        # to be supplied by the class that uses this mixin
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The connection pool of the proxied engine."""
            parent = self._proxied
            return parent.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # a new pool is installed on the proxied engine itself
            parent = self._proxied
            parent.pool = pool

        @property
        def _has_events(self) -> bool:
            # the parent's flag is used when it is set; otherwise the
            # flag stored on this instance applies, defaulting to False
            inherited = self._proxied._has_events
            if inherited:
                return inherited
            return self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # kept in the instance dictionary, apart from the parent's flag
            self.__dict__["_has_events"] = value

3366 

3367 

class OptionEngine(OptionEngineMixin, Engine):
    """An :class:`_engine.Engine` combined with
    :class:`.OptionEngineMixin`.

    This is the class assigned to ``Engine._option_cls``, which
    :meth:`_engine.Engine.execution_options` instantiates.

    """

    def update_execution_options(self, **opt: Any) -> None:
        # Engine's implementation is invoked by name: OptionEngineMixin
        # precedes Engine in the bases, and its own version of this
        # method raises NotImplementedError
        Engine.update_execution_options(self, **opt)

3371 

3372 

# OptionEngine is the class that Engine.execution_options() instantiates;
# it is assigned here because OptionEngine is defined after Engine
Engine._option_cls = OptionEngine