Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1002 statements  

1# engine/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 

8from __future__ import annotations 

9 

10import contextlib 

11import sys 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import cast 

16from typing import Iterable 

17from typing import Iterator 

18from typing import List 

19from typing import Mapping 

20from typing import NoReturn 

21from typing import Optional 

22from typing import overload 

23from typing import Tuple 

24from typing import Type 

25from typing import TypeVar 

26from typing import Union 

27 

28from .interfaces import BindTyping 

29from .interfaces import ConnectionEventsTarget 

30from .interfaces import DBAPICursor 

31from .interfaces import ExceptionContext 

32from .interfaces import ExecuteStyle 

33from .interfaces import ExecutionContext 

34from .interfaces import IsolationLevel 

35from .util import _distill_params_20 

36from .util import _distill_raw_params 

37from .util import TransactionalContext 

38from .. import exc 

39from .. import inspection 

40from .. import log 

41from .. import util 

42from ..sql import compiler 

43from ..sql import util as sql_util 

44 

45if typing.TYPE_CHECKING: 

46 from . import CursorResult 

47 from . import ScalarResult 

48 from .interfaces import _AnyExecuteParams 

49 from .interfaces import _AnyMultiExecuteParams 

50 from .interfaces import _CoreAnyExecuteParams 

51 from .interfaces import _CoreMultiExecuteParams 

52 from .interfaces import _CoreSingleExecuteParams 

53 from .interfaces import _DBAPIAnyExecuteParams 

54 from .interfaces import _DBAPISingleExecuteParams 

55 from .interfaces import _ExecuteOptions 

56 from .interfaces import CompiledCacheType 

57 from .interfaces import CoreExecuteOptionsParameter 

58 from .interfaces import Dialect 

59 from .interfaces import SchemaTranslateMapType 

60 from .reflection import Inspector # noqa 

61 from .url import URL 

62 from ..event import dispatcher 

63 from ..log import _EchoFlagType 

64 from ..pool import _ConnectionFairy 

65 from ..pool import Pool 

66 from ..pool import PoolProxiedConnection 

67 from ..sql import Executable 

68 from ..sql._typing import _InfoType 

69 from ..sql.compiler import Compiled 

70 from ..sql.ddl import ExecutableDDLElement 

71 from ..sql.ddl import InvokeDDLBase 

72 from ..sql.functions import FunctionElement 

73 from ..sql.schema import DefaultGenerator 

74 from ..sql.schema import HasSchemaAttr 

75 from ..sql.schema import SchemaVisitable 

76 from ..sql.selectable import TypedReturnsRows 

77 

78 

# Generic type variable used by annotations in this module; bound to ``Any``.
_T = TypeVar("_T", bound=Any)
# Default execution options: an alias of the shared ``util.EMPTY_DICT`` object.
_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT
# "No options" mapping; also an alias of ``util.EMPTY_DICT``.
NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT

82 

83 

84class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 

85 """Provides high-level functionality for a wrapped DB-API connection. 

86 

87 The :class:`_engine.Connection` object is procured by calling the 

88 :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

89 object, and provides services for execution of SQL statements as well 

90 as transaction control. 

91 

92 The Connection object is **not** thread-safe. While a Connection can be 

93 shared among threads using properly synchronized access, it is still 

94 possible that the underlying DBAPI connection may not support shared 

95 access between threads. Check the DBAPI documentation for details. 

96 

97 The Connection object represents a single DBAPI connection checked out 

98 from the connection pool. In this state, the connection pool has no 

99 affect upon the connection, including its expiration or timeout state. 

100 For the connection pool to properly manage connections, connections 

101 should be returned to the connection pool (i.e. ``connection.close()``) 

102 whenever the connection is not in use. 

103 

104 .. index:: 

105 single: thread safety; Connection 

106 

107 """ 

108 

109 dialect: Dialect 

110 dispatch: dispatcher[ConnectionEventsTarget] 

111 

112 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

113 

114 # used by sqlalchemy.engine.util.TransactionalContext 

115 _trans_context_manager: Optional[TransactionalContext] = None 

116 

117 # legacy as of 2.0, should be eventually deprecated and 

118 # removed. was used in the "pre_ping" recipe that's been in the docs 

119 # a long time 

120 should_close_with_result = False 

121 

122 _dbapi_connection: Optional[PoolProxiedConnection] 

123 

124 _execution_options: _ExecuteOptions 

125 

126 _transaction: Optional[RootTransaction] 

127 _nested_transaction: Optional[NestedTransaction] 

128 

def __init__(
    self,
    engine: Engine,
    connection: Optional[PoolProxiedConnection] = None,
    _has_events: Optional[bool] = None,
    _allow_revalidate: bool = True,
    _allow_autobegin: bool = True,
) -> None:
    """Construct a new Connection.

    :param engine: the :class:`_engine.Engine` this connection belongs to;
      its dialect and execution options are copied onto the new object.
    :param connection: an already checked-out pool-proxied connection.
      When ``None``, one is obtained via ``engine.raw_connection()``.
    :param _has_events: internal; when ``None`` the engine's dispatcher is
      joined and the engine's ``_has_events`` flag is inherited.
    :param _allow_revalidate: internal; stored as the private
      "can reconnect" flag consulted by ``closed`` / ``invalidated``.
    :param _allow_autobegin: internal; stored for use by ``_autobegin()``.
    """
    self.engine = engine
    self.dialect = dialect = engine.dialect

    if connection is None:
        try:
            self._dbapi_connection = engine.raw_connection()
        except dialect.loaded_dbapi.Error as err:
            # hand DBAPI-level connect failures to the class-level handler,
            # since no Connection state exists yet
            Connection._handle_dbapi_exception_noconnection(
                err, dialect, engine
            )
            # presumably the handler raises on its own; this re-raises the
            # original error if it ever returns
            raise
    else:
        self._dbapi_connection = connection

    # no transaction of either kind is in progress on a new Connection
    self._transaction = self._nested_transaction = None
    self.__savepoint_seq = 0
    self.__in_begin = False

    self.__can_reconnect = _allow_revalidate
    self._allow_autobegin = _allow_autobegin
    # cache the engine's "should log at INFO" decision on the connection
    self._echo = self.engine._should_log_info()

    if _has_events is None:
        # if _has_events is sent explicitly as False,
        # then don't join the dispatch of the engine; we don't
        # want to handle any of the engine's events in that case.
        self.dispatch = self.dispatch._join(engine.dispatch)
    # truthy when explicitly requested, or when unspecified and the engine
    # itself has events
    self._has_events = _has_events or (
        _has_events is None and engine._has_events
    )

    self._execution_options = engine._execution_options

    if self._has_events or self.engine._has_events:
        # invoke the engine_connect hook with the new Connection
        self.dispatch.engine_connect(self)

173 

174 # this can be assigned differently via 

175 # characteristics.LoggingTokenCharacteristic 

176 _message_formatter: Any = None 

177 

def _log_info(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` on the engine's logger at INFO level.

    The message is first passed through ``self._message_formatter`` when
    one is set.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel``
    keyword of ``1 + log.STACKLEVEL_OFFSET`` is added (overwriting any
    caller-supplied value) before the call is forwarded.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    engine_logger = self.engine.logger
    engine_logger.info(text, *arg, **kw)

188 

def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None:
    """Emit ``message`` on the engine's logger at DEBUG level.

    The message is first passed through ``self._message_formatter`` when
    one is set.  When ``log.STACKLEVEL`` is truthy, a ``stacklevel``
    keyword of ``1 + log.STACKLEVEL_OFFSET`` is added (overwriting any
    caller-supplied value) before the call is forwarded.
    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    engine_logger = self.engine.logger
    engine_logger.debug(text, *arg, **kw)

199 

@property
def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]:
    """The ``schema_translate_map`` execution option, or ``None`` if the
    option has not been set on this connection.
    """
    return self._execution_options.get("schema_translate_map", None)

207 

def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]:
    """Return the effective schema name for ``obj``.

    The object's own ``schema`` is returned unless a non-empty
    ``schema_translate_map`` execution option is present, contains that
    schema as a key, and the object opts in via ``_use_schema_map``; in
    that case the mapped name is returned instead.

    """
    schema_name = obj.schema
    translate_map: Optional[SchemaTranslateMapType] = (
        self._execution_options.get("schema_translate_map", None)
    )

    # no map configured (or an empty one): nothing to translate
    if not translate_map:
        return schema_name

    # schema not mapped, or the object declines translation
    if schema_name not in translate_map or not obj._use_schema_map:
        return schema_name

    return translate_map[schema_name]

227 

def __enter__(self) -> Connection:
    """Enter a ``with`` block; the connection itself is the bound value."""
    return self

230 

def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Leave a ``with`` block by calling ``close()``.

    The exception details are ignored and ``None`` is returned, so any
    exception raised inside the block continues to propagate.
    """
    self.close()

233 

@overload
def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    **opt: Any,
) -> Connection: ...

@overload
def execution_options(self, **opt: Any) -> Connection: ...

def execution_options(self, **opt: Any) -> Connection:
    r"""Set non-SQL options for the connection which take effect
    during execution.

    This method modifies this :class:`_engine.Connection` **in-place**;
    the return value is the same :class:`_engine.Connection` object
    upon which the method is called.   Note that this is in contrast
    to the behavior of the ``execution_options`` methods on other
    objects such as :meth:`_engine.Engine.execution_options` and
    :meth:`_sql.Executable.execution_options`.  The rationale is that many
    such execution options necessarily modify the state of the base
    DBAPI connection in any case so there is no feasible means of
    keeping the effect of such an option localized to a "sub" connection.

    .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options`
       method, in contrast to other objects with this method, modifies
       the connection in-place without creating a copy of it.

    As discussed elsewhere, the :meth:`_engine.Connection.execution_options`
    method accepts any arbitrary parameters including user defined names.
    All parameters given are consumable in a number of ways including
    by using the :meth:`_engine.Connection.get_execution_options` method.
    See the examples at :meth:`_sql.Executable.execution_options`
    and :meth:`_engine.Engine.execution_options`.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param compiled_cache: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**. The isolation level will remain
      at the given setting until explicitly changed, or when the DBAPI
      connection itself is :term:`released` to the connection pool, i.e. the
      :meth:`_engine.Connection.close` method is called, at which time an
      event handler will emit additional statements on the DBAPI connection
      in order to revert the isolation level change.

      .. note:: The ``isolation_level`` execution option may only be
         established before the :meth:`_engine.Connection.begin` method is
         called, as well as before any SQL statements are emitted which
         would otherwise trigger "autobegin", or directly after a call to
         :meth:`_engine.Connection.commit` or
         :meth:`_engine.Connection.rollback`. A database cannot change the
         isolation level on a transaction in progress.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs. The new connection produced after the
         invalidation will **not** have the selected isolation level
         re-applied to it automatically.

      .. seealso::

            :ref:`dbapi_autocommit`

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

    :param no_parameters: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.

      Indicate to the dialect that results should be "streamed" and not
      pre-buffered, if possible.  For backends such as PostgreSQL, MySQL
      and MariaDB, this indicates the use of a "server side cursor" as
      opposed to a client side cursor.  Other backends such as that of
      Oracle Database may already use server side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`,
        :class:`_engine.Engine`. Number of rows to format into an
        INSERT statement when the statement uses "insertmanyvalues" mode,
        which is a paged form of bulk insert that is used for many backends
        when using :term:`executemany` execution typically in conjunction
        with RETURNING. Defaults to 1000. May also be modified on a
        per-engine basis using the
        :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter.

        .. versionadded:: 2.0

        .. seealso::

            :ref:`engine_insertmanyvalues`

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. seealso::

        :ref:`schema_translating`

    :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount``
      attribute will be unconditionally memoized within the result and
      made available via the :attr:`.CursorResult.rowcount` attribute.
      Normally, this attribute is only preserved for UPDATE and DELETE
      statements.  Using this option, the DBAPIs rowcount value can
      be accessed for other kinds of statements such as INSERT and SELECT,
      to the degree that the DBAPI supports these statements.  See
      :attr:`.CursorResult.rowcount` for notes regarding the behavior
      of this attribute.

      .. versionadded:: 2.0.28

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`

        :ref:`orm_queryguide_execution_options` - documentation on all
        ORM-specific execution options

    """  # noqa
    if self._has_events or self.engine._has_events:
        # event listeners are invoked first and receive the ``opt`` dict
        # itself, before it is merged into the stored options
        self.dispatch.set_connection_execution_options(self, opt)
    # the result of union() replaces the stored options mapping
    self._execution_options = self._execution_options.union(opt)
    # finally hand the same ``opt`` dict to the dialect
    self.dialect.set_connection_execution_options(self, opt)
    return self

516 

def get_execution_options(self) -> _ExecuteOptions:
    """Return the execution options currently stored on this connection,
    i.e. the non-SQL options that will be in effect during execution.

    .. versionadded:: 1.3

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    current_options = self._execution_options
    return current_options

527 

@property
def _still_open_and_dbapi_connection_is_valid(self) -> bool:
    """Whether a pool-proxied connection is present and reports itself
    valid via its ``is_valid`` attribute.
    """
    fairy = self._dbapi_connection
    if fairy is None:
        return False
    return fairy.is_valid

535 

@property
def closed(self) -> bool:
    """Return True if this connection is closed."""

    # a connection that still holds a DBAPI connection is open
    if self._dbapi_connection is not None:
        return False
    # without one, it is closed only when it cannot reconnect
    return not self.__can_reconnect

541 

@property
def invalidated(self) -> bool:
    """Return True if this connection was invalidated.

    This does not indicate whether or not the connection was
    invalidated at the pool level, however

    """

    # Before 1.4 "invalid" was tracked as its own flag, separate from
    # "closed": an invalidated connection could then be closed, leaving
    # _dbapi_connection as None and closed=True while the "invalid" flag
    # remained set.  That produced three states (open/valid,
    # closed/valid, closed/invalid) for no benefit, because a closed
    # connection has no need to also be "invalid".  The state is
    # therefore derived from just two facts: whether a DBAPI connection
    # is present, and whether reconnecting is permitted.

    if self._dbapi_connection is not None:
        return False
    return self.__can_reconnect

562 

@property
def connection(self) -> PoolProxiedConnection:
    """The underlying DB-API connection managed by this Connection.

    The object returned is SQLAlchemy's connection-pool proxied
    connection; its
    :attr:`_pool._ConnectionFairy.dbapi_connection` attribute refers to
    the actual driver connection.

    If no proxied connection is currently held, an attempt is made to
    revalidate (reconnect) first.

    .. seealso::


        :ref:`dbapi_connections`

    """

    held = self._dbapi_connection
    if held is not None:
        return held

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # state errors raised by revalidation propagate untouched
        raise
    except BaseException as err:
        # anything else goes through the DBAPI exception handler
        self._handle_dbapi_exception(err, None, None, None, None)

588 

def get_isolation_level(self) -> IsolationLevel:
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    A live SQL operation is performed against the database to obtain the
    value, so what is returned is the real level on the underlying DBAPI
    connection regardless of how that state came to be set.  The result
    is one of the four actual isolation modes ``READ UNCOMMITTED``,
    ``READ COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``; the
    ``AUTOCOMMIT`` setting is **not** included.  Third party dialects may
    also feature additional isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    # accessing ``self.connection`` may trigger a reconnect first
    driver_connection = self.connection.dbapi_connection
    assert driver_connection is not None
    try:
        level = self.dialect.get_isolation_level(driver_connection)
        return level
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

632 

@property
def default_isolation_level(self) -> Optional[IsolationLevel]:
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    The value does not depend on the
    :paramref:`.Connection.execution_options.isolation_level` or
    :paramref:`.Engine.execution_options.isolation_level` execution
    options.  It is established by the :class:`_engine.Dialect` when the
    first connection is created, via a SQL query against the database
    for the current isolation level before any additional commands
    have been emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level

661 

def _invalid_transaction(self) -> NoReturn:
    """Raise ``PendingRollbackError`` for a reconnect attempted while an
    invalidated transaction has not yet been rolled back.

    The message says "savepoint transaction" when a nested transaction
    is present, and plain "transaction" otherwise.
    """
    qualifier = "savepoint " if self._nested_transaction is not None else ""
    raise exc.PendingRollbackError(
        "Can't reconnect until invalid %stransaction is rolled "
        "back.  Please rollback() fully before proceeding" % (qualifier,),
        code="8s2b",
    )

669 

def _revalidate_connection(self) -> PoolProxiedConnection:
    """Obtain a fresh pool connection for an invalidated Connection.

    Raises ``ResourceClosedError`` unless reconnecting is permitted and
    the connection is currently invalidated.  If a transaction is still
    registered, ``_invalid_transaction()`` is invoked (which raises)
    before any new connection is procured.
    """
    if not (self.__can_reconnect and self.invalidated):
        raise exc.ResourceClosedError("This Connection is closed")

    if self._transaction is not None:
        self._invalid_transaction()

    self._dbapi_connection = self.engine.raw_connection()
    return self._dbapi_connection

677 

@property
def info(self) -> _InfoType:
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data stays with the DBAPI connection, including after that
    connection has been returned to the connection pool and is used again
    by later :class:`_engine.Connection` instances.

    """

    proxied = self.connection
    return proxied.info

691 

def invalidate(self, exception: Optional[BaseException] = None) -> None:
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt will be made to close the underlying DBAPI connection
    immediately; however if this operation fails, the error is logged
    but not raised.  The connection is then discarded whether or not
    close() succeeded.

    On its next use (typically a call to
    :meth:`_engine.Connection.execute` or similar), this
    :class:`_engine.Connection` will try to obtain a new DBAPI
    connection from the :class:`_pool.Pool` (i.e. a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    :meth:`_engine.Connection.invalidate` is called, all DBAPI-level
    state belonging to that transaction is lost, since the DBAPI
    connection is closed.  The :class:`_engine.Connection` will refuse
    to reconnect until the :class:`.Transaction` object is ended by
    calling :meth:`.Transaction.rollback`; until then, attempts to keep
    using the :class:`_engine.Connection` raise
    :class:`~sqlalchemy.exc.PendingRollbackError`.  This prevents
    applications from accidentally continuing ongoing transactional
    operations even though the transaction was lost to an invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on an already-invalidated connection is a
    no-op; calling it on a closed connection raises
    ``ResourceClosedError``.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """

    # nothing to do a second time
    if self.invalidated:
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        fairy = self._dbapi_connection
        assert fairy is not None
        fairy.invalidate(exception)

    # drop the reference regardless of whether the proxy was still valid
    self._dbapi_connection = None

750 

def detach(self) -> None:
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance will remain usable.
    When closed
    (or exited from a context manager context as above),
    the DB-API connection will be literally closed and not
    returned to its originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    Raises ``ResourceClosedError`` if the connection is closed, and
    ``InvalidRequestError`` if it is currently invalidated.

    """

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    fairy = self._dbapi_connection
    if fairy is not None:
        fairy.detach()
        return

    # open but holding no DBAPI connection: the invalidated state
    raise exc.InvalidRequestError("Can't detach an invalidated Connection")

786 

def _autobegin(self) -> None:
    """Call ``begin()`` when autobegin is allowed and a begin is not
    already underway; otherwise do nothing.
    """
    if not self._allow_autobegin:
        return
    if self.__in_begin:
        return
    self.begin()

790 

def begin(self) -> RootTransaction:
    """Begin a transaction prior to autobegin occurring.

    E.g.::

        with engine.connect() as conn:
            with conn.begin() as trans:
                conn.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of :class:`_engine.RootTransaction`.
    This object represents the "scope" of the transaction,
    which completes when either the :meth:`_engine.Transaction.rollback`
    or :meth:`_engine.Transaction.commit` method is called; the object
    also works as a context manager as illustrated above.

    The :meth:`_engine.Connection.begin` method begins a
    transaction that normally will be begun in any case when the connection
    is first used to execute a statement.  The reason this method might be
    used would be to invoke the :meth:`_events.ConnectionEvents.begin`
    event at a specific time, or to organize code within the scope of a
    connection checkout in terms of context managed blocks, such as::

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(...)
                conn.execute(...)

            with conn.begin():
                conn.execute(...)
                conn.execute(...)

    The above code is not fundamentally any different in its behavior than
    the following code which does not use
    :meth:`_engine.Connection.begin`; the below style is known
    as "commit as you go" style::

        with engine.connect() as conn:
            conn.execute(...)
            conn.execute(...)
            conn.commit()

            conn.execute(...)
            conn.execute(...)
            conn.commit()

    From a database point of view, the :meth:`_engine.Connection.begin`
    method does not emit any SQL or change the state of the underlying
    DBAPI connection in any way; the Python DBAPI does not have any
    concept of explicit transaction begin.

    :raises InvalidRequestError: if a transaction object is already
     associated with this connection.

    .. seealso::

        :ref:`tutorial_working_with_transactions` - in the
        :ref:`unified_tutorial`

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    # Guard: only one root transaction object may exist at a time.
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    self._transaction = RootTransaction(self)
    return self._transaction

865 

def begin_nested(self) -> NestedTransaction:
    """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction
    handle that controls the scope of the SAVEPOINT.

    E.g.::

        with engine.begin() as connection:
            with connection.begin_nested():
                connection.execute(table.insert(), {"username": "sandy"})

    The returned object is an instance of
    :class:`_engine.NestedTransaction`, which includes transactional
    methods :meth:`_engine.NestedTransaction.commit` and
    :meth:`_engine.NestedTransaction.rollback`; for a nested transaction,
    these methods correspond to the operations "RELEASE SAVEPOINT <name>"
    and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local
    to the :class:`_engine.NestedTransaction` object and is generated
    automatically. Like any other :class:`_engine.Transaction`, the
    :class:`_engine.NestedTransaction` may be used as a context manager as
    illustrated above which will "release" or "rollback" corresponding to
    if the operation within the block were successful or raised an
    exception.

    Nested transactions require SAVEPOINT support in the underlying
    database, else the behavior is undefined. SAVEPOINT is commonly used to
    run operations within a transaction that may fail, while continuing the
    outer transaction. E.g.::

        from sqlalchemy import exc

        with engine.begin() as connection:
            trans = connection.begin_nested()
            try:
                connection.execute(table.insert(), {"username": "sandy"})
                trans.commit()
            except exc.IntegrityError:  # catch for duplicate username
                trans.rollback()  # rollback to savepoint

            # outer transaction continues
            connection.execute(...)

    If :meth:`_engine.Connection.begin_nested` is called without first
    calling :meth:`_engine.Connection.begin` or
    :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object
    will "autobegin" the outer transaction first. This outer transaction
    may be committed using "commit-as-you-go" style, e.g.::

        with engine.connect() as connection:  # begin() wasn't called

            with connection.begin_nested():  # will auto-"begin()" first
                connection.execute(...)
            # savepoint is released

            connection.execute(...)

            # explicitly commit outer transaction
            connection.commit()

            # can continue working with connection here

    .. versionchanged:: 2.0

        :meth:`_engine.Connection.begin_nested` will now participate
        in the connection "autobegin" behavior that is new as of
        2.0 / "future" style connections in 1.4.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    # A SAVEPOINT needs an enclosing transaction; autobegin one if absent.
    needs_outer_transaction = self._transaction is None
    if needs_outer_transaction:
        self._autobegin()

    return NestedTransaction(self)

943 

def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction:
    """Begin a two-phase or XA transaction and return a transaction
    handle.

    The returned object is an instance of :class:`.TwoPhaseTransaction`,
    which in addition to the methods provided by
    :class:`.Transaction`, also provides a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id.  If not supplied, one is
     obtained from the dialect's ``create_xid()`` method.

    :raises InvalidRequestError: if a transaction is already associated
     with this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """
    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    transaction_id = (
        xid if xid is not None else self.engine.dialect.create_xid()
    )
    return TwoPhaseTransaction(self, transaction_id)

972 

def commit(self) -> None:
    """Commit the transaction that is currently in progress.

    This method commits the current transaction if one has been started.
    If no transaction was started, the method has no effect, assuming
    the connection is in a non-invalidated state.

    A transaction is begun on a :class:`_engine.Connection` automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.commit` method only acts upon
      the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.

    """
    root = self._transaction
    if root:
        root.commit()

997 

def rollback(self) -> None:
    """Roll back the transaction that is currently in progress.

    This method rolls back the current transaction if one has been started.
    If no transaction was started, the method has no effect.  If a
    transaction was started and the connection is in an invalidated state,
    the transaction is cleared using this method.

    A transaction is begun on a :class:`_engine.Connection` automatically
    whenever a statement is first executed, or when the
    :meth:`_engine.Connection.begin` method is called.

    .. note:: The :meth:`_engine.Connection.rollback` method only acts
      upon the primary database transaction that is linked to the
      :class:`_engine.Connection` object.  It does not operate upon a
      SAVEPOINT that would have been invoked from the
      :meth:`_engine.Connection.begin_nested` method; for control of a
      SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
      :class:`_engine.NestedTransaction` that is returned by the
      :meth:`_engine.Connection.begin_nested` method itself.

    """
    root = self._transaction
    if root:
        root.rollback()

1023 

def recover_twophase(self) -> List[Any]:
    """Return the result of the dialect's ``do_recover_twophase()`` hook
    invoked with this connection."""
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)

1026 

def rollback_prepared(self, xid: Any, recover: bool = False) -> None:
    """Forward ``xid`` and ``recover`` to the dialect's
    ``do_rollback_twophase()`` hook for this connection."""
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)

1029 

def commit_prepared(self, xid: Any, recover: bool = False) -> None:
    """Forward ``xid`` and ``recover`` to the dialect's
    ``do_commit_twophase()`` hook for this connection."""
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)

1032 

def in_transaction(self) -> bool:
    """Return True if a transaction is in progress."""
    root = self._transaction
    if root is None:
        return False
    return root.is_active

1036 

def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction is in progress."""
    nested = self._nested_transaction
    if nested is None:
        return False
    return nested.is_active

1043 

def _is_autocommit_isolation(self) -> bool:
    """Report whether the effective isolation level is ``"AUTOCOMMIT"``.

    The ``isolation_level`` execution option wins when it is set; only
    when it is absent (``None``) is the dialect's
    ``_on_connect_isolation_level`` consulted.
    """
    level = self._execution_options.get("isolation_level", None)
    if level == "AUTOCOMMIT":
        return True
    if level is not None:
        # an explicit, non-AUTOCOMMIT level overrides the dialect default
        return False
    return bool(
        self.engine.dialect._on_connect_isolation_level == "AUTOCOMMIT"
    )

1054 

def _get_required_transaction(self) -> RootTransaction:
    """Return the current root transaction object.

    :raises InvalidRequestError: if ``self._transaction`` is ``None``.
    """
    current = self._transaction
    if current is not None:
        return current
    raise exc.InvalidRequestError("connection is not in a transaction")

1060 

def _get_required_nested_transaction(self) -> NestedTransaction:
    """Return the current nested transaction object.

    :raises InvalidRequestError: if ``self._nested_transaction`` is
     ``None``.
    """
    current = self._nested_transaction
    if current is not None:
        return current
    raise exc.InvalidRequestError("connection is not in a nested transaction")

1068 

def get_transaction(self) -> Optional[RootTransaction]:
    """Return the current root transaction in progress, if any.

    Returns ``None`` when no root transaction object is present.

    .. versionadded:: 1.4

    """
    root = self._transaction
    return root

1077 

def get_nested_transaction(self) -> Optional[NestedTransaction]:
    """Return the current nested transaction in progress, if any.

    Returns ``None`` when no nested transaction object is present.

    .. versionadded:: 1.4

    """
    nested = self._nested_transaction
    return nested

1085 

def _begin_impl(self, transaction: RootTransaction) -> None:
    """Log, dispatch the ``begin`` event, and invoke the dialect's
    ``do_begin()`` hook for this connection.

    :param transaction: the root transaction being started; not used by
     this method itself.

    While the event listeners and ``do_begin()`` run, the ``__in_begin``
    flag is held True so that :meth:`_autobegin` is a no-op for any work
    they trigger on this connection.  The flag is always reset on the way
    out, including when an event listener raises; otherwise a failing
    listener would leave autobegin disabled for the rest of the
    connection's life.

    Exceptions raised by ``do_begin()`` are passed to
    ``_handle_dbapi_exception()``; exceptions raised by event listeners
    propagate unchanged.
    """
    if self._echo:
        if self._is_autocommit_isolation():
            self._log_info(
                "BEGIN (implicit; DBAPI should not BEGIN due to "
                "autocommit mode)"
            )
        else:
            self._log_info("BEGIN (implicit)")

    self.__in_begin = True
    try:
        # Listener dispatch sits inside the try/finally (but outside the
        # DBAPI exception handler) so a raising listener cannot leave
        # __in_begin stuck at True.
        if self._has_events or self.engine._has_events:
            self.dispatch.begin(self)

        try:
            self.engine.dialect.do_begin(self.connection)
        except BaseException as e:
            self._handle_dbapi_exception(e, None, None, None, None)
    finally:
        self.__in_begin = False

1107 

def _rollback_impl(self) -> None:
    """Dispatch the ``rollback`` event and, if the DBAPI connection is
    still usable, log and invoke the dialect's ``do_rollback()`` hook.

    Exceptions from ``do_rollback()`` are passed to
    ``_handle_dbapi_exception()``.
    """
    if self._has_events or self.engine._has_events:
        self.dispatch.rollback(self)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    if self._echo:
        # choose the log line that matches what the dialect will do
        if not self._is_autocommit_isolation():
            message = "ROLLBACK"
        elif self.dialect.skip_autocommit_rollback:
            message = (
                "ROLLBACK will be skipped by "
                "skip_autocommit_rollback"
            )
        else:
            message = (
                "ROLLBACK using DBAPI connection.rollback(); "
                "set skip_autocommit_rollback to prevent fully"
            )
        self._log_info(message)

    try:
        self.engine.dialect.do_rollback(self.connection)
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

1131 

def _commit_impl(self) -> None:
    """Dispatch the ``commit`` event, log, and invoke the dialect's
    ``do_commit()`` hook.

    Exceptions from ``do_commit()`` are passed to
    ``_handle_dbapi_exception()``.
    """
    if self._has_events or self.engine._has_events:
        self.dispatch.commit(self)

    if self._echo:
        if self._is_autocommit_isolation():
            message = (
                "COMMIT using DBAPI connection.commit(), "
                "has no effect due to autocommit mode"
            )
        else:
            message = "COMMIT"
        self._log_info(message)

    try:
        self.engine.dialect.do_commit(self.connection)
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

1148 

def _savepoint_impl(self, name: Optional[str] = None) -> str:
    """Create a savepoint through the dialect and return its name.

    :param name: savepoint name to use.  When ``None``, the per-connection
     ``__savepoint_seq`` counter is incremented and a name of the form
     ``sa_savepoint_<n>`` is generated.

    The ``savepoint`` event is dispatched with the name exactly as passed
    in, i.e. before any name is generated.
    """
    if self._has_events or self.engine._has_events:
        self.dispatch.savepoint(self, name)

    savepoint_name = name
    if savepoint_name is None:
        next_seq = self.__savepoint_seq + 1
        self.__savepoint_seq = next_seq
        savepoint_name = "sa_savepoint_%s" % next_seq

    self.engine.dialect.do_savepoint(self, savepoint_name)
    return savepoint_name

1158 

def _rollback_to_savepoint_impl(self, name: str) -> None:
    """Dispatch the ``rollback_savepoint`` event and, if the DBAPI
    connection is still usable, call the dialect's
    ``do_rollback_to_savepoint()`` hook for ``name``."""
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.rollback_savepoint(self, name, None)

    if not self._still_open_and_dbapi_connection_is_valid:
        return
    self.engine.dialect.do_rollback_to_savepoint(self, name)

1165 

def _release_savepoint_impl(self, name: str) -> None:
    """Dispatch the ``release_savepoint`` event, then call the dialect's
    ``do_release_savepoint()`` hook for ``name``."""
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.release_savepoint(self, name, None)

    dialect = self.engine.dialect
    dialect.do_release_savepoint(self, name)

1171 

def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None:
    """Log, dispatch the ``begin_twophase`` event, and invoke the
    dialect's ``do_begin_twophase()`` hook with ``transaction.xid``.

    The ``__in_begin`` flag is held True only around the dialect call and
    is reset afterwards regardless of outcome.  Exceptions from the
    dialect call are passed to ``_handle_dbapi_exception()``.
    """
    if self._echo:
        self._log_info("BEGIN TWOPHASE (implicit)")

    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.begin_twophase(self, transaction.xid)

    self.__in_begin = True
    try:
        self.engine.dialect.do_begin_twophase(self, transaction.xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)
    finally:
        self.__in_begin = False

1185 

def _prepare_twophase_impl(self, xid: Any) -> None:
    """Dispatch the ``prepare_twophase`` event and invoke the dialect's
    ``do_prepare_twophase()`` hook for ``xid``.

    Asserts that the current transaction is a ``TwoPhaseTransaction``.
    Exceptions from the dialect call are passed to
    ``_handle_dbapi_exception()``.
    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.prepare_twophase(self, xid)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_prepare_twophase(self, xid)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1195 

def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Dispatch the ``rollback_twophase`` event and, if the DBAPI
    connection is still usable, invoke the dialect's
    ``do_rollback_twophase()`` hook.

    When the dialect call is made, the current transaction is asserted to
    be a ``TwoPhaseTransaction``; exceptions from the call are passed to
    ``_handle_dbapi_exception()``.
    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.rollback_twophase(self, xid, is_prepared)

    if not self._still_open_and_dbapi_connection_is_valid:
        return

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1208 

def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None:
    """Dispatch the ``commit_twophase`` event and invoke the dialect's
    ``do_commit_twophase()`` hook.

    Asserts that the current transaction is a ``TwoPhaseTransaction``.
    Exceptions from the dialect call are passed to
    ``_handle_dbapi_exception()``.
    """
    listeners_present = self._has_events or self.engine._has_events
    if listeners_present:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    assert isinstance(self._transaction, TwoPhaseTransaction)
    try:
        self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

1218 

def close(self) -> None:
    """Close this :class:`_engine.Connection`.

    This results in a release of the underlying database
    resources, that is, the DBAPI connection referenced
    internally. The DBAPI connection is typically restored
    back to the connection-holding :class:`_pool.Pool` referenced
    by the :class:`_engine.Engine` that produced this
    :class:`_engine.Connection`. Any transactional state present on
    the DBAPI connection is also unconditionally released via
    the DBAPI connection's ``rollback()`` method, regardless
    of any :class:`.Transaction` object that may be
    outstanding with regards to this :class:`_engine.Connection`.

    This has the effect of also calling :meth:`_engine.Connection.rollback`
    if any transaction is in place.

    After :meth:`_engine.Connection.close` is called, the
    :class:`_engine.Connection` is permanently in a closed state,
    and will allow no further operations.

    """
    # Close any outstanding transaction first and remember that we did,
    # so the pooled connection is not reset a second time below.
    transaction_was_closed = bool(self._transaction)
    if transaction_was_closed:
        self._transaction.close()

    pooled = self._dbapi_connection
    if pooled is not None:
        if transaction_was_closed:
            # as we just closed the transaction, close the connection
            # pool connection without doing an additional reset
            cast("_ConnectionFairy", pooled)._close_special(
                transaction_reset=True
            )
        else:
            pooled.close()

    # There is a slight chance that conn.close() may have
    # triggered an invalidation here in which case
    # _dbapi_connection would already be None, however usually
    # it will be non-None here and in a "closed" state.
    self._dbapi_connection = None
    self.__can_reconnect = False

1266 

@overload
def scalar(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

@overload
def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Executes a SQL statement construct and returns a scalar object.

    This method is shorthand for invoking the
    :meth:`_engine.Result.scalar` method after invoking the
    :meth:`_engine.Connection.execute` method.  Parameters are equivalent.

    :return: a scalar Python value representing the first column of the
     first row returned.

    :raises ObjectNotExecutableError: if ``statement`` has no
     ``_execute_on_scalar`` attribute.

    """
    distilled_parameters = _distill_params_20(parameters)

    try:
        run_on_scalar = statement._execute_on_scalar
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_on_scalar(self, distilled_parameters, options)

1313 

@overload
def scalars(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    """Executes and returns a scalar result set, which yields scalar values
    from the first column of each row.

    This method is equivalent to calling :meth:`_engine.Connection.execute`
    to receive a :class:`_result.Result` object, then invoking the
    :meth:`_result.Result.scalars` method to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """
    result = self.execute(
        statement, parameters, execution_options=execution_options
    )
    return result.scalars()

1356 

@overload
def execute(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[_T]: ...

@overload
def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]: ...

def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]:
    r"""Executes a SQL statement construct and returns a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: parameters which will be bound into the statement.
     This may be either a dictionary of parameter names to values,
     or a mutable sequence (e.g. a list) of dictionaries.  When a
     list of dictionaries is passed, the underlying statement execution
     will make use of the DBAPI ``cursor.executemany()`` method.
     When a single dictionary is passed, the DBAPI ``cursor.execute()``
     method will be used.

    :param execution_options: optional dictionary of execution options,
     which will be associated with the statement execution.  This
     dictionary can provide a subset of the options that are accepted
     by :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.Result` object.

    :raises ObjectNotExecutableError: if ``statement`` has no
     ``_execute_on_connection`` attribute.

    """
    distilled_parameters = _distill_params_20(parameters)

    try:
        run_on_connection = statement._execute_on_connection
    except AttributeError as err:
        raise exc.ObjectNotExecutableError(statement) from err

    options = execution_options or NO_OPTIONS
    return run_on_connection(self, distilled_parameters, options)

1424 

def _execute_function(
    self,
    func: FunctionElement[Any],
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Any]:
    """Execute a sql.FunctionElement object.

    The function is wrapped via ``func.select()`` and the resulting
    statement is handed to :meth:`_execute_clauseelement`.
    """
    select_stmt = func.select()
    return self._execute_clauseelement(
        select_stmt, distilled_parameters, execution_options
    )

1436 

def _execute_default(
    self,
    default: DefaultGenerator,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> Any:
    """Execute a schema.ColumnDefault object.

    The connection-level execution options are merged with those passed
    in, ``before_execute`` listeners are invoked if any are present, an
    execution context is built with ``_init_default()`` and the value
    produced by its ``_exec_default()`` is returned after dispatching
    ``after_execute``.
    """
    execution_options = self._execution_options.merge_with(
        execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreAnyExecuteParams]

    # note for event handlers, the "distilled parameters" which is always
    # a list of dicts is broken out into separate "multiparams" and
    # "params" collections, which allows the handler to distinguish
    # between an executemany and execute style set of parameters.
    if not (self._has_events or self.engine._has_events):
        event_multiparams = event_params = None
    else:
        (
            default,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, execution_options
        )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        dialect = self.dialect
        ctx = dialect.execution_ctx_cls._init_default(
            dialect, self, dbapi_conn, execution_options
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate untouched
        raise
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

    result = ctx._exec_default(None, default, None)

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            execution_options,
            result,
        )

    return result

1495 

def _execute_ddl(
    self,
    ddl: ExecutableDDLElement,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Any]:
    """Execute a schema.DDL object.

    Execution options from the DDL element, this connection and the call
    are merged; the element is compiled against the connection's dialect
    (honoring ``schema_translate_map``) and run through
    ``_execute_context()`` with the ``_init_ddl`` constructor.
    ``before_execute`` / ``after_execute`` listeners are invoked when
    present.
    """
    exec_opts = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    event_multiparams: Optional[_CoreMultiExecuteParams]
    event_params: Optional[_CoreSingleExecuteParams]

    if not (self._has_events or self.engine._has_events):
        event_multiparams = event_params = None
    else:
        (
            ddl,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            ddl, distilled_parameters, exec_opts
        )

    translate_map = exec_opts.get("schema_translate_map", None)
    dialect = self.dialect

    compiled = ddl.compile(
        dialect=dialect, schema_translate_map=translate_map
    )
    result = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_ddl,
        compiled,
        None,
        exec_opts,
        compiled,
    )

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            ddl,
            event_multiparams,
            event_params,
            exec_opts,
            result,
        )
    return result

1548 

def _invoke_before_exec_event(
    self,
    elem: Any,
    distilled_params: _CoreMultiExecuteParams,
    execution_options: _ExecuteOptions,
) -> Tuple[
    Any,
    _CoreMultiExecuteParams,
    _CoreMultiExecuteParams,
    _CoreSingleExecuteParams,
]:
    """Run every ``before_execute`` listener and re-distill the parameters.

    The distilled parameter list is presented to listeners as a
    ``(multiparams, params)`` pair: a single-entry list becomes
    ``([], entry)``, anything else becomes ``(list, {})``.  Each listener
    returns a replacement ``(elem, multiparams, params)`` triple that is
    fed to the next one.

    :return: ``(elem, distilled_params, event_multiparams, event_params)``
     where ``distilled_params`` is rebuilt from whatever the listeners
     returned.

    :raises InvalidRequestError: if the listeners leave both multiparams
     and params non-empty.
    """
    multi: _CoreMultiExecuteParams
    single: _CoreSingleExecuteParams

    if len(distilled_params) == 1:
        multi, single = [], distilled_params[0]
    else:
        multi, single = distilled_params, {}

    for listener in self.dispatch.before_execute:
        elem, multi, single = listener(
            self,
            elem,
            multi,
            single,
            execution_options,
        )

    # fold the two event-facing collections back into one distilled list
    if multi:
        redistilled = list(multi)
        if single:
            raise exc.InvalidRequestError(
                "Event handler can't return non-empty multiparams "
                "and params at the same time"
            )
    elif single:
        redistilled = [single]
    else:
        redistilled = []

    return elem, redistilled, multi, single

1590 

def _execute_clauseelement(
    self,
    elem: Executable,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter,
) -> CursorResult[Any]:
    """Execute a sql.ClauseElement object.

    Execution options from the element, this connection and the call are
    merged; the element is compiled through ``_compile_w_cache()`` and the
    compiled form is run via ``_execute_context()`` with the
    ``_init_compiled`` constructor.  ``before_execute`` /
    ``after_execute`` listeners are invoked when present; the presence
    check is made once, up front.
    """
    execution_options = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_parameters, execution_options
        )

    if not distilled_parameters:
        column_keys = []
        for_executemany = False
    else:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        column_keys = sorted(distilled_parameters[0])
        for_executemany = len(distilled_parameters) > 1

    dialect = self.dialect

    schema_translate_map = execution_options.get(
        "schema_translate_map", None
    )

    compiled_cache: Optional[CompiledCacheType] = execution_options.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
        dialect=dialect,
        compiled_cache=compiled_cache,
        column_keys=column_keys,
        for_executemany=for_executemany,
        schema_translate_map=schema_translate_map,
        linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
    )
    result = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_parameters,
        execution_options,
        compiled_sql,
        distilled_parameters,
        elem,
        extracted_params,
        cache_hit=cache_hit,
    )

    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            execution_options,
            result,
        )
    return result

1663 

def _execute_compiled(
    self,
    compiled: Compiled,
    distilled_parameters: _CoreMultiExecuteParams,
    execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS,
) -> CursorResult[Any]:
    """Execute a sql.Compiled object.

    Execution options from the compiled object, this connection and the
    call are merged, and the compiled form is run via
    ``_execute_context()`` with the ``_init_compiled`` constructor.
    ``before_execute`` / ``after_execute`` listeners are invoked when
    present.

    TODO: why do we have this?  likely deprecate or remove

    """

    execution_options = compiled.execution_options.merge_with(
        self._execution_options, execution_options
    )

    # Decide once whether listeners are present.  Re-evaluating this after
    # execution could reach the after_execute dispatch with
    # event_multiparams / event_params never assigned, if listeners were
    # registered while the statement ran.  _execute_clauseelement() uses
    # the same single-check pattern.
    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            compiled,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            compiled, distilled_parameters, execution_options
        )

    dialect = self.dialect

    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled,
        distilled_parameters,
        execution_options,
        compiled,
        distilled_parameters,
        None,
        None,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            compiled,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret

1713 

def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]:
    r"""Run a raw SQL string on the DBAPI cursor, bypassing SQL
    compilation entirely.

    Whatever string is given is handed as-is to the ``cursor.execute()``
    method of the DBAPI in use.

    :param statement: The SQL string to execute.  Bound parameters have
     to be written in the paramstyle of the underlying DBAPI, e.g.
     "qmark", "pyformat", "format", etc.

    :param parameters: bound parameter values for the execution.
     Accepted forms are: a dictionary of named parameters, a tuple of
     positional parameters, or a list of dictionaries or tuples when
     multiple-execute behavior is wanted.

    :return: a :class:`_engine.CursorResult`.

    A list of dictionaries::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
         )

    One dictionary::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
             dict(id=1, value="v1"),
         )

    One tuple::

         conn.exec_driver_sql(
             "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
         )

    .. note:: :meth:`_engine.Connection.exec_driver_sql` does not take
        part in the
        :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.  Calls to
        :meth:`_engine.Connection.exec_driver_sql` can be intercepted
        with :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute` instead.

    .. seealso::

        :pep:`249`

    """

    raw_params = _distill_raw_params(parameters)
    merged_options = self._execution_options.merge_with(execution_options)
    driver_dialect = self.dialect

    # the statement is passed twice: once for error reporting by
    # _execute_context() and once as an argument of the context
    # constructor.
    return self._execute_context(
        driver_dialect,
        driver_dialect.execution_ctx_cls._init_statement,
        statement,
        None,
        merged_options,
        statement,
        raw_params,
    )

1790 

def _execute_context(
    self,
    dialect: Dialect,
    constructor: Callable[..., ExecutionContext],
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
    execution_options: _ExecuteOptions,
    *args: Any,
    **kw: Any,
) -> CursorResult[Any]:
    """Create an :class:`.ExecutionContext` and execute, returning
    a :class:`_engine.CursorResult`.

    :param dialect: dialect handed to ``constructor`` and to the
     follow-up ``_exec_*_context()`` method.
    :param constructor: callable building the execution context; it is
     invoked as ``constructor(dialect, self, conn, execution_options,
     *args, **kw)``.
    :param statement: statement being executed; only used here for
     error reporting and passed on to ``_exec_single_context()``.
    :param parameters: parameters used for error reporting and passed
     on to ``_exec_single_context()``.
    :param execution_options: effective execution options.

    """

    if execution_options:
        # "yield_per" implies streaming with a row buffer of that size
        yp = execution_options.get("yield_per", None)
        if yp:
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yp}
            )
    try:
        # fetch the DBAPI connection, revalidating when there is none
        conn = self._dbapi_connection
        if conn is None:
            conn = self._revalidate_connection()

        context = constructor(
            dialect, self, conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these two propagate untouched, without DBAPI error handling
        raise
    except BaseException as e:
        # no cursor or context exists yet at this point
        self._handle_dbapi_exception(
            e, str(statement), parameters, None, None
        )

    # a transaction (or nested transaction) that is present but no
    # longer active is reported via _invalid_transaction()
    if (
        self._transaction
        and not self._transaction.is_active
        or (
            self._nested_transaction
            and not self._nested_transaction.is_active
        )
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    # begin implicitly when no transaction is in place
    if self._transaction is None:
        self._autobegin()

    context.pre_exec()

    # hand over to the execution path matching the context's style
    if context.execute_style is ExecuteStyle.INSERTMANYVALUES:
        return self._exec_insertmany_context(dialect, context)
    else:
        return self._exec_single_context(
            dialect, context, statement, parameters
        )

1849 

def _exec_single_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
    statement: Union[str, Compiled],
    parameters: Optional[_AnyMultiExecuteParams],
) -> CursorResult[Any]:
    """continue the _execute_context() method for a single DBAPI
    cursor.execute() or cursor.executemany() call.

    :param dialect: dialect whose ``bind_typing`` decides whether
     setinputsizes is applied.
    :param context: the prepared execution context; cursor, statement
     string and parameters used for execution are read from it.
    :param statement: original statement, only used for error reporting
     if the setinputsizes step fails.
    :param parameters: original parameters, only used for error
     reporting if the setinputsizes step fails.
    :return: the result created by ``context._setup_result_proxy()``.

    """
    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()

        if generic_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor, generic_setinputsizes, context
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e, str(statement), parameters, None, context
                )

    # from here on, work with what the context itself carries; note
    # that this rebinds ``parameters``
    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters: Optional[_AnyExecuteParams]

    # a non-executemany call uses only the first parameter set
    if not context.executemany:
        effective_parameters = parameters[0]
    else:
        effective_parameters = parameters

    if self._has_events or self.engine._has_events:
        # each listener may replace both statement and parameters
        for fn in self.dispatch.before_cursor_execute:
            str_statement, effective_parameters = fn(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self._log_info(str_statement)

        stats = context._get_cache_stats()

        if not self.engine.hide_parameters:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    effective_parameters,
                    batches=10,
                    ismulti=context.executemany,
                ),
            )
        else:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]",
                stats,
            )

    # set to True as soon as a dialect-level listener reports having
    # performed the execution itself
    evt_handled: bool = False
    try:
        if context.execute_style is ExecuteStyle.EXECUTEMANY:
            effective_parameters = cast(
                "_CoreMultiExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_executemany:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_executemany(
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                )
        elif not effective_parameters and context.no_parameters:
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute_no_params:
                    if fn(cursor, str_statement, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute_no_params(
                    cursor, str_statement, context
                )
        else:
            effective_parameters = cast(
                "_CoreSingleExecuteParams", effective_parameters
            )
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute:
                    if fn(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    ):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute(
                    cursor, str_statement, effective_parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

    except BaseException as e:
        # annotated NoReturn, so ``result`` is always bound below
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

1991 

def _exec_insertmany_context(
    self,
    dialect: Dialect,
    context: ExecutionContext,
) -> CursorResult[Any]:
    """continue the _execute_context() method for an "insertmanyvalues"
    operation, which will invoke DBAPI
    cursor.execute() one or more times with individual log and
    event hook calls.

    :param dialect: dialect delivering the batches via
     ``_deliver_insertmanyvalues_batches()`` and executing them.
    :param context: the prepared execution context.
    :return: the result created by ``context._setup_result_proxy()``.

    """

    if dialect.bind_typing is BindTyping.SETINPUTSIZES:
        generic_setinputsizes = context._prepare_set_input_sizes()
    else:
        generic_setinputsizes = None

    cursor, str_statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    effective_parameters = parameters

    # both flags are sampled once, ahead of the batch loop
    engine_events = self._has_events or self.engine._has_events
    if self.dialect._has_events:
        do_execute_dispatch: Iterable[Any] = (
            self.dialect.dispatch.do_execute
        )
    else:
        do_execute_dispatch = ()

    if self._echo:
        # ``stats`` only exists when echo is on; it is only read below
        # under the same condition
        stats = context._get_cache_stats() + " (insertmanyvalues)"

    preserve_rowcount = context.execution_options.get(
        "preserve_rowcount", False
    )
    rowcount = 0

    for imv_batch in dialect._deliver_insertmanyvalues_batches(
        self,
        cursor,
        str_statement,
        effective_parameters,
        generic_setinputsizes,
        context,
    ):
        if imv_batch.processed_setinputsizes:
            try:
                dialect.do_set_input_sizes(
                    context.cursor,
                    imv_batch.processed_setinputsizes,
                    context,
                )
            except BaseException as e:
                self._handle_dbapi_exception(
                    e,
                    sql_util._long_statement(imv_batch.replaced_statement),
                    imv_batch.replaced_parameters,
                    None,
                    context,
                    is_sub_exec=True,
                )

        # statement / parameters of this particular batch
        sub_stmt = imv_batch.replaced_statement
        sub_params = imv_batch.replaced_parameters

        if engine_events:
            # listeners may replace the batch statement and parameters
            for fn in self.dispatch.before_cursor_execute:
                sub_stmt, sub_params = fn(
                    self,
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                    True,
                )

        if self._echo:
            self._log_info(sql_util._long_statement(sub_stmt))

            imv_stats = f""" {imv_batch.batchnum}/{
                        imv_batch.total_batches
            } ({
                'ordered'
                if imv_batch.rows_sorted else 'unordered'
            }{
                '; batch not supported'
                if imv_batch.is_downgraded
                else ''
            })"""

            # the first batch extends the cache stats line; later
            # batches get a line of their own
            if imv_batch.batchnum == 1:
                stats += imv_stats
            else:
                stats = f"insertmanyvalues{imv_stats}"

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        sub_params,
                        batches=10,
                        ismulti=False,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to "
                    "hide_parameters=True]",
                    stats,
                )

        try:
            # a dialect-level listener returning a true value means
            # the execution was handled; otherwise (loop not broken)
            # the dialect's own do_execute() runs
            for fn in do_execute_dispatch:
                if fn(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                ):
                    break
            else:
                dialect.do_execute(
                    cursor,
                    sub_stmt,
                    sub_params,
                    context,
                )

        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                sql_util._long_statement(sub_stmt),
                sub_params,
                cursor,
                context,
                is_sub_exec=True,
            )

        if engine_events:
            # NOTE(review): the overall statement and parameters are
            # passed here, not the per-batch sub_stmt / sub_params
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                str_statement,
                effective_parameters,
                context,
                context.executemany,
            )

        if preserve_rowcount:
            rowcount += imv_batch.current_batch_size

    try:
        context.post_exec()

        if preserve_rowcount:
            context._rowcount = rowcount  # type: ignore[attr-defined]

        result = context._setup_result_proxy()

    except BaseException as e:
        # annotated NoReturn, so ``result`` is always bound below
        self._handle_dbapi_exception(
            e, str_statement, effective_parameters, cursor, context
        )

    return result

2162 

def _cursor_execute(
    self,
    cursor: DBAPICursor,
    statement: str,
    parameters: _DBAPISingleExecuteParams,
    context: Optional[ExecutionContext] = None,
) -> None:
    """Run one statement with its parameters on ``cursor``.

    Logging and exception handling are applied around the call.

    DefaultDialect relies on this method for special-case
    executions such as sequences and column defaults; most
    statement executions end up in _execute_context() instead.

    """
    if self._has_events or self.engine._has_events:
        # every listener may hand back a new statement / parameters
        for hook in self.dispatch.before_cursor_execute:
            statement, parameters = hook(
                self, cursor, statement, parameters, context, False
            )

    if self._echo:
        self._log_info(statement)
        self._log_info("[raw sql] %r", parameters)

    try:
        if self.dialect._has_events:
            dialect_hooks = self.dialect.dispatch.do_execute
        else:
            dialect_hooks = ()

        # the first dialect-level hook returning a true value has
        # taken over the execution; with none doing so the dialect's
        # default do_execute() is used.
        for hook in dialect_hooks:
            if hook(cursor, statement, parameters, context):
                break
        else:
            self.dialect.do_execute(cursor, statement, parameters, context)
    except BaseException as err:
        self._handle_dbapi_exception(
            err, statement, parameters, cursor, context
        )

    if self._has_events or self.engine._has_events:
        self.dispatch.after_cursor_execute(
            self, cursor, statement, parameters, context, False
        )

2208 

def _safe_close_cursor(self, cursor: DBAPICursor) -> None:
    """Close ``cursor`` on a best-effort basis.

    An ``Exception`` raised by ``cursor.close()`` is not propagated; it
    is written to the log as an error instead.

    """
    try:
        cursor.close()
    except Exception:
        # reported via the logger of the connection pool, including
        # the traceback of the failure.
        pool_logger = self.engine.pool.logger
        pool_logger.error("Error closing cursor", exc_info=True)

2221 

# Class-level defaults for two per-instance flags used by
# _handle_dbapi_exception().  That method sets them on the instance and
# removes them again with ``del``, which makes lookups fall back to the
# ``False`` values defined here.
_reentrant_error = False
_is_disconnect = False

2224 

def _handle_dbapi_exception(
    self,
    e: BaseException,
    statement: Optional[str],
    parameters: Optional[_AnyExecuteParams],
    cursor: Optional[DBAPICursor],
    context: Optional[ExecutionContext],
    is_sub_exec: bool = False,
) -> NoReturn:
    """Process an exception raised while executing and re-raise it.

    Depending on the kind of exception, it is wrapped into a
    :class:`.DBAPIError` (via ``DBAPIError.instance()``), replaced by
    an exception supplied by a ``handle_error`` listener, or re-raised
    unchanged.  The method never returns normally.

    :param e: the exception being handled.
    :param statement: statement string in progress, if any.
    :param parameters: parameters in progress, if any.
    :param cursor: DBAPI cursor in use, if any; closed here unless the
     error is considered a disconnect.
    :param context: execution context in use, if any.
    :param is_sub_exec: when True, the error is never reported as a
     "multi" (executemany) failure, regardless of the context.

    """
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    # an already established disconnect state is kept as is
    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.loaded_dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    invalidate_pool_on_disconnect = not is_exit_exception

    ismulti: bool = (
        not is_sub_exec and context.executemany
        if context is not None
        else False
    )
    # an error raised while an earlier one is still being handled on
    # this connection is wrapped and raised right away, skipping the
    # event / cleanup logic below
    if self._reentrant_error:
        raise exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.loaded_dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=ismulti,
        ).with_traceback(exc_info[2]) from e
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                cast(Exception, e),
                self.dialect.loaded_dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=ismulti,
            )
        else:
            sqlalchemy_exception = None

        # exception to raise instead, as decided by event listeners
        newraise = None

        if (self.dialect._has_events) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self.dialect,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
                False,
            )

            for fn in self.dialect.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # listeners may have changed the disconnect verdict
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            # "autorollback" was mostly relevant in 1.x series.
            # It's very unlikely to reach here, as the connection
            # does autobegin so when we are here, we are usually
            # in an explicit / semi-explicit transaction.
            # however we have a test which manufactures this
            # scenario in any case using an event handler.
            # test/engine/test_execute.py-> test_actual_autorollback
            if not self.in_transaction():
                self._rollback_impl()

        # precedence: listener-provided exception, then the wrapped
        # exception, then the original one
        if newraise:
            raise newraise.with_traceback(exc_info[2]) from e
        elif should_wrap:
            assert sqlalchemy_exception is not None
            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
        else:
            assert exc_info[1] is not None
            raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # drop the instance-level flags so that the class-level
        # defaults apply again, then invalidate on disconnect
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self._dbapi_connection
                assert dbapi_conn_wrapper is not None
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)

2369 

@classmethod
def _handle_dbapi_exception_noconnection(
    cls,
    e: BaseException,
    dialect: Dialect,
    engine: Optional[Engine] = None,
    is_disconnect: Optional[bool] = None,
    invalidate_pool_on_disconnect: bool = True,
    is_pre_ping: bool = False,
) -> NoReturn:
    """Process an exception for which no connection object is at hand
    and re-raise it.

    DBAPI errors are wrapped via ``DBAPIError.instance()``;
    ``handle_error`` listeners of the dialect may supply a replacement
    exception.  The method never returns normally.

    :param e: the exception being handled.
    :param dialect: dialect used to classify the error and to dispatch
     ``handle_error`` events.
    :param engine: optional engine; supplies ``hide_parameters`` and
     is passed on to the exception context.
    :param is_disconnect: disconnect verdict; when ``None`` it is
     determined through ``dialect.is_disconnect()``.
    :param invalidate_pool_on_disconnect: passed on to the exception
     context.
    :param is_pre_ping: passed on to the exception context.

    """
    exc_info = sys.exc_info()

    if is_disconnect is None:
        is_disconnect = isinstance(
            e, dialect.loaded_dbapi.Error
        ) and dialect.is_disconnect(e, None, None)

    # only genuine DBAPI errors are wrapped here
    should_wrap = isinstance(e, dialect.loaded_dbapi.Error)

    if should_wrap:
        sqlalchemy_exception = exc.DBAPIError.instance(
            None,
            None,
            cast(Exception, e),
            dialect.loaded_dbapi.Error,
            hide_parameters=(
                engine.hide_parameters if engine is not None else False
            ),
            connection_invalidated=is_disconnect,
            dialect=dialect,
        )
    else:
        sqlalchemy_exception = None

    # exception to raise instead, as decided by event listeners
    newraise = None

    if dialect._has_events:
        # connection, cursor, statement, parameters and execution
        # context are all unavailable here, hence the five ``None``
        ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            dialect,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            invalidate_pool_on_disconnect,
            is_pre_ping,
        )
        for fn in dialect.dispatch.handle_error:
            try:
                # handler returns an exception;
                # call next handler in a chain
                per_fn = fn(ctx)
                if per_fn is not None:
                    ctx.chained_exception = newraise = per_fn
            except Exception as _raised:
                # handler raises an exception - stop processing
                newraise = _raised
                break

        # listeners may have changed the disconnect verdict
        if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
            sqlalchemy_exception.connection_invalidated = ctx.is_disconnect

    # precedence: listener-provided exception, then the wrapped
    # exception, then the original one
    if newraise:
        raise newraise.with_traceback(exc_info[2]) from e
    elif should_wrap:
        assert sqlalchemy_exception is not None
        raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
    else:
        assert exc_info[1] is not None
        raise exc_info[1].with_traceback(exc_info[2])

2444 

def _run_ddl_visitor(
    self,
    visitorcallable: Type[InvokeDDLBase],
    element: SchemaVisitable,
    **kwargs: Any,
) -> None:
    """Instantiate a DDL visitor and let it traverse ``element``.

    The method exists as a separate hook only so that MockConnection
    is able to alter the options passed to the visitor, skipping
    "checkfirst".

    """
    ddl_visitor = visitorcallable(
        dialect=self.dialect, connection=self, **kwargs
    )
    ddl_visitor.traverse_single(element)

2460 

2461 

class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    Plain value holder describing an error in progress; an instance is
    handed to every ``handle_error`` listener.  All attributes named in
    ``__slots__`` are populated by the constructor, so listeners can
    read any of them without risking ``AttributeError``.

    """

    __slots__ = (
        "connection",
        "engine",
        "dialect",
        "cursor",
        "statement",
        "parameters",
        "original_exception",
        "sqlalchemy_exception",
        "chained_exception",
        "execution_context",
        "is_disconnect",
        "invalidate_pool_on_disconnect",
        "is_pre_ping",
    )

    def __init__(
        self,
        exception: BaseException,
        sqlalchemy_exception: Optional[exc.StatementError],
        engine: Optional[Engine],
        dialect: Dialect,
        connection: Optional[Connection],
        cursor: Optional[DBAPICursor],
        statement: Optional[str],
        parameters: Optional[_DBAPIAnyExecuteParams],
        context: Optional[ExecutionContext],
        is_disconnect: bool,
        invalidate_pool_on_disconnect: bool,
        is_pre_ping: bool,
    ):
        """Store the given values on the matching attributes.

        :param exception: stored as ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, if any.
        :param engine: engine in use, if any.
        :param dialect: dialect in use.
        :param connection: connection in use, if any.
        :param cursor: DBAPI cursor in use, if any.
        :param statement: statement string in progress, if any.
        :param parameters: parameters in progress, if any.
        :param context: stored as ``execution_context``.
        :param is_disconnect: whether the error counts as a disconnect.
        :param invalidate_pool_on_disconnect: whether the pool is to be
         invalidated on disconnect.
        :param is_pre_ping: whether the error occurred during pre-ping.

        """
        self.engine = engine
        self.dialect = dialect
        self.connection = connection
        # previously the ``cursor`` argument was silently discarded,
        # leaving the slot unset
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        # no handler has returned a replacement exception yet; callers
        # assign this attribute as handlers run
        self.chained_exception = None
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
        self.is_pre_ping = is_pre_ping

2507 

2508 

class Transaction(TransactionalContext):
    """A database transaction that is currently in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine

        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback`
    and :meth:`.commit` methods.  The object is also a context
    manager, which allows the Python ``with`` statement to be combined
    with :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """  # noqa

    __slots__ = ()

    _is_root: bool = False
    is_active: bool
    connection: Connection

    def __init__(self, connection: Connection):
        # concrete transaction types supply their own constructor
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self) -> bool:
        """Whether this transaction has been fully detached from the
        connection, so that it has no further influence on the
        connection's state.

        """
        raise NotImplementedError()

    def _do_close(self) -> None:
        # hook invoked by close()
        raise NotImplementedError()

    def _do_rollback(self) -> None:
        # hook invoked by rollback()
        raise NotImplementedError()

    def _do_commit(self) -> None:
        # hook invoked by commit()
        raise NotImplementedError()

    @property
    def is_valid(self) -> bool:
        """True while the transaction is active and its connection has
        not been invalidated."""
        return self.is_active and not self.connection.invalidated

    def close(self) -> None:
        """Close this :class:`.Transaction`.

        For the base transaction of a begin/commit nesting this results
        in a rollback(); in any other case the method simply returns.

        Use it to cancel a Transaction while leaving the scope of an
        enclosing transaction untouched.

        """
        try:
            self._do_close()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def rollback(self) -> None:
        """Roll back this :class:`.Transaction`.

        What exactly happens depends on the kind of transaction:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a ROLLBACK.

        * A :class:`.NestedTransaction` emits a
          "ROLLBACK TO SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may make use of DBAPI-specific
          methods for two phase transactions.


        """
        try:
            self._do_rollback()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def commit(self) -> None:
        """Commit this :class:`.Transaction`.

        What exactly happens depends on the kind of transaction:

        * A simple database transaction (e.g. :class:`.RootTransaction`)
          emits a COMMIT.

        * A :class:`.NestedTransaction` emits a
          "RELEASE SAVEPOINT" operation.

        * A :class:`.TwoPhaseTransaction` may make use of DBAPI-specific
          methods for two phase transactions.

        """
        try:
            self._do_commit()
        finally:
            # whatever happened, the transaction must be inactive now
            assert not self.is_active

    def _get_subject(self) -> Connection:
        # the connection this transaction belongs to
        return self.connection

    def _transaction_is_active(self) -> bool:
        return self.is_active

    def _transaction_is_closed(self) -> bool:
        return not self._deactivated_from_connection

    def _rollback_can_be_called(self) -> bool:
        # RootTransaction / NestedTransaction allow rollback() to be
        # called on a deactive transaction as well, without any
        # warning being emitted.  covered by
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True

2651 

2652 

class RootTransaction(Transaction):
    """The "root" transaction of a :class:`_engine.Connection`.

    It stands for the "BEGIN/COMMIT/ROLLBACK" currently taking place on
    the :class:`_engine.Connection`.  A :class:`_engine.RootTransaction`
    comes into existence through the :meth:`_engine.Connection.begin`
    method and stays associated with the :class:`_engine.Connection`
    for as long as it is active.  The :class:`_engine.RootTransaction`
    currently in use is available from the
    :attr:`_engine.Connection.get_transaction` method of
    :class:`_engine.Connection`.

    With :term:`2.0 style` use, the :class:`_engine.Connection`
    additionally applies "autobegin" behavior: a new
    :class:`_engine.RootTransaction` is created whenever a connection
    in a non-transactional state is used to emit commands on the DBAPI
    connection.  In 2.0 style use, the scope of the
    :class:`_engine.RootTransaction` is controlled with the
    :meth:`_engine.Connection.commit` and
    :meth:`_engine.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection: Connection):
        # only one root transaction per connection at a time
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # begin first; the transaction is registered on the connection
        # and marked active only afterwards
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self) -> None:
        if not self.is_active:
            if self.connection._transaction is not self:
                util.warn("transaction already deassociated from connection")
            return

        assert self.connection._transaction is self
        self.is_active = False

    @property
    def _deactivated_from_connection(self) -> bool:
        return self.connection._transaction is not self

    def _connection_begin_impl(self) -> None:
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self) -> None:
        self.connection._rollback_impl()

    def _connection_commit_impl(self) -> None:
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate: bool = False) -> None:
        try:
            if self.is_active:
                self._connection_rollback_impl()

            nested = self.connection._nested_transaction
            if nested:
                nested._cancel()
        finally:
            # runs whether or not the rollback succeeded
            if self.is_active or try_deactivate:
                self._deactivate_from_connection()
            if self.connection._transaction is self:
                self.connection._transaction = None

        assert not self.is_active
        assert self.connection._transaction is not self

    def _do_close(self) -> None:
        self._close_impl()

    def _do_rollback(self) -> None:
        self._close_impl(try_deactivate=True)

    def _do_commit(self) -> None:
        if self.is_active:
            assert self.connection._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # regardless of the commit's outcome: cancel nested
                # transactions, turn this transaction "inactive" and
                # remove it as a reset agent
                nested = self.connection._nested_transaction
                if nested:
                    nested._cancel()

                self._deactivate_from_connection()

            # ...but it is removed as the connection's current
            # transaction only after a successful commit.  on failure
            # it stays in place so that a rollback has to take place.
            self.connection._transaction = None
        elif self.connection._transaction is self:
            self.connection._invalid_transaction()
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

        assert not self.is_active
        assert self.connection._transaction is not self

2759 

2760 

class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    A :class:`.NestedTransaction` is obtained by calling the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    The "begin" / "commit" / "rollback" semantics of a
    :class:`.NestedTransaction` are:

    * "begin" corresponds to the "BEGIN SAVEPOINT" command; the savepoint
      receives an explicit name that is kept as part of this object's
      state.

    * :meth:`.NestedTransaction.commit` corresponds to a
      "RELEASE SAVEPOINT" operation using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    * :meth:`.NestedTransaction.rollback` corresponds to a
      "ROLLBACK TO SAVEPOINT" operation using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    The rationale for mimicking the semantics of an outer transaction in
    terms of savepoints is so that code may deal with a "savepoint"
    transaction and an "outer" transaction in an agnostic way.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    _savepoint: str

    def __init__(self, connection: Connection):
        """Establish a savepoint on ``connection`` and register this
        object as the connection's current nested transaction.

        The nested transaction that was current beforehand is remembered
        so that it can be restored when this one is deactivated.
        """
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = connection._savepoint_impl()
        self.is_active = True
        # push this object on top of the connection's savepoint chain
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn: bool = True) -> None:
        """Restore the previous nested transaction on the connection.

        :param warn: when this object is not the connection's current
         nested transaction, emit a warning if true; otherwise do
         nothing.
        """
        conn = self.connection
        if conn._nested_transaction is not self:
            if warn:
                util.warn(
                    "nested transaction already deassociated from connection"
                )
            return
        conn._nested_transaction = self._previous_nested

    @property
    def _deactivated_from_connection(self) -> bool:
        """True if this object is not the connection's current nested
        transaction."""
        current = self.connection._nested_transaction
        return current is not self

    def _cancel(self) -> None:
        """Mark this savepoint, and each one before it, inactive without
        emitting any SQL."""
        # invoked by RootTransaction when the outer transaction is
        # committed, rolled back, or closed; every savepoint is
        # cancelled with no action taken on the database
        self.is_active = False
        self._deactivate_from_connection()
        previous = self._previous_nested
        if previous:
            previous._cancel()

    def _close_impl(
        self, deactivate_from_connection: bool, warn_already_deactive: bool
    ) -> None:
        """Roll back to the savepoint if possible, then go inactive.

        :param deactivate_from_connection: if true, also de-associate
         this object from the connection.
        :param warn_already_deactive: passed as the ``warn`` flag of
         :meth:`_deactivate_from_connection`.
        """
        try:
            # SQL is emitted only while both this savepoint and the
            # connection's enclosing transaction are still active
            if self.is_active:
                enclosing = self.connection._transaction
                if enclosing and enclosing.is_active:
                    self.connection._rollback_to_savepoint_impl(
                        self._savepoint
                    )
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert self.connection._nested_transaction is not self

    def _do_close(self) -> None:
        """Close: same as rollback, but silent if already deassociated."""
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=False
        )

    def _do_rollback(self) -> None:
        """Roll back, warning if already deassociated from the connection."""
        self._close_impl(
            deactivate_from_connection=True, warn_already_deactive=True
        )

    def _do_commit(self) -> None:
        """Release the savepoint.

        :raises sqlalchemy.exc.InvalidRequestError: if this nested
         transaction is inactive and is no longer the connection's
         current nested transaction.
        """
        if not self.is_active:
            if self.connection._nested_transaction is self:
                self.connection._invalid_transaction()
            else:
                raise exc.InvalidRequestError(
                    "This nested transaction is inactive"
                )
            return

        try:
            self.connection._release_savepoint_impl(self._savepoint)
        finally:
            # the nested trans goes inactive unconditionally, even on a
            # failed release, so that a later rollback does not try to
            # emit SQL.
            self.is_active = False

        # de-association from the connection happens only on success
        self._deactivate_from_connection()

2873 

2874 

class TwoPhaseTransaction(RootTransaction):
    """Represent a two-phase transaction.

    New :class:`.TwoPhaseTransaction` objects are procured via the
    :meth:`_engine.Connection.begin_twophase` method.

    The interface matches that of :class:`.Transaction`, with the
    :meth:`prepare` method added.

    """

    __slots__ = ("xid", "_is_prepared")

    xid: Any

    def __init__(self, connection: Connection, xid: Any):
        """Store the transaction id and initialise the root transaction.

        :param connection: the connection this transaction runs on.
        :param xid: the two-phase transaction identifier.
        """
        # two-phase state is set up before delegating to the base
        # initializer (NOTE(review): presumably because the base class
        # invokes _connection_begin_impl(), which passes ``self`` along
        # - confirm against RootTransaction)
        self.xid = xid
        self._is_prepared = False
        super().__init__(connection)

    def prepare(self) -> None:
        """Prepare this :class:`.TwoPhaseTransaction`.

        After a PREPARE, the transaction can be committed.

        :raises sqlalchemy.exc.InvalidRequestError: if the transaction
         is not active.

        """
        if self.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
            return
        raise exc.InvalidRequestError("This transaction is inactive")

    def _connection_begin_impl(self) -> None:
        """Begin via the connection's two-phase begin implementation."""
        conn = self.connection
        conn._begin_twophase_impl(self)

    def _connection_rollback_impl(self) -> None:
        """Roll back, telling the connection whether PREPARE was run."""
        conn = self.connection
        conn._rollback_twophase_impl(self.xid, self._is_prepared)

    def _connection_commit_impl(self) -> None:
        """Commit, telling the connection whether PREPARE was run."""
        conn = self.connection
        conn._commit_twophase_impl(self.xid, self._is_prepared)

2914 

2915 

class Engine(
    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"]
):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    dispatch: dispatcher[ConnectionEventsTarget]

    _compiled_cache: Optional[CompiledCacheType]

    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS
    _has_events: bool = False
    _connection_cls: Type[Connection] = Connection
    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine"
    _is_future: bool = False

    _schema_translate_map: Optional[SchemaTranslateMapType] = None
    _option_cls: Type[OptionEngine]

    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool

    def __init__(
        self,
        pool: Pool,
        dialect: Dialect,
        url: URL,
        logging_name: Optional[str] = None,
        echo: Optional[_EchoFlagType] = None,
        query_cache_size: int = 500,
        execution_options: Optional[Mapping[str, Any]] = None,
        hide_parameters: bool = False,
    ):
        """Construct a new :class:`_engine.Engine`.

        :param pool: connection pool stored as :attr:`pool`.
        :param dialect: dialect stored as :attr:`dialect`.
        :param url: URL stored as :attr:`url`.
        :param logging_name: if given and non-empty, assigned as the
         ``logging_name`` attribute of this engine.
        :param echo: echo flag; assigned to :attr:`echo` and passed to
         the instance logger setup.
        :param query_cache_size: capacity of the built-in LRU compiled
         cache; a value of ``0`` disables the built-in cache.
        :param execution_options: optional mapping of default execution
         options, applied via :meth:`update_execution_options`.
        :param hide_parameters: stored as :attr:`hide_parameters`.
        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None:
        """Size-alert callback given to the built-in compiled cache.

        Logs an informational message when info logging is enabled.
        """
        if self._should_log_info():
            self.logger.info(
                "Compiled cache size pruning from %d items to %d.  "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self) -> Engine:
        """Returns this :class:`.Engine`.

        Used for legacy schemes that accept :class:`.Connection` /
        :class:`.Engine` objects within the same variable.

        """
        return self

    def clear_compiled_cache(self) -> None:
        """Clear the compiled cache associated with this
        :class:`_engine.Engine`.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter.

        .. versionadded:: 1.4

        """
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections.  The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        # notify event listeners first, then merge the options into the
        # engine-level defaults, then hand them to the dialect
        self.dispatch.set_engine_execution_options(self, opt)
        self._execution_options = self._execution_options.union(opt)
        self.dialect.set_engine_execution_options(self, opt)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> OptionEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> OptionEngine: ...

    def execution_options(self, **opt: Any) -> OptionEngine:
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the same instance.  The :meth:`_engine.Engine.dispose`
          method will replace the connection pool instance for the parent
          engine as well as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine`
          inherits the events of the parent, and new events can be associated
          with the new :class:`_engine.Engine` individually.
        * The logging configuration and logging_name is copied from the parent
          :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that affect some execution-level behavior for each
        engine.    One such example is breaking into separate "reader" and
        "writer" :class:`_engine.Engine` instances, where one
        :class:`_engine.Engine`
        has a lower :term:`isolation level` setting configured or is even
        transaction-disabled using "autocommit".  An example of this
        configuration is at :ref:`dbapi_autocommit_multiple`.

        Another example is one that
        uses a custom option ``shard_id`` which is consumed by an event
        to change the current schema on a database connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            primary_engine = create_engine("mysql+mysqldb://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}


            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt, params, context, executemany):
                shard_id = conn.get_execution_options().get("shard_id", "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        The above recipe illustrates two :class:`_engine.Engine` objects that
        will each serve as factories for :class:`_engine.Connection` objects
        that have pre-established "shard_id" execution options present. A
        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler
        then interprets this execution option to emit a MySQL ``use`` statement
        to switch databases before a statement execution, while at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary.

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """  # noqa: E501
        return self._option_cls(self, opt)

    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self) -> str:
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.name

    @property
    def driver(self) -> str:
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        """

        return self.dialect.driver

    echo = log.echo_property()

    def __repr__(self) -> str:
        """Return ``Engine(<url repr>)``."""
        return "Engine(%r)" % (self.url,)

    def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        :param close: if left at its default of ``True``, has the
          effect of fully closing all **currently checked in**
          database connections.  Connections that are still checked out
          will **not** be closed, however they will no longer be associated
          with this :class:`_engine.Engine`,
          so when they are closed individually, eventually the
          :class:`_pool.Pool` which they are associated with will
          be garbage collected and they will be closed out fully, if
          not already closed on checkin.

          If set to ``False``, the previous connection pool is de-referenced,
          and otherwise not touched in any way.

        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

        """
        if close:
            self.pool.dispose()
        # the replacement pool is created whether or not the old one
        # was actively disposed
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(
        self, connection: Optional[Connection] = None
    ) -> Iterator[Connection]:
        """Yield ``connection`` if one was given, otherwise a newly
        procured connection that is released when the block exits."""
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            yield connection

    @contextlib.contextmanager
    def begin(self) -> Iterator[Connection]:
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed.  If an error is raised, the :class:`.Transaction`
        is rolled back.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """  # noqa: E501
        with self.connect() as conn:
            with conn.begin():
                yield conn

    def _run_ddl_visitor(
        self,
        visitorcallable: Type[InvokeDDLBase],
        element: SchemaVisitable,
        **kwargs: Any,
    ) -> None:
        """Run the given DDL visitor on a connection from :meth:`begin`."""
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    def connect(self) -> Connection:
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` acts as a Python context manager, so
        the typical use of this method looks like::

            with engine.connect() as connection:
                connection.execute(text("insert into table values ('foo')"))
                connection.commit()

        Where above, after the block is completed, the connection is "closed"
        and its underlying DBAPI resources are returned to the connection pool.
        This also has the effect of rolling back any transaction that
        was explicitly begun or was begun via autobegin, and will
        emit the :meth:`_events.ConnectionEvents.rollback` event if one was
        started and is still in progress.

        .. seealso::

            :meth:`_engine.Engine.begin`

        """

        return self._connection_cls(self)

    def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed.   When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self.pool.connect()

3302 

3303 

class OptionEngineMixin(log.Identified):
    """Mixin for an engine that wraps a "proxied" parent engine.

    State such as the URL, dialect, logging configuration and compiled
    cache is copied from the parent on construction; the connection pool
    is read from and written to the parent itself.
    """

    _sa_propagate_class_events = False

    dispatch: dispatcher[ConnectionEventsTarget]
    _compiled_cache: Optional[CompiledCacheType]
    dialect: Dialect
    pool: Pool
    url: URL
    hide_parameters: bool
    echo: log.echo_property

    def __init__(
        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter
    ):
        """Copy state from ``proxied`` and apply ``execution_options``.

        :param proxied: the parent engine whose state is shared.
        :param execution_options: options applied on top of the parent's
         execution options via :meth:`update_execution_options`.
        """
        self._proxied = proxied

        # connection / logging / cache state taken from the parent
        self.url = proxied.url
        self.dialect = proxied.dialect
        self.logging_name = proxied.logging_name
        self.echo = proxied.echo
        self._compiled_cache = proxied._compiled_cache
        self.hide_parameters = proxied.hide_parameters
        log.instance_logger(self, echoflag=self.echo)

        # note: joining dispatchers means that events assigned to the
        # parent engine *after* this OptionEngine is created are
        # propagated here as well.  Because the parent's events are
        # shared, class-level events are disallowed from applying to
        # the OptionEngine class directly.
        #
        # the alternative would be to transfer only the events that
        # exist right now, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that may be the more appropriate approach, but it would be a
        # behavioral change for logic that assigns events to the parent
        # engine and expects them to take effect for an already-created
        # sub-engine.
        joined_dispatch = self.dispatch._join(proxied.dispatch)
        self.dispatch = joined_dispatch

        # begin with the parent's options, then layer the new ones on top
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def update_execution_options(self, **opt: Any) -> None:
        """Must be provided by the class this mixin is combined with."""
        raise NotImplementedError()

    if not typing.TYPE_CHECKING:
        # https://github.com/python/typing/discussions/1095

        @property
        def pool(self) -> Pool:
            """The parent engine's connection pool."""
            return self._proxied.pool

        @pool.setter
        def pool(self, pool: Pool) -> None:
            # assigning a pool here assigns it on the parent engine
            self._proxied.pool = pool

        @property
        def _has_events(self) -> bool:
            """The parent's ``_has_events`` if truthy, else the value
            stored locally (``False`` when never set)."""
            parent_flag = self._proxied._has_events
            if parent_flag:
                return parent_flag
            return self.__dict__.get("_has_events", False)

        @_has_events.setter
        def _has_events(self, value: bool) -> None:
            # kept in the instance dict; the parent is not modified
            self.__dict__["_has_events"] = value

3367 

3368 

class OptionEngine(OptionEngineMixin, Engine):
    """An :class:`_engine.Engine` that proxies a parent engine while
    carrying its own execution options."""

    def update_execution_options(self, **opt: Any) -> None:
        """Update this engine's execution options using the
        :class:`_engine.Engine` implementation."""
        # Engine's implementation is named explicitly: OptionEngineMixin
        # comes first in the bases and its version of this method only
        # raises NotImplementedError.
        Engine.update_execution_options(self, **opt)

3372 

3373 

# Register OptionEngine as the class that Engine.execution_options()
# instantiates; assigned here because OptionEngine is defined after Engine.
Engine._option_cls = OptionEngine