Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1027 statements  

1# engine/base.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7from __future__ import with_statement 

8 

9import contextlib 

10import sys 

11 

12from .interfaces import Connectable 

13from .interfaces import ExceptionContext 

14from .util import _distill_params 

15from .util import _distill_params_20 

16from .util import TransactionalContext 

17from .. import exc 

18from .. import inspection 

19from .. import log 

20from .. import util 

21from ..sql import compiler 

22from ..sql import util as sql_util 

23 

24 

25"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`. 

26 

27""" 

28 

# Module-level empty ``util.immutabledict``.  Presumably shared as the
# default "no execution options" mapping; its consumers are not visible in
# this part of the file -- TODO confirm.
_EMPTY_EXECUTION_OPTS = util.immutabledict()

30 

31 

32class Connection(Connectable): 

33 """Provides high-level functionality for a wrapped DB-API connection. 

34 

35 **This is the SQLAlchemy 1.x.x version** of the :class:`_engine.Connection` 

36 class. For the :term:`2.0 style` version, which features some API 

37 differences, see :class:`_future.Connection`. 

38 

39 The :class:`_engine.Connection` object is procured by calling 

40 the :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 

41 object, and provides services for execution of SQL statements as well 

42 as transaction control. 

43 

44 The Connection object is **not** thread-safe. While a Connection can be 

45 shared among threads using properly synchronized access, it is still 

46 possible that the underlying DBAPI connection may not support shared 

47 access between threads. Check the DBAPI documentation for details. 

48 

49 The Connection object represents a single DBAPI connection checked out 

50 from the connection pool. In this state, the connection pool has no effect 

51 upon the connection, including its expiration or timeout state. For the 

52 connection pool to properly manage connections, connections should be 

53 returned to the connection pool (i.e. ``connection.close()``) whenever the 

54 connection is not in use. 

55 

56 .. index:: 

57 single: thread safety; Connection 

58 

59 """ 

60 

61 _is_future = False 

62 _sqla_logger_namespace = "sqlalchemy.engine.Connection" 

63 

64 # used by sqlalchemy.engine.util.TransactionalContext 

65 _trans_context_manager = None 

66 

67 def __init__( 

68 self, 

69 engine, 

70 connection=None, 

71 close_with_result=False, 

72 _branch_from=None, 

73 _execution_options=None, 

74 _dispatch=None, 

75 _has_events=None, 

76 _allow_revalidate=True, 

77 ): 

78 """Construct a new Connection.""" 

79 self.engine = engine 

80 self.dialect = engine.dialect 

81 self.__branch_from = _branch_from 

82 

83 if _branch_from: 

84 # branching is always "from" the root connection 

85 assert _branch_from.__branch_from is None 

86 self._dbapi_connection = connection 

87 self._execution_options = _execution_options 

88 self._echo = _branch_from._echo 

89 self.should_close_with_result = False 

90 self.dispatch = _dispatch 

91 self._has_events = _branch_from._has_events 

92 else: 

93 self._dbapi_connection = ( 

94 connection 

95 if connection is not None 

96 else engine.raw_connection() 

97 ) 

98 

99 self._transaction = self._nested_transaction = None 

100 self.__savepoint_seq = 0 

101 self.__in_begin = False 

102 self.should_close_with_result = close_with_result 

103 

104 self.__can_reconnect = _allow_revalidate 

105 self._echo = self.engine._should_log_info() 

106 

107 if _has_events is None: 

108 # if _has_events is sent explicitly as False, 

109 # then don't join the dispatch of the engine; we don't 

110 # want to handle any of the engine's events in that case. 

111 self.dispatch = self.dispatch._join(engine.dispatch) 

112 self._has_events = _has_events or ( 

113 _has_events is None and engine._has_events 

114 ) 

115 

116 assert not _execution_options 

117 self._execution_options = engine._execution_options 

118 

119 if self._has_events or self.engine._has_events: 

120 self.dispatch.engine_connect(self, _branch_from is not None) 

121 

@util.memoized_property
def _message_formatter(self):
    """Return the log message decorator for this connection, if any.

    When the ``logging_token`` execution option is present, the result
    is a callable that prefixes a message with that token in square
    brackets; otherwise the result is ``None``.

    """
    options = self._execution_options
    if "logging_token" not in options:
        return None

    token = options["logging_token"]

    def _with_token(msg):
        return "[%s] %s" % (token, msg)

    return _with_token

129 

def _log_info(self, message, *arg, **kw):
    """Emit ``message`` at INFO level on the engine's logger.

    The message is first passed through the connection's message
    formatter when one is configured.  A ``stacklevel`` keyword is added
    when ``log.STACKLEVEL`` is enabled.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.info(text, *arg, **kw)

140 

def _log_debug(self, message, *arg, **kw):
    """Emit ``message`` at DEBUG level on the engine's logger.

    The message is first passed through the connection's message
    formatter when one is configured.  A ``stacklevel`` keyword is added
    when ``log.STACKLEVEL`` is enabled.

    """
    formatter = self._message_formatter
    text = formatter(message) if formatter else message

    if log.STACKLEVEL:
        kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET

    self.engine.logger.debug(text, *arg, **kw)

151 

152 @property 

153 def _schema_translate_map(self): 

154 return self._execution_options.get("schema_translate_map", None) 

155 

def schema_for_object(self, obj):
    """Return the schema name to use for the given schema item.

    The item's own ``schema`` is returned unless a schema translate map
    is in effect, the name is present in that map, and the item opts in
    via ``_use_schema_map``; in that case the mapped name is returned.

    """
    schema = obj.schema
    translate = self._execution_options.get("schema_translate_map", None)

    if not translate:
        return schema
    if schema not in translate:
        return schema
    if not obj._use_schema_map:
        return schema
    return translate[schema]

175 

176 def _branch(self): 

177 """Return a new Connection which references this Connection's 

178 engine and connection; but does not have close_with_result enabled, 

179 and also whose close() method does nothing. 

180 

181 .. deprecated:: 1.4 the "branching" concept will be removed in 

182 SQLAlchemy 2.0 as well as the "Connection.connect()" method which 

183 is the only consumer for this. 

184 

185 The Core uses this very sparingly, only in the case of 

186 custom SQL default functions that are to be INSERTed as the 

187 primary key of a row where we need to get the value back, so we have 

188 to invoke it distinctly - this is a very uncommon case. 

189 

190 Userland code accesses _branch() when the connect() 

191 method is called. The branched connection 

192 acts as much as possible like the parent, except that it stays 

193 connected when a close() event occurs. 

194 

195 """ 

196 return self.engine._connection_cls( 

197 self.engine, 

198 self._dbapi_connection, 

199 _branch_from=self.__branch_from if self.__branch_from else self, 

200 _execution_options=self._execution_options, 

201 _has_events=self._has_events, 

202 _dispatch=self.dispatch, 

203 ) 

204 

205 def _generate_for_options(self): 

206 """define connection method chaining behavior for execution_options""" 

207 

208 if self._is_future: 

209 return self 

210 else: 

211 c = self.__class__.__new__(self.__class__) 

212 c.__dict__ = self.__dict__.copy() 

213 return c 

214 

215 def __enter__(self): 

216 return self 

217 

218 def __exit__(self, type_, value, traceback): 

219 self.close() 

220 

def execution_options(self, **opt):
    r"""Set non-SQL options for the connection which take effect
    during execution.

    For a "future" style connection, this method returns this same
    :class:`_future.Connection` object with the new options added.

    For a legacy connection, this method returns a copy of this
    :class:`_engine.Connection` which references the same underlying DBAPI
    connection, but also defines the given execution options which will
    take effect for a call to :meth:`execute`.  As the new
    :class:`_engine.Connection` references the same underlying resource,
    it's usually a good idea to ensure that the copies will be discarded
    immediately, which is implicit if used as in::

        result = connection.execution_options(stream_results=True).\
                            execute(stmt)

    Note that any key/value can be passed to
    :meth:`_engine.Connection.execution_options`, and it will be stored in
    the ``_execution_options`` dictionary of the
    :class:`_engine.Connection`.  It is suitable for usage by end-user
    schemes to communicate with event listeners, for example.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param autocommit: Available on: Connection, statement.
      When True, a COMMIT will be invoked after execution
      when executed in 'autocommit' mode, i.e. when an explicit
      transaction is not begun on the connection.   Note that this
      is **library level, not DBAPI level autocommit**.  The DBAPI
      connection will remain in a real transaction unless the
      "AUTOCOMMIT" isolation level is used.

      .. deprecated:: 1.4  The "autocommit" execution option is deprecated
         and will be removed in SQLAlchemy 2.0.  See
         :ref:`migration_20_autocommit` for discussion.

    :param compiled_cache: Available on: Connection.
      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.  This dictionary will
      supersede the statement cache that may be configured on the
      :class:`_engine.Engine` itself.   If set to None, caching
      is disabled, even if the engine has a configured cache size.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param logging_token: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`.

      Adds the specified string token surrounded by brackets in log
      messages logged by the connection, i.e. the logging that's enabled
      either via the :paramref:`_sa.create_engine.echo` flag or via the
      ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a
      per-connection or per-sub-engine token to be available which is
      useful for debugging concurrent connection scenarios.

      .. versionadded:: 1.4.0b2

      .. seealso::

        :ref:`dbengine_logging_tokens` - usage example

        :paramref:`_sa.create_engine.logging_name` - adds a name to the
        name used by the Python logger object itself.

    :param isolation_level: Available on: :class:`_engine.Connection`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**, not just the copy that is
      returned by the call to :meth:`_engine.Connection.execution_options`
      method.  The isolation level will remain at the given setting until
      the DBAPI connection itself is returned to the connection pool, i.e.
      the :meth:`_engine.Connection.close` method on the original
      :class:`_engine.Connection` is called,
      where an event handler will emit
      additional statements on the DBAPI connection in order to revert the
      isolation level change.

      .. warning::  The ``isolation_level`` execution option should
         **not** be used when a transaction is already established, that
         is, the :meth:`_engine.Connection.begin`
         method or similar has been
         called.  A database cannot change the isolation level on a
         transaction in progress, and different DBAPIs and/or
         SQLAlchemy dialects may implicitly roll back or commit
         the transaction, or not affect the connection at all.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs.  The new connection produced after
         the invalidation will not have the isolation level re-applied
         to it automatically.

      .. seealso::

            :paramref:`_sa.create_engine.isolation_level`
            - set per :class:`_engine.Engine` isolation level

            :meth:`_engine.Connection.get_isolation_level`
            - view current actual level

            :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`

            :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`

            :ref:`MySQL Transaction Isolation <mysql_isolation_level>`

            :ref:`SQL Server Transaction Isolation <mssql_isolation_level>`

            :ref:`session_transaction_isolation` - for the ORM

    :param no_parameters: When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: Connection, statement.
      Indicate to the dialect that results should be
      "streamed" and not pre-buffered, if possible.  For backends
      such as PostgreSQL, MySQL and MariaDB, this indicates the use of
      a "server side cursor" as opposed to a client side cursor.
      Other backends such as that of Oracle may already use server
      side cursors by default.

      The usage of
      :paramref:`_engine.Connection.execution_options.stream_results` is
      usually combined with setting a fixed number of rows to be fetched
      in batches, to allow for efficient iteration of database rows while
      at the same time not loading all result rows into memory at once;
      this can be configured on a :class:`_engine.Result` object using the
      :meth:`_engine.Result.yield_per` method, after execution has
      returned a new :class:`_engine.Result`.   If
      :meth:`_engine.Result.yield_per` is not used,
      the :paramref:`_engine.Connection.execution_options.stream_results`
      mode of operation will instead use a dynamically sized buffer
      which buffers sets of rows at a time, growing on each batch
      based on a fixed growth size up until a limit which may
      be configured using the
      :paramref:`_engine.Connection.execution_options.max_row_buffer`
      parameter.

      When using the ORM to fetch ORM mapped objects from a result,
      :meth:`_engine.Result.yield_per` should always be used with
      :paramref:`_engine.Connection.execution_options.stream_results`,
      so that the ORM does not fetch all rows into new ORM objects at once.

      For typical use, the
      :paramref:`_engine.Connection.execution_options.yield_per` execution
      option should be preferred, which sets up both
      :paramref:`_engine.Connection.execution_options.stream_results` and
      :meth:`_engine.Result.yield_per` at once. This option is supported
      both at a core level by :class:`_engine.Connection` as well as by the
      ORM :class:`_engine.Session`; the latter is described at
      :ref:`orm_queryguide_yield_per`.

      .. seealso::

        :ref:`engine_stream_results` - background on
        :paramref:`_engine.Connection.execution_options.stream_results`

        :paramref:`_engine.Connection.execution_options.max_row_buffer`

        :paramref:`_engine.Connection.execution_options.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param max_row_buffer: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Sets a maximum
      buffer size to use when the
      :paramref:`_engine.Connection.execution_options.stream_results`
      execution option is used on a backend that supports server side
      cursors.  The default value if not specified is 1000.

      .. seealso::

        :paramref:`_engine.Connection.execution_options.stream_results`

        :ref:`engine_stream_results`


    :param yield_per: Available on: :class:`_engine.Connection`,
      :class:`_sql.Executable`.  Integer value applied which will
      set the :paramref:`_engine.Connection.execution_options.stream_results`
      execution option and invoke :meth:`_engine.Result.yield_per`
      automatically at once.  Allows equivalent functionality as
      is present when using this parameter with the ORM.

      .. versionadded:: 1.4.40

      .. seealso::

        :ref:`engine_stream_results` - background and examples
        on using server side cursors with Core.

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
        describing the ORM version of ``yield_per``

    :param schema_translate_map: Available on: :class:`_engine.Connection`,
      :class:`_engine.Engine`, :class:`_sql.Executable`.

      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. versionadded:: 1.1

      .. seealso::

        :ref:`schema_translating`

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`


    """  # noqa
    target = self._generate_for_options()
    target._execution_options = target._execution_options.union(opt)

    # event listeners see the new options before the dialect does
    events_enabled = self._has_events or self.engine._has_events
    if events_enabled:
        self.dispatch.set_connection_execution_options(target, opt)

    self.dialect.set_connection_execution_options(target, opt)
    return target

480 

def get_execution_options(self):
    """Return the non-SQL options currently in effect for execution.

    The collection returned is the one stored on this connection; it
    is not copied.

    .. versionadded:: 1.3

    .. seealso::

        :meth:`_engine.Connection.execution_options`
    """
    options = self._execution_options
    return options

491 

@property
def closed(self):
    """Return True if this connection is closed."""

    # note this is independent for a "branched" connection vs.
    # the base

    if self._dbapi_connection is not None:
        return False

    # no DBAPI connection: closed only if reconnecting is not allowed
    return not self.__can_reconnect

500 

@property
def invalidated(self):
    """Return True if this connection was invalidated."""

    # Before 1.4 the "invalid" flag was tracked separately from
    # "closed": an invalidated connection could be closed, leaving
    # _dbapi_connection as None and closed=True while "invalid" stayed
    # True.  That gave three states (open/valid, closed/valid,
    # closed/invalid) for no good reason, since a closed connection has
    # no need to be "invalid".  The state is now derived from two facts
    # only: whether a DBAPI connection is present, and whether the
    # connection is closed.

    root = self.__branch_from
    if root:
        # a branch reports the state of its root connection
        return root.invalidated

    if self._dbapi_connection is not None:
        return False
    return not self.closed

518 

@property
def connection(self):
    """The underlying DB-API connection managed by this Connection.

    This is a SQLAlchemy connection-pool proxied connection
    which then has the attribute
    :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the
    actual driver connection.

    If no DBAPI connection is currently present, revalidation is
    attempted first.

    .. seealso::


        :ref:`dbapi_connections`

    """

    current = self._dbapi_connection
    if current is not None:
        return current

    try:
        return self._revalidate_connection()
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these describe the connection's own state; pass them through
        raise
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

544 

def get_isolation_level(self):
    """Return the current **actual** isolation level that's present on
    the database within the scope of this connection.

    A live SQL operation is performed against the database to procure
    the value, so the result is the actual level on the underlying DBAPI
    connection regardless of how this state was set.  It will be one of
    the four actual isolation modes ``READ UNCOMMITTED``,
    ``READ COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``, and will
    **not** include the ``AUTOCOMMIT`` isolation level setting.  Third
    party dialects may also feature additional isolation level settings.

    .. note::  This method **will not report** on the ``AUTOCOMMIT``
      isolation level, which is a separate :term:`dbapi` setting that's
      independent of **actual** isolation level.  When ``AUTOCOMMIT`` is
      in use, the database connection still has a "traditional" isolation
      mode in effect, that is typically one of the four values
      ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``,
      ``SERIALIZABLE``.

    Compare to the :attr:`_engine.Connection.default_isolation_level`
    accessor which returns the isolation level that is present on the
    database at initial connection time.

    .. versionadded:: 0.9.9

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    try:
        fetch_level = self.dialect.get_isolation_level
        return fetch_level(self.connection)
    except BaseException as err:
        self._handle_dbapi_exception(err, None, None, None, None)

588 

@property
def default_isolation_level(self):
    """The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    This value is independent of the
    :paramref:`.Connection.execution_options.isolation_level` and
    :paramref:`.Engine.execution_options.isolation_level` execution
    options.  The :class:`_engine.Dialect` determines it when the first
    connection is created, by querying the database for the current
    isolation level before any additional commands have been emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. versionadded:: 0.9.9

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    dialect = self.dialect
    return dialect.default_isolation_level

619 

def _invalid_transaction(self):
    """Raise :class:`.PendingRollbackError` for a connection whose
    transaction can no longer be used.

    Code ``8s2b`` is used when the connection has been invalidated, and
    ``8s2a`` when it is on an inactive transaction.  The message
    mentions a savepoint when a nested transaction is present.

    """
    if self.invalidated:
        template = (
            "Can't reconnect until invalid %stransaction is rolled "
            "back."
        )
        code = "8s2b"
    else:
        assert not self._is_future
        template = (
            "This connection is on an inactive %stransaction.  "
            "Please rollback() fully before proceeding."
        )
        code = "8s2a"

    qualifier = "savepoint " if self._nested_transaction is not None else ""
    raise exc.PendingRollbackError(template % qualifier, code=code)

644 

645 def _revalidate_connection(self): 

646 if self.__branch_from: 

647 return self.__branch_from._revalidate_connection() 

648 if self.__can_reconnect and self.invalidated: 

649 if self._transaction is not None: 

650 self._invalid_transaction() 

651 self._dbapi_connection = self.engine.raw_connection( 

652 _connection=self 

653 ) 

654 return self._dbapi_connection 

655 raise exc.ResourceClosedError("This Connection is closed") 

656 

657 @property 

658 def _still_open_and_dbapi_connection_is_valid(self): 

659 return self._dbapi_connection is not None and getattr( 

660 self._dbapi_connection, "is_valid", False 

661 ) 

662 

@property
def info(self):
    """Info dictionary associated with the underlying DBAPI connection
    referred to by this :class:`_engine.Connection`, allowing user-defined
    data to be associated with the connection.

    The data stays with the DBAPI connection, including after that
    connection has been returned to the connection pool and is used
    again by subsequent instances of :class:`_engine.Connection`.

    """

    dbapi_connection = self.connection
    return dbapi_connection.info

676 

@util.deprecated_20(":meth:`.Connection.connect`")
def connect(self, close_with_result=False):
    """Returns a branched version of this :class:`_engine.Connection`.

    Calling :meth:`_engine.Connection.close` on the returned
    :class:`_engine.Connection` leaves this
    :class:`_engine.Connection` open.

    This method provides usage symmetry with
    :meth:`_engine.Engine.connect`, including for usage
    with context managers.

    :param close_with_result: accepted for signature symmetry; it is
     not used by this method.

    """

    branched = self._branch()
    return branched

692 

def invalidate(self, exception=None):
    """Invalidate the underlying DBAPI connection associated with
    this :class:`_engine.Connection`.

    An attempt will be made to close the underlying DBAPI connection
    immediately; however if this operation fails, the error is logged
    but not raised.  The connection is then discarded whether or not
    close() succeeded.

    Upon the next use (where "use" typically means using the
    :meth:`_engine.Connection.execute` method or similar),
    this :class:`_engine.Connection` will attempt to
    procure a new DBAPI connection using the services of the
    :class:`_pool.Pool` as a source of connectivity (e.g.
    a "reconnection").

    If a transaction was in progress (e.g. the
    :meth:`_engine.Connection.begin` method has been called) when
    :meth:`_engine.Connection.invalidate` method is called, at the DBAPI
    level all state associated with this transaction is lost, as
    the DBAPI connection is closed.  The :class:`_engine.Connection`
    will not allow a reconnection to proceed until the
    :class:`.Transaction` object is ended, by calling the
    :meth:`.Transaction.rollback` method; until that point, any attempt at
    continuing to use the :class:`_engine.Connection` will raise an
    :class:`~sqlalchemy.exc.InvalidRequestError`.
    This is to prevent applications from accidentally
    continuing ongoing transactional operations despite the
    fact that the transaction has been lost due to an
    invalidation.

    The :meth:`_engine.Connection.invalidate` method,
    just like auto-invalidation,
    will at the connection pool level invoke the
    :meth:`_events.PoolEvents.invalidate` event.

    Calling this method on a connection that is already invalidated
    does nothing.

    :param exception: an optional ``Exception`` instance that's the
      reason for the invalidation.  It is passed along to event handlers
      and logging functions.

    :raises ResourceClosedError: if this connection is closed.

    .. seealso::

        :ref:`pool_connection_invalidation`

    """

    root = self.__branch_from
    if root:
        # a branch invalidates through its root connection
        return root.invalidate(exception=exception)

    if self.invalidated:
        return

    if self.closed:
        raise exc.ResourceClosedError("This Connection is closed")

    if self._still_open_and_dbapi_connection_is_valid:
        self._dbapi_connection.invalidate(exception)

    # discard the DBAPI connection in all cases
    self._dbapi_connection = None

751 

def detach(self):
    """Detach the underlying DB-API connection from its connection pool.

    E.g.::

        with engine.connect() as conn:
            conn.detach()
            conn.execute(text("SET search_path TO schema1, schema2"))

            # work with connection

        # connection is fully closed (since we used "with:", can
        # also call .close())

    This :class:`_engine.Connection` instance will remain usable.
    When closed
    (or exited from a context manager context as above),
    the DB-API connection will be literally closed and not
    returned to its originating pool.

    This method can be used to insulate the rest of an application
    from a modified state on a connection (such as a transaction
    isolation level or similar).

    :raises ResourceClosedError: if this connection is closed.
    :raises InvalidRequestError: if this connection has no DBAPI
     connection because it was invalidated.

    """

    dbapi_connection = self._dbapi_connection
    if dbapi_connection is None:
        # There is nothing to detach.  Report the state explicitly
        # rather than failing with an AttributeError on None.
        if self.closed:
            raise exc.ResourceClosedError("This Connection is closed")
        raise exc.InvalidRequestError(
            "Can't detach an invalidated Connection"
        )

    dbapi_connection.detach()

779 

780 def _autobegin(self): 

781 self.begin() 

782 

def begin(self):
    """Begin a transaction and return a transaction handle.

    The returned object is an instance of :class:`.Transaction`.
    This object represents the "scope" of the transaction,
    which completes when either the :meth:`.Transaction.rollback`
    or :meth:`.Transaction.commit` method is called.

    .. tip::

        The :meth:`_engine.Connection.begin` method is invoked when using
        the :meth:`_engine.Engine.begin` context manager method as well.
        All documentation that refers to behaviors specific to the
        :meth:`_engine.Connection.begin` method also apply to use of the
        :meth:`_engine.Engine.begin` method.

    Legacy use: nested calls to :meth:`.begin` on the same
    :class:`_engine.Connection` will return new :class:`.Transaction`
    objects that represent an emulated transaction within the scope of the
    enclosing transaction, that is::

        trans = conn.begin()   # outermost transaction
        trans2 = conn.begin()  # "nested"
        trans2.commit()        # does nothing
        trans.commit()         # actually commits

    Calls to :meth:`.Transaction.commit` only have an effect
    when invoked via the outermost :class:`.Transaction` object, though the
    :meth:`.Transaction.rollback` method of any of the
    :class:`.Transaction` objects will roll back the
    transaction.

    .. tip::

        The above "nesting" behavior is a legacy behavior specific to
        :term:`1.x style` use and will be removed in SQLAlchemy 2.0. For
        notes on :term:`2.0 style` use, see
        :meth:`_future.Connection.begin`.


    .. seealso::

        :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT

        :meth:`_engine.Connection.begin_twophase` -
        use a two phase /XID transaction

        :meth:`_engine.Engine.begin` - context manager available from
        :class:`_engine.Engine`

    """
    root = self.__branch_from
    if self._is_future:
        assert not root
    elif root:
        # a legacy branch begins the transaction on its root
        return root.begin()

    if self.__in_begin:
        # for dialects that emit SQL within the process of
        # dialect.do_begin() or dialect.do_begin_twophase(), this
        # flag prevents "autobegin" from being emitted within that
        # process, while allowing self._transaction to remain at None
        # until it's complete.
        return None

    if self._transaction is None:
        self._transaction = RootTransaction(self)
        return self._transaction

    if self._is_future:
        raise exc.InvalidRequestError(
            "This connection has already initialized a SQLAlchemy "
            "Transaction() object via begin() or autobegin; can't "
            "call begin() here unless rollback() or commit() "
            "is called first."
        )

    return MarkerTransaction(self)

859 

def begin_nested(self):
    """Begin a nested transaction (i.e. SAVEPOINT) and return a
    transaction handle, assuming an outer transaction is already
    established.

    Nested transactions require SAVEPOINT support in the
    underlying database.  Any transaction in the hierarchy may
    ``commit`` and ``rollback``, however the outermost transaction
    still controls the overall ``commit`` or ``rollback`` of the
    transaction as a whole.

    The legacy form of :meth:`_engine.Connection.begin_nested` method has
    alternate behaviors based on whether or not the
    :meth:`_engine.Connection.begin` method was called previously. If
    :meth:`_engine.Connection.begin` was not called, then this method will
    behave the same as the :meth:`_engine.Connection.begin` method and
    return a :class:`.RootTransaction` object that begins and commits a
    real transaction - **no savepoint is invoked**. If
    :meth:`_engine.Connection.begin` **has** been called, and a
    :class:`.RootTransaction` is already established, then this method
    returns an instance of :class:`.NestedTransaction` which will invoke
    and manage the scope of a SAVEPOINT.

    .. tip::

        The above mentioned behavior of
        :meth:`_engine.Connection.begin_nested` is a legacy behavior
        specific to :term:`1.x style` use. In :term:`2.0 style` use, the
        :meth:`_future.Connection.begin_nested` method instead autobegins
        the outer transaction that can be committed using
        "commit-as-you-go" style; see
        :meth:`_future.Connection.begin_nested` for migration details.

        .. versionchanged:: 1.4.13 The behavior of
           :meth:`_engine.Connection.begin_nested`
           as returning a :class:`.RootTransaction` if
           :meth:`_engine.Connection.begin` were not called has been restored
           as was the case in 1.3.x versions; in previous 1.4.x versions, an
           outer transaction would be "autobegun" but would not be committed.


    .. seealso::

        :meth:`_engine.Connection.begin`

        :ref:`session_begin_nested` - ORM support for SAVEPOINT

    """
    if self._is_future:
        assert not self.__branch_from
    elif self.__branch_from:
        # legacy branched connection: defer to the root connection
        return self.__branch_from.begin_nested()

    if self._transaction is None:
        if self._is_future:
            # 2.0 style: start the outer transaction implicitly, then
            # fall through to create the SAVEPOINT below
            self._autobegin()
        else:
            # 1.x style: no outer transaction yet, so behave like begin()
            util.warn_deprecated_20(
                "Calling Connection.begin_nested() in 2.0 style use will "
                "return a NestedTransaction (SAVEPOINT) in all cases, "
                "that will not commit the outer transaction.  For code "
                "that is cross-compatible between 1.x and 2.0 style use, "
                "ensure Connection.begin() is called before calling "
                "Connection.begin_nested()."
            )
            return self.begin()

    return NestedTransaction(self)

928 

def begin_twophase(self, xid=None):
    """Begin a two-phase or XA transaction and return a transaction
    handle.

    The returned object is an instance of :class:`.TwoPhaseTransaction`,
    which in addition to the methods provided by
    :class:`.Transaction`, also provides a
    :meth:`~.TwoPhaseTransaction.prepare` method.

    :param xid: the two phase transaction id.  If not supplied, an id
      is obtained from the dialect's ``create_xid()`` method.

    :raises InvalidRequestError: if a transaction is already in progress
      on this connection.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_nested`

    """
    branch_root = self.__branch_from
    if branch_root:
        # legacy branched connection: defer to the root connection
        return branch_root.begin_twophase(xid=xid)

    if self._transaction is not None:
        raise exc.InvalidRequestError(
            "Cannot start a two phase transaction when a transaction "
            "is already in progress."
        )

    transaction_id = (
        self.engine.dialect.create_xid() if xid is None else xid
    )
    return TwoPhaseTransaction(self, transaction_id)

960 

def recover_twophase(self):
    """Return whatever the dialect's ``do_recover_twophase()`` hook
    produces for this connection.

    Presumably this is the collection of prepared two-phase transaction
    ids known to the database; confirm against the dialect in use.
    """
    dialect = self.engine.dialect
    return dialect.do_recover_twophase(self)

963 

def rollback_prepared(self, xid, recover=False):
    """Roll back the two-phase transaction identified by ``xid`` by
    calling the dialect's ``do_rollback_twophase()`` hook.

    :param xid: the two phase transaction id.
    :param recover: passed through unchanged to the dialect hook.
    """
    dialect = self.engine.dialect
    dialect.do_rollback_twophase(self, xid, recover=recover)

966 

def commit_prepared(self, xid, recover=False):
    """Commit the two-phase transaction identified by ``xid`` by
    calling the dialect's ``do_commit_twophase()`` hook.

    :param xid: the two phase transaction id.
    :param recover: passed through unchanged to the dialect hook.
    """
    dialect = self.engine.dialect
    dialect.do_commit_twophase(self, xid, recover=recover)

969 

def in_transaction(self):
    """Return True if a transaction is in progress."""
    branch_root = self.__branch_from
    if branch_root is not None:
        # branched connections report the state of their root
        return branch_root.in_transaction()

    current = self._transaction
    return current is not None and current.is_active

976 

def in_nested_transaction(self):
    """Return True if a nested transaction is in progress.

    This inspects ``_nested_transaction`` (as opposed to
    :meth:`in_transaction`, which inspects ``_transaction``).
    """
    branch_root = self.__branch_from
    if branch_root is not None:
        # branched connections report the state of their root
        return branch_root.in_nested_transaction()

    nested = self._nested_transaction
    return nested is not None and nested.is_active

986 

987 def _is_autocommit_isolation(self): 

988 opt_iso = self._execution_options.get("isolation_level", None) 

989 return bool( 

990 opt_iso == "AUTOCOMMIT" 

991 or ( 

992 opt_iso is None 

993 and getattr(self.engine.dialect, "isolation_level", None) 

994 == "AUTOCOMMIT" 

995 ) 

996 ) 

997 

def get_transaction(self):
    """Return the current root transaction in progress, if any.

    .. versionadded:: 1.4

    """
    branch_root = self.__branch_from
    if branch_root is None:
        return self._transaction

    # branched connections report the state of their root
    return branch_root.get_transaction()

1009 

def get_nested_transaction(self):
    """Return the current nested transaction in progress, if any.

    .. versionadded:: 1.4

    """
    branch_root = self.__branch_from
    if branch_root is None:
        return self._nested_transaction

    # branched connections report the state of their root
    return branch_root.get_nested_transaction()

1021 

1022 def _begin_impl(self, transaction): 

1023 assert not self.__branch_from 

1024 

1025 if self._echo: 

1026 if self._is_autocommit_isolation(): 

1027 self._log_info( 

1028 "BEGIN (implicit; DBAPI should not BEGIN due to " 

1029 "autocommit mode)" 

1030 ) 

1031 else: 

1032 self._log_info("BEGIN (implicit)") 

1033 

1034 self.__in_begin = True 

1035 

1036 if self._has_events or self.engine._has_events: 

1037 self.dispatch.begin(self) 

1038 

1039 try: 

1040 self.engine.dialect.do_begin(self.connection) 

1041 except BaseException as e: 

1042 self._handle_dbapi_exception(e, None, None, None, None) 

1043 finally: 

1044 self.__in_begin = False 

1045 

1046 def _rollback_impl(self): 

1047 assert not self.__branch_from 

1048 

1049 if self._has_events or self.engine._has_events: 

1050 self.dispatch.rollback(self) 

1051 

1052 if self._still_open_and_dbapi_connection_is_valid: 

1053 if self._echo: 

1054 if self._is_autocommit_isolation(): 

1055 self._log_info( 

1056 "ROLLBACK using DBAPI connection.rollback(), " 

1057 "DBAPI should ignore due to autocommit mode" 

1058 ) 

1059 else: 

1060 self._log_info("ROLLBACK") 

1061 try: 

1062 self.engine.dialect.do_rollback(self.connection) 

1063 except BaseException as e: 

1064 self._handle_dbapi_exception(e, None, None, None, None) 

1065 

1066 def _commit_impl(self, autocommit=False): 

1067 assert not self.__branch_from 

1068 

1069 # AUTOCOMMIT isolation-level is a dialect-specific concept, however 

1070 # if a connection has this set as the isolation level, we can skip 

1071 # the "autocommit" warning as the operation will do "autocommit" 

1072 # in any case 

1073 if autocommit and not self._is_autocommit_isolation(): 

1074 util.warn_deprecated_20( 

1075 "The current statement is being autocommitted using " 

1076 "implicit autocommit, which will be removed in " 

1077 "SQLAlchemy 2.0. " 

1078 "Use the .begin() method of Engine or Connection in order to " 

1079 "use an explicit transaction for DML and DDL statements." 

1080 ) 

1081 

1082 if self._has_events or self.engine._has_events: 

1083 self.dispatch.commit(self) 

1084 

1085 if self._echo: 

1086 if self._is_autocommit_isolation(): 

1087 self._log_info( 

1088 "COMMIT using DBAPI connection.commit(), " 

1089 "DBAPI should ignore due to autocommit mode" 

1090 ) 

1091 else: 

1092 self._log_info("COMMIT") 

1093 try: 

1094 self.engine.dialect.do_commit(self.connection) 

1095 except BaseException as e: 

1096 self._handle_dbapi_exception(e, None, None, None, None) 

1097 

def _savepoint_impl(self, name=None):
    """Establish a SAVEPOINT via ``dialect.do_savepoint()``.

    :param name: savepoint name; when ``None`` a name of the form
      ``sa_savepoint_<n>`` is generated from a per-connection counter.
    :return: the savepoint name.
    """
    assert not self.__branch_from

    # note: listeners receive the name as passed in, which is still None
    # when the name is about to be auto-generated below
    if self._has_events or self.engine._has_events:
        self.dispatch.savepoint(self, name)

    if name is None:
        self.__savepoint_seq += 1
        name = "sa_savepoint_%s" % self.__savepoint_seq
    if self._still_open_and_dbapi_connection_is_valid:
        self.engine.dialect.do_savepoint(self, name)
        # NOTE(review): the nesting of this return could not be read from
        # the indentation-stripped extract; it is placed inside the
        # validity check (so None is returned otherwise) - confirm
        # against the original file.
        return name

1110 

1111 def _rollback_to_savepoint_impl(self, name): 

1112 assert not self.__branch_from 

1113 

1114 if self._has_events or self.engine._has_events: 

1115 self.dispatch.rollback_savepoint(self, name, None) 

1116 

1117 if self._still_open_and_dbapi_connection_is_valid: 

1118 self.engine.dialect.do_rollback_to_savepoint(self, name) 

1119 

1120 def _release_savepoint_impl(self, name): 

1121 assert not self.__branch_from 

1122 

1123 if self._has_events or self.engine._has_events: 

1124 self.dispatch.release_savepoint(self, name, None) 

1125 

1126 if self._still_open_and_dbapi_connection_is_valid: 

1127 self.engine.dialect.do_release_savepoint(self, name) 

1128 

1129 def _begin_twophase_impl(self, transaction): 

1130 assert not self.__branch_from 

1131 

1132 if self._echo: 

1133 self._log_info("BEGIN TWOPHASE (implicit)") 

1134 if self._has_events or self.engine._has_events: 

1135 self.dispatch.begin_twophase(self, transaction.xid) 

1136 

1137 if self._still_open_and_dbapi_connection_is_valid: 

1138 self.__in_begin = True 

1139 try: 

1140 self.engine.dialect.do_begin_twophase(self, transaction.xid) 

1141 except BaseException as e: 

1142 self._handle_dbapi_exception(e, None, None, None, None) 

1143 finally: 

1144 self.__in_begin = False 

1145 

1146 def _prepare_twophase_impl(self, xid): 

1147 assert not self.__branch_from 

1148 

1149 if self._has_events or self.engine._has_events: 

1150 self.dispatch.prepare_twophase(self, xid) 

1151 

1152 if self._still_open_and_dbapi_connection_is_valid: 

1153 assert isinstance(self._transaction, TwoPhaseTransaction) 

1154 try: 

1155 self.engine.dialect.do_prepare_twophase(self, xid) 

1156 except BaseException as e: 

1157 self._handle_dbapi_exception(e, None, None, None, None) 

1158 

1159 def _rollback_twophase_impl(self, xid, is_prepared): 

1160 assert not self.__branch_from 

1161 

1162 if self._has_events or self.engine._has_events: 

1163 self.dispatch.rollback_twophase(self, xid, is_prepared) 

1164 

1165 if self._still_open_and_dbapi_connection_is_valid: 

1166 assert isinstance(self._transaction, TwoPhaseTransaction) 

1167 try: 

1168 self.engine.dialect.do_rollback_twophase( 

1169 self, xid, is_prepared 

1170 ) 

1171 except BaseException as e: 

1172 self._handle_dbapi_exception(e, None, None, None, None) 

1173 

1174 def _commit_twophase_impl(self, xid, is_prepared): 

1175 assert not self.__branch_from 

1176 

1177 if self._has_events or self.engine._has_events: 

1178 self.dispatch.commit_twophase(self, xid, is_prepared) 

1179 

1180 if self._still_open_and_dbapi_connection_is_valid: 

1181 assert isinstance(self._transaction, TwoPhaseTransaction) 

1182 try: 

1183 self.engine.dialect.do_commit_twophase(self, xid, is_prepared) 

1184 except BaseException as e: 

1185 self._handle_dbapi_exception(e, None, None, None, None) 

1186 

1187 def _autorollback(self): 

1188 if self.__branch_from: 

1189 self.__branch_from._autorollback() 

1190 

1191 if not self.in_transaction(): 

1192 self._rollback_impl() 

1193 

def _warn_for_legacy_exec_format(self):
    """Emit the 2.0 deprecation warning for legacy ``execute()``
    parameter formats (keyword arguments, tuples, positional
    dictionaries/tuples).
    """
    message = (
        "The connection.execute() method in "
        "SQLAlchemy 2.0 will accept parameters as a single "
        "dictionary or a "
        "single sequence of dictionaries only. "
        "Parameters passed as keyword arguments, tuples or positionally "
        "oriented dictionaries and/or tuples "
        "will no longer be accepted."
    )
    util.warn_deprecated_20(message)

1204 

def close(self):
    """Close this :class:`_engine.Connection`.

    This results in a release of the underlying database
    resources, that is, the DBAPI connection referenced
    internally. The DBAPI connection is typically restored
    back to the connection-holding :class:`_pool.Pool` referenced
    by the :class:`_engine.Engine` that produced this
    :class:`_engine.Connection`. Any transactional state present on
    the DBAPI connection is also unconditionally released via
    the DBAPI connection's ``rollback()`` method, regardless
    of any :class:`.Transaction` object that may be
    outstanding with regards to this :class:`_engine.Connection`.

    After :meth:`_engine.Connection.close` is called, the
    :class:`_engine.Connection` is permanently in a closed state,
    and will allow no further operations.

    """

    if self.__branch_from:
        # legacy branched connection: only drop our own reference; the
        # root connection keeps the DBAPI connection
        assert not self._is_future
        util.warn_deprecated_20(
            "The .close() method on a so-called 'branched' connection is "
            "deprecated as of 1.4, as are 'branched' connections overall, "
            "and will be removed in a future release.  If this is a "
            "default-handling function, don't close the connection."
        )
        self._dbapi_connection = None
        self.__can_reconnect = False
        return

    outstanding = self._transaction
    if outstanding:
        outstanding.close()
        skip_reset = True
    else:
        skip_reset = False

    pooled = self._dbapi_connection
    if pooled is not None:
        # as we just closed the transaction, close the connection
        # pool connection without doing an additional reset
        if skip_reset:
            pooled._close_special(transaction_reset=True)
        else:
            pooled.close()

        # There is a slight chance that conn.close() may have
        # triggered an invalidation here in which case
        # _dbapi_connection would already be None, however usually
        # it will be non-None here and in a "closed" state.
        self._dbapi_connection = None
    self.__can_reconnect = False

1259 

def scalar(self, object_, *multiparams, **params):
    """Executes and returns the first column of the first row.

    The underlying result/cursor is closed after execution.

    All arguments are forwarded unchanged to :meth:`execute`; the value
    returned is that of the result's ``scalar()`` method.

    """
    result = self.execute(object_, *multiparams, **params)
    return result.scalar()

1268 

def scalars(self, object_, *multiparams, **params):
    """Executes and returns a scalar result set, which yields scalar values
    from the first column of each row.

    This method is equivalent to calling :meth:`_engine.Connection.execute`
    to receive a :class:`_result.Result` object, then invoking the
    :meth:`_result.Result.scalars` method to produce a
    :class:`_result.ScalarResult` instance.

    :return: a :class:`_result.ScalarResult`

    .. versionadded:: 1.4.24

    """
    result = self.execute(object_, *multiparams, **params)
    return result.scalars()

1285 

def execute(self, statement, *multiparams, **params):
    r"""Executes a SQL statement construct and returns a
    :class:`_engine.CursorResult`.

    :param statement: The statement to be executed.  May be
     one of:

     * a plain string (deprecated)
     * any :class:`_expression.ClauseElement` construct that is also
       a subclass of :class:`.Executable`, such as a
       :func:`_expression.select` construct
     * a :class:`.FunctionElement`, such as that generated
       by :data:`.func`, will be automatically wrapped in
       a SELECT statement, which is then executed.
     * a :class:`.DDLElement` object
     * a :class:`.DefaultGenerator` object
     * a :class:`.Compiled` object

     .. deprecated:: 2.0 passing a string to
        :meth:`_engine.Connection.execute` is
        deprecated and will be removed in version 2.0.  Use the
        :func:`_expression.text` construct with
        :meth:`_engine.Connection.execute`, or the
        :meth:`_engine.Connection.exec_driver_sql`
        method to invoke a driver-level
        SQL string.

    :param \*multiparams/\**params: represent bound parameter
     values to be used in the execution.   Typically,
     the format is either a collection of one or more
     dictionaries passed to \*multiparams::

         conn.execute(
             table.insert(),
             {"id":1, "value":"v1"},
             {"id":2, "value":"v2"}
         )

     ...or individual key/values interpreted by \**params::

         conn.execute(
             table.insert(), id=1, value="v1"
         )

     In the case that a plain SQL string is passed, and the underlying
     DBAPI accepts positional bind parameters, a collection of tuples
     or individual values in \*multiparams may be passed::

         conn.execute(
             "INSERT INTO table (id, value) VALUES (?, ?)",
             (1, "v1"), (2, "v2")
         )

         conn.execute(
             "INSERT INTO table (id, value) VALUES (?, ?)",
             1, "v1"
         )

     Note above, the usage of a question mark "?" or other
     symbol is contingent upon the "paramstyle" accepted by the DBAPI
     in use, which may be any of "qmark", "named", "pyformat", "format",
     "numeric".   See `pep-249
     <https://www.python.org/dev/peps/pep-0249/>`_ for details on
     paramstyle.

     To execute a textual SQL statement which uses bound parameters in a
     DBAPI-agnostic way, use the :func:`_expression.text` construct.

     .. deprecated:: 2.0 use of tuple or scalar positional parameters
        is deprecated. All params should be dicts or sequences of dicts.
        Use :meth:`.exec_driver_sql` to execute a plain string with
        tuple or scalar positional parameters.

    """

    if isinstance(statement, util.string_types):
        # legacy path: a raw string goes straight to the driver
        util.warn_deprecated_20(
            "Passing a string to Connection.execute() is "
            "deprecated and will be removed in version 2.0.  Use the "
            "text() construct, "
            "or the Connection.exec_driver_sql() method to invoke a "
            "driver-level SQL string."
        )

        return self._exec_driver_sql(
            statement,
            multiparams,
            params,
            _EMPTY_EXECUTION_OPTS,
            future=False,
        )

    # every executable construct exposes _execute_on_connection; objects
    # lacking it are reported as not executable
    try:
        run_on_connection = statement._execute_on_connection
    except AttributeError as err:
        util.raise_(
            exc.ObjectNotExecutableError(statement), replace_context=err
        )
    else:
        return run_on_connection(
            self, multiparams, params, _EMPTY_EXECUTION_OPTS
        )

1386 

1387 def _execute_function(self, func, multiparams, params, execution_options): 

1388 """Execute a sql.FunctionElement object.""" 

1389 

1390 return self._execute_clauseelement( 

1391 func.select(), multiparams, params, execution_options 

1392 ) 

1393 

def _execute_default(
    self,
    default,
    multiparams,
    params,
    # migrate is calling this directly :(
    execution_options=_EMPTY_EXECUTION_OPTS,
):
    """Execute a schema.ColumnDefault object.

    The connection-level execution options are merged with the ones
    passed in, ``before_execute`` / ``after_execute`` listeners are
    invoked when present, and the value produced by the execution
    context's ``_exec_default()`` is returned.
    """

    execution_options = self._execution_options.merge_with(
        execution_options
    )

    distilled_parameters = _distill_params(self, multiparams, params)

    if self._has_events or self.engine._has_events:
        # the parameters returned by the listeners are not used for
        # default execution; only the (possibly replaced) element and
        # the event argument forms are kept
        (
            default,
            _listener_params,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            default, distilled_parameters, execution_options
        )

    try:
        dbapi_conn = self._dbapi_connection
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        dialect = self.dialect
        ctx = dialect.execution_ctx_cls._init_default(
            dialect, self, dbapi_conn, execution_options
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        raise
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

    ret = ctx._exec_default(None, default, None)
    if self.should_close_with_result:
        self.close()

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            default,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )

    return ret

1449 

def _execute_ddl(self, ddl, multiparams, params, execution_options):
    """Execute a schema.DDL object.

    The DDL element is compiled for this connection's dialect (honoring
    any ``schema_translate_map`` execution option) and run through
    :meth:`_execute_context` with no bound parameters.
    ``before_execute`` / ``after_execute`` listeners are invoked when
    present.
    """

    execution_options = ddl._execution_options.merge_with(
        self._execution_options, execution_options
    )

    distilled_parameters = _distill_params(self, multiparams, params)

    if self._has_events or self.engine._has_events:
        # parameters returned by the listeners are not used for DDL
        (
            ddl,
            _listener_params,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            ddl, distilled_parameters, execution_options
        )

    effective_opts = self._execution_options.merge_with(execution_options)
    translate_map = effective_opts.get("schema_translate_map", None)

    dialect = self.dialect

    compiled_ddl = ddl.compile(
        dialect=dialect, schema_translate_map=translate_map
    )
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_ddl,
        compiled_ddl,
        None,
        execution_options,
        compiled_ddl,
    )
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self,
            ddl,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret

1495 

1496 def _invoke_before_exec_event( 

1497 self, elem, distilled_params, execution_options 

1498 ): 

1499 

1500 if len(distilled_params) == 1: 

1501 event_multiparams, event_params = [], distilled_params[0] 

1502 else: 

1503 event_multiparams, event_params = distilled_params, {} 

1504 

1505 for fn in self.dispatch.before_execute: 

1506 elem, event_multiparams, event_params = fn( 

1507 self, 

1508 elem, 

1509 event_multiparams, 

1510 event_params, 

1511 execution_options, 

1512 ) 

1513 

1514 if event_multiparams: 

1515 distilled_params = list(event_multiparams) 

1516 if event_params: 

1517 raise exc.InvalidRequestError( 

1518 "Event handler can't return non-empty multiparams " 

1519 "and params at the same time" 

1520 ) 

1521 elif event_params: 

1522 distilled_params = [event_params] 

1523 else: 

1524 distilled_params = [] 

1525 

1526 return elem, distilled_params, event_multiparams, event_params 

1527 

def _execute_clauseelement(
    self, elem, multiparams, params, execution_options
):
    """Execute a sql.ClauseElement object.

    Execution options are merged (element, then connection, then the
    ones passed in), ``before_execute`` listeners may replace the
    element and parameters, the element is compiled through
    ``_compile_w_cache()`` and the result of :meth:`_execute_context`
    is returned after ``after_execute`` listeners have run.
    """

    execution_options = elem._execution_options.merge_with(
        self._execution_options, execution_options
    )

    distilled_params = _distill_params(self, multiparams, params)

    has_events = self._has_events or self.engine._has_events
    if has_events:
        (
            elem,
            distilled_params,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            elem, distilled_params, execution_options
        )

    if not distilled_params:
        column_keys = []
        is_executemany = False
    else:
        # ensure we don't retain a link to the view object for keys()
        # which links to the values, which we don't want to cache
        column_keys = sorted(distilled_params[0])
        is_executemany = len(distilled_params) > 1

    dialect = self.dialect

    translate_map = execution_options.get("schema_translate_map", None)

    cache = execution_options.get(
        "compiled_cache", self.engine._compiled_cache
    )

    compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
        dialect=dialect,
        compiled_cache=cache,
        column_keys=column_keys,
        for_executemany=is_executemany,
        schema_translate_map=translate_map,
        linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
    )
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_params,
        execution_options,
        compiled_sql,
        distilled_params,
        elem,
        extracted_params,
        cache_hit=cache_hit,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            elem,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret

1599 

def _execute_compiled(
    self,
    compiled,
    multiparams,
    params,
    execution_options=_EMPTY_EXECUTION_OPTS,
):
    """Execute a sql.Compiled object.

    Execution options are merged (compiled object, then connection,
    then the ones passed in).  When ``before_execute`` listeners are
    present, the compiled object *and the parameters* they return are
    the ones actually executed, matching
    :meth:`_execute_clauseelement`.

    TODO: why do we have this?  likely deprecate or remove

    """

    execution_options = compiled.execution_options.merge_with(
        self._execution_options, execution_options
    )
    distilled_parameters = _distill_params(self, multiparams, params)

    # evaluate once so that the before/after hooks are always paired
    has_events = self._has_events or self.engine._has_events
    if has_events:
        # rebind distilled_parameters: the listeners' parameter rewrite
        # must reach _execute_context() below
        (
            compiled,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            compiled, distilled_parameters, execution_options
        )

    dialect = self.dialect

    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled,
        distilled_parameters,
        execution_options,
        compiled,
        distilled_parameters,
        None,
        None,
    )
    if has_events:
        self.dispatch.after_execute(
            self,
            compiled,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret

1651 

def _exec_driver_sql(
    self, statement, multiparams, params, execution_options, future
):
    """Execute a driver-level SQL string.

    :param statement: the SQL string handed to the execution context.
    :param multiparams: positional parameter sets, distilled together
      with ``params`` via ``_distill_params()``.
    :param params: keyword parameters.
    :param execution_options: merged on top of the connection-level
      execution options.
    :param future: when false (legacy ``execute()`` with a string), the
      ``before_execute`` / ``after_execute`` listeners are invoked, and
      the statement and parameters they return are the ones executed;
      when true no such events are dispatched here.
    :return: the result of :meth:`_execute_context`.
    """

    execution_options = self._execution_options.merge_with(
        execution_options
    )

    distilled_parameters = _distill_params(self, multiparams, params)

    # evaluate once so that the before/after hooks are always paired
    has_events = not future and (
        self._has_events or self.engine._has_events
    )
    if has_events:
        # rebind distilled_parameters: the listeners' parameter rewrite
        # must reach _execute_context() below
        (
            statement,
            distilled_parameters,
            event_multiparams,
            event_params,
        ) = self._invoke_before_exec_event(
            statement, distilled_parameters, execution_options
        )

    dialect = self.dialect
    ret = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_statement,
        statement,
        distilled_parameters,
        execution_options,
        statement,
        distilled_parameters,
    )

    if has_events:
        self.dispatch.after_execute(
            self,
            statement,
            event_multiparams,
            event_params,
            execution_options,
            ret,
        )
    return ret

1695 

def _execute_20(
    self,
    statement,
    parameters=None,
    execution_options=_EMPTY_EXECUTION_OPTS,
):
    """Execute ``statement`` using 2.0-style arguments.

    ``parameters`` is converted with ``_distill_params_20()`` into the
    1.x ``(multiparams, params)`` pair, which is then handed to the
    statement's ``_execute_on_connection`` hook.  Objects without that
    hook raise :class:`.ObjectNotExecutableError`.
    """
    legacy_args, legacy_kwargs = _distill_params_20(parameters)
    try:
        run_on_connection = statement._execute_on_connection
    except AttributeError as err:
        util.raise_(
            exc.ObjectNotExecutableError(statement), replace_context=err
        )
    else:
        return run_on_connection(
            self, legacy_args, legacy_kwargs, execution_options
        )

1711 

def exec_driver_sql(
    self, statement, parameters=None, execution_options=None
):
    r"""Executes a string SQL statement on the DBAPI cursor directly,
    without any SQL compilation steps.

    This can be used to pass any string directly to the
    ``cursor.execute()`` method of the DBAPI in use.

    :param statement: The statement str to be executed.   Bound parameters
     must use the underlying DBAPI's paramstyle, such as "qmark",
     "pyformat", "format", etc.

    :param parameters: represent bound parameter values to be used in the
     execution.  The format is one of:   a dictionary of named parameters,
     a tuple of positional parameters, or a list containing either
     dictionaries or tuples for multiple-execute support.

    :param execution_options: optional execution options, merged on top
     of the connection-level options.

    :return: a :class:`_engine.CursorResult`.

    E.g. multiple dictionaries::


        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}]
        )

    Single dictionary::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
            dict(id=1, value="v1")
        )

    Single tuple::

        conn.exec_driver_sql(
            "INSERT INTO table (id, value) VALUES (?, ?)",
            (1, 'v1')
        )

    .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does
        not participate in the
        :meth:`_events.ConnectionEvents.before_execute` and
        :meth:`_events.ConnectionEvents.after_execute` events.   To
        intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use
        :meth:`_events.ConnectionEvents.before_cursor_execute` and
        :meth:`_events.ConnectionEvents.after_cursor_execute`.

    .. seealso::

        :pep:`249`

    """

    # convert the single 2.0-style "parameters" argument into the 1.x
    # (multiparams, params) pair expected by _exec_driver_sql()
    legacy_args, legacy_kwargs = _distill_params_20(parameters)

    return self._exec_driver_sql(
        statement,
        legacy_args,
        legacy_kwargs,
        execution_options,
        future=True,
    )

1777 

def _execute_context(
    self,
    dialect,
    constructor,
    statement,
    parameters,
    execution_options,
    *args,
    **kw
):
    """Create an :class:`.ExecutionContext` and execute, returning
    a :class:`_engine.CursorResult`.

    :param dialect: dialect handed to ``constructor``; its
     ``use_setinputsizes`` flag decides whether
     ``context._set_input_sizes()`` is called.
    :param constructor: callable invoked as
     ``constructor(dialect, self, conn, execution_options, *args, **kw)``
     to build the execution context.
    :param statement: statement as given by the caller.  It is only used
     for error reporting if building the context fails; afterwards it is
     replaced by ``context.statement``.
    :param parameters: parameters as given by the caller.  Like
     ``statement``, replaced by ``context.parameters`` once the context
     exists.
    :param execution_options: options mapping; a truthy ``yield_per``
     entry additionally turns on ``stream_results`` and sets
     ``max_row_buffer`` to the same value.
    :param \\*args: extra positional arguments forwarded to ``constructor``.
    :param \\**kw: extra keyword arguments forwarded to ``constructor``.
    :return: the object produced by ``context._setup_result_proxy()``.

    Every exception raised while building the context or while running
    the cursor is routed through :meth:`._handle_dbapi_exception`, with
    the exception of ``PendingRollbackError`` / ``ResourceClosedError``
    raised during context construction, which propagate unchanged.

    """

    # keep a handle on the connection the caller actually used; it is
    # needed below for should_close_with_result / close().
    branched = self
    if self.__branch_from:
        # if this is a "branched" connection, do everything in terms
        # of the "root" connection, *except* for .close(), which is
        # the only feature that branching provides
        self = self.__branch_from

    if execution_options:
        yp = execution_options.get("yield_per", None)
        if yp:
            # yield_per implies a streaming cursor with a row buffer
            # of the same size.
            execution_options = execution_options.union(
                {"stream_results": True, "max_row_buffer": yp}
            )

    try:
        conn = self._dbapi_connection
        if conn is None:
            # no DBAPI connection currently present; try to get a new one
            conn = self._revalidate_connection()

        context = constructor(
            dialect, self, conn, execution_options, *args, **kw
        )
    except (exc.PendingRollbackError, exc.ResourceClosedError):
        # these are passed through as-is, without wrapping
        raise
    except BaseException as e:
        # no cursor or context exists yet, hence the two ``None`` values.
        # NOTE(review): the code below relies on this call raising;
        # otherwise ``context`` would be unbound.
        self._handle_dbapi_exception(
            e, util.text_type(statement), parameters, None, None
        )

    # refuse to run if either the outer or the nested transaction is
    # present but no longer active.
    if (
        self._transaction
        and not self._transaction.is_active
        or (
            self._nested_transaction
            and not self._nested_transaction.is_active
        )
    ):
        self._invalid_transaction()

    elif self._trans_context_manager:
        TransactionalContext._trans_ctx_check(self)

    if self._is_future and self._transaction is None:
        self._autobegin()

    context.pre_exec()

    if dialect.use_setinputsizes:
        context._set_input_sizes()

    # from here on, work with what the context produced rather than
    # with the values that were passed in.
    cursor, statement, parameters = (
        context.cursor,
        context.statement,
        context.parameters,
    )

    if not context.executemany:
        # single execution: use the first (only) parameter set
        parameters = parameters[0]

    if self._has_events or self.engine._has_events:
        # each hook returns the (possibly replaced) statement and
        # parameters, which are then fed to the next hook.
        for fn in self.dispatch.before_cursor_execute:
            statement, parameters = fn(
                self,
                cursor,
                statement,
                parameters,
                context,
                context.executemany,
            )

    if self._echo:

        self._log_info(statement)

        stats = context._get_cache_stats()

        if not self.engine.hide_parameters:
            self._log_info(
                "[%s] %r",
                stats,
                sql_util._repr_params(
                    parameters, batches=10, ismulti=context.executemany
                ),
            )
        else:
            self._log_info(
                "[%s] [SQL parameters hidden due to hide_parameters=True]"
                % (stats,)
            )

    # set to True when a dialect-level event hook reports (by returning
    # a true value) that it performed the execution itself.
    evt_handled = False
    try:
        if context.executemany:
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_executemany:
                    if fn(cursor, statement, parameters, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_executemany(
                    cursor, statement, parameters, context
                )
        elif not parameters and context.no_parameters:
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute_no_params:
                    if fn(cursor, statement, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute_no_params(
                    cursor, statement, context
                )
        else:
            if self.dialect._has_events:
                for fn in self.dialect.dispatch.do_execute:
                    if fn(cursor, statement, parameters, context):
                        evt_handled = True
                        break
            if not evt_handled:
                self.dialect.do_execute(
                    cursor, statement, parameters, context
                )

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                statement,
                parameters,
                context,
                context.executemany,
            )

        context.post_exec()

        result = context._setup_result_proxy()

        # everything in this branch is legacy (non-"future") behavior
        if not self._is_future:
            # read from the connection the caller used, not the root
            should_close_with_result = branched.should_close_with_result

            if not result._soft_closed and should_close_with_result:
                result._autoclose_connection = True

            if (
                # usually we're in a transaction so avoid relatively
                # expensive / legacy should_autocommit call
                self._transaction is None
                and context.should_autocommit
            ):
                self._commit_impl(autocommit=True)

            # for "connectionless" execution, we have to close this
            # Connection after the statement is complete.
            # legacy stuff.
            if should_close_with_result and context._soft_closed:
                assert not self._is_future

                # CursorResult already exhausted rows / has no rows.
                # close us now
                branched.close()

    except BaseException as e:
        # cursor and context exist at this point and are handed over so
        # the handler can clean them up.
        self._handle_dbapi_exception(
            e, statement, parameters, cursor, context
        )

    return result

1958 

1959 def _cursor_execute(self, cursor, statement, parameters, context=None): 

1960 """Execute a statement + params on the given cursor. 

1961 

1962 Adds appropriate logging and exception handling. 

1963 

1964 This method is used by DefaultDialect for special-case 

1965 executions, such as for sequences and column defaults. 

1966 The path of statement execution in the majority of cases 

1967 terminates at _execute_context(). 

1968 

1969 """ 

1970 if self._has_events or self.engine._has_events: 

1971 for fn in self.dispatch.before_cursor_execute: 

1972 statement, parameters = fn( 

1973 self, cursor, statement, parameters, context, False 

1974 ) 

1975 

1976 if self._echo: 

1977 self._log_info(statement) 

1978 self._log_info("[raw sql] %r", parameters) 

1979 try: 

1980 for fn in ( 

1981 () 

1982 if not self.dialect._has_events 

1983 else self.dialect.dispatch.do_execute 

1984 ): 

1985 if fn(cursor, statement, parameters, context): 

1986 break 

1987 else: 

1988 self.dialect.do_execute(cursor, statement, parameters, context) 

1989 except BaseException as e: 

1990 self._handle_dbapi_exception( 

1991 e, statement, parameters, cursor, context 

1992 ) 

1993 

1994 if self._has_events or self.engine._has_events: 

1995 self.dispatch.after_cursor_execute( 

1996 self, cursor, statement, parameters, context, False 

1997 ) 

1998 

1999 def _safe_close_cursor(self, cursor): 

2000 """Close the given cursor, catching exceptions 

2001 and turning into log warnings. 

2002 

2003 """ 

2004 try: 

2005 cursor.close() 

2006 except Exception: 

2007 # log the error through the connection pool's logger. 

2008 self.engine.pool.logger.error( 

2009 "Error closing cursor", exc_info=True 

2010 ) 

2011 

# Class-level defaults for state used by ``_handle_dbapi_exception``.
# ``_reentrant_error`` is set to True on the instance while the handler
# runs and is deleted in its ``finally`` block, so lookups fall back to
# this False default.
_reentrant_error = False
# ``_is_disconnect`` is assigned on the instance by the handler and is
# deleted again in its ``finally`` block when it is true.
_is_disconnect = False

2014 

def _handle_dbapi_exception(
    self, e, statement, parameters, cursor, context
):
    """Process an exception raised during execution and re-raise it.

    The steps are: detect whether the error is a disconnect, optionally
    wrap the error via ``exc.DBAPIError.instance()``, run the
    ``handle_error`` event hooks, close the cursor and autorollback when
    the connection is still usable, raise the resulting exception, and
    finally invalidate / close the connection as needed.

    :param e: the exception being handled.
    :param statement: statement in progress, or ``None``.
    :param parameters: parameters in progress.
    :param cursor: DBAPI cursor in use, or ``None`` if none was created.
    :param context: execution context, or ``None`` if none was created.

    """
    # capture the active exception up front for its traceback
    exc_info = sys.exc_info()

    is_exit_exception = util.is_exit_exception(e)

    if not self._is_disconnect:
        # a disconnect is either a DBAPI error that the dialect
        # classifies as one, or an "exit" exception, in both cases only
        # while this connection is not closed.
        self._is_disconnect = (
            isinstance(e, self.dialect.dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self._dbapi_connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

    # exit exceptions do not invalidate the whole pool by default
    invalidate_pool_on_disconnect = not is_exit_exception

    if self._reentrant_error:
        # this handler is already running for this connection (the flag
        # is set just below); wrap and raise right away without running
        # the event hooks or cleanup again.
        util.raise_(
            exc.DBAPIError.instance(
                statement,
                parameters,
                e,
                self.dialect.dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                dialect=self.dialect,
                ismulti=context.executemany
                if context is not None
                else None,
            ),
            with_traceback=exc_info[2],
            from_=e,
        )
    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        if should_wrap:
            sqlalchemy_exception = exc.DBAPIError.instance(
                statement,
                parameters,
                e,
                self.dialect.dbapi.Error,
                hide_parameters=self.engine.hide_parameters,
                connection_invalidated=self._is_disconnect,
                dialect=self.dialect,
                ismulti=context.executemany
                if context is not None
                else None,
            )
        else:
            sqlalchemy_exception = None

        # exception supplied by a handle_error hook, if any; takes
        # precedence over the wrapped / original exception when raising.
        newraise = None

        if (
            self._has_events or self.engine._has_events
        ) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
            )

            for fn in self.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    per_fn = fn(ctx)
                    if per_fn is not None:
                        ctx.chained_exception = newraise = per_fn
                except Exception as _raised:
                    # handler raises an exception - stop processing
                    newraise = _raised
                    break

            # a hook may have changed the disconnect decision on ctx;
            # mirror that onto this connection and the wrapped exception.
            if self._is_disconnect != ctx.is_disconnect:
                self._is_disconnect = ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            # connection is considered usable: release the cursor and
            # roll back; a failure during the rollback only warns.
            if cursor:
                self._safe_close_cursor(cursor)
            with util.safe_reraise(warn_only=True):
                self._autorollback()

        if newraise:
            util.raise_(newraise, with_traceback=exc_info[2], from_=e)
        elif should_wrap:
            util.raise_(
                sqlalchemy_exception, with_traceback=exc_info[2], from_=e
            )
        else:
            util.raise_(exc_info[1], with_traceback=exc_info[2])

    finally:
        # drop the instance-level flag so the class-level default
        # applies again.
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                # grab the DBAPI connection before invalidate() runs
                dbapi_conn_wrapper = self._dbapi_connection
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)
        if self.should_close_with_result:
            assert not self._is_future
            self.close()

2152 

@classmethod
def _handle_dbapi_exception_noconnection(cls, e, dialect, engine):
    """Process and re-raise an exception when no Connection is involved.

    The exception is wrapped via ``exc.DBAPIError.instance()`` when it
    is an instance of the dialect's DBAPI ``Error`` class, the engine's
    ``handle_error`` hooks are run if the engine has events, and then
    the hook-supplied, wrapped, or original exception is raised, in that
    order of preference.

    :param e: the exception being handled.
    :param dialect: dialect used to classify the exception.
    :param engine: engine providing ``hide_parameters`` and the
     ``handle_error`` event dispatch.

    """
    exc_info = sys.exc_info()
    tb = exc_info[2]

    is_disconnect = dialect.is_disconnect(e, None, None)

    should_wrap = isinstance(e, dialect.dbapi.Error)

    sqlalchemy_exception = (
        exc.DBAPIError.instance(
            None,
            None,
            e,
            dialect.dbapi.Error,
            hide_parameters=engine.hide_parameters,
            connection_invalidated=is_disconnect,
        )
        if should_wrap
        else None
    )

    newraise = None

    if engine._has_events:
        # no connection, cursor, statement, parameters or execution
        # context are available here, hence the run of None values.
        ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            True,
        )
        for handler in engine.dispatch.handle_error:
            try:
                # a returned exception replaces the one to raise and is
                # exposed to the next handler as the chained exception
                replacement = handler(ctx)
                if replacement is not None:
                    ctx.chained_exception = newraise = replacement
            except Exception as handler_error:
                # a handler that raises ends the chain
                newraise = handler_error
                break

        if sqlalchemy_exception and is_disconnect != ctx.is_disconnect:
            # a handler changed the disconnect decision; mirror it
            updated = ctx.is_disconnect
            sqlalchemy_exception.connection_invalidated = updated
            is_disconnect = updated

    if newraise:
        util.raise_(newraise, with_traceback=tb, from_=e)
    elif should_wrap:
        util.raise_(sqlalchemy_exception, with_traceback=tb, from_=e)
    else:
        util.raise_(exc_info[1], with_traceback=tb)

2213 

2214 def _run_ddl_visitor(self, visitorcallable, element, **kwargs): 

2215 """run a DDL visitor. 

2216 

2217 This method is only here so that the MockConnection can change the 

2218 options given to the visitor so that "checkfirst" is skipped. 

2219 

2220 """ 

2221 visitorcallable(self.dialect, self, **kwargs).traverse_single(element) 

2222 

@util.deprecated(
    "1.4",
    "The :meth:`_engine.Connection.transaction` "
    "method is deprecated and will be "
    "removed in a future release.  Use the :meth:`_engine.Engine.begin` "
    "context manager instead.",
)
def transaction(self, callable_, *args, **kwargs):
    r"""Run a function inside a transaction on this connection.

    ``callable_`` receives this :class:`_engine.Connection` as its first
    argument, followed by the given \*args and \**kwargs,
    e.g.::

        def do_something(conn, x, y):
            conn.execute(text("some statement"), {'x':x, 'y':y})

        conn.transaction(do_something, 5, 10)

    Everything the function does takes place within one
    :class:`.Transaction`.  If the function returns normally, the
    transaction is committed and the function's return value is
    returned.  If it raises, the transaction is rolled back and the
    exception then propagates.

    .. note::

       The :meth:`.transaction` method is superseded by
       the usage of the Python ``with:`` statement, which can
       be used with :meth:`_engine.Connection.begin`::

           with conn.begin():
               conn.execute(text("some statement"), {'x':5, 'y':10})

       As well as with :meth:`_engine.Engine.begin`::

           with engine.begin() as conn:
               conn.execute(text("some statement"), {'x':5, 'y':10})

    .. seealso::

        :meth:`_engine.Engine.begin` - engine-level transactional
        context

        :meth:`_engine.Engine.transaction` - engine-level version of
        :meth:`_engine.Connection.transaction`

    """

    # run_callable() is deprecated as well; this flag is passed along
    # with the keyword arguments so that it is called without a second
    # warning.
    kwargs["_sa_skip_warning"] = True
    trans = self.begin()
    try:
        outcome = self.run_callable(callable_, *args, **kwargs)
        trans.commit()
    except:
        # roll back, then let safe_reraise() re-raise the original error
        with util.safe_reraise():
            trans.rollback()
    else:
        return outcome

2281 

@util.deprecated(
    "1.4",
    "The :meth:`_engine.Connection.run_callable` "
    "method is deprecated and will "
    "be removed in a future release.  Invoke the callable function "
    "directly, passing the Connection.",
)
def run_callable(self, callable_, *args, **kwargs):
    r"""Call ``callable_`` with this :class:`_engine.Connection` as its
    first argument and return whatever it returns.

    Any \*args and \**kwargs given are passed along after the
    :class:`_engine.Connection` argument.

    Together with :meth:`_engine.Engine.run_callable`, this lets a
    caller run a function against either a :class:`_engine.Connection`
    or an :class:`_engine.Engine` without needing to know which of the
    two it holds.

    """
    outcome = callable_(self, *args, **kwargs)
    return outcome

2303 

2304 

class ExceptionContextImpl(ExceptionContext):
    """Concrete :class:`.ExceptionContext` that simply records the values
    it is constructed with."""

    def __init__(
        self,
        exception,
        sqlalchemy_exception,
        engine,
        connection,
        cursor,
        statement,
        parameters,
        context,
        is_disconnect,
        invalidate_pool_on_disconnect,
    ):
        # NOTE(review): ``cursor`` is accepted but is not stored on the
        # instance by this constructor.

        # the two views of the error
        self.original_exception = exception
        self.sqlalchemy_exception = sqlalchemy_exception

        # where it happened
        self.engine = engine
        self.connection = connection
        self.execution_context = context

        # what was being executed
        self.statement = statement
        self.parameters = parameters

        # disconnect handling flags
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect

2330 

2331 

class Transaction(TransactionalContext):
    """A database transaction that is currently in progress.

    A :class:`.Transaction` is obtained from the
    :meth:`_engine.Connection.begin` method of
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine
        engine = create_engine("postgresql://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute(text("insert into x (a, b) values (1, 2)"))
        trans.commit()

    Transaction boundaries are controlled with the :meth:`.rollback`
    and :meth:`.commit` methods.  The object is also a context manager,
    so :meth:`_engine.Connection.begin` may be used with the Python
    ``with`` statement::

        with connection.begin():
            connection.execute(text("insert into x (a, b) values (1, 2)"))

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """

    __slots__ = ()

    _is_root = False

    def __init__(self, connection):
        # this base class is not constructed directly
        raise NotImplementedError()

    def _do_deactivate(self):
        """Perform the steps needed to mark this transaction "deactive"
        while leaving the transaction object in place as far as the
        connection's state is concerned.

        For a "real" transaction this should roll the transaction back
        and make sure it is no longer a reset agent.

        Used for nesting of marker transactions, where the marker may
        flag the "real" transaction as rolled back even though it stays
        in place.

        for 2.0 we hope to remove this nesting feature.

        """
        raise NotImplementedError()

    @property
    def _deactivated_from_connection(self):
        """True when this transaction is fully deactivated from the
        connection and so cannot affect its state any more.

        """
        raise NotImplementedError()

    def _do_close(self):
        raise NotImplementedError()

    def _do_rollback(self):
        raise NotImplementedError()

    def _do_commit(self):
        raise NotImplementedError()

    @property
    def is_valid(self):
        active = self.is_active
        return active and not self.connection.invalidated

    def close(self):
        """Close this :class:`.Transaction`.

        When this is the base transaction of a begin/commit nesting,
        the transaction is rolled back.  Otherwise the method simply
        returns.

        This allows a Transaction to be cancelled without affecting the
        scope of an enclosing transaction.

        """
        try:
            self._do_close()
        finally:
            # whatever _do_close() did, we must be inactive afterwards
            assert not self.is_active

    def rollback(self):
        """Roll back this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * For a simple database transaction (e.g. :class:`.RootTransaction`),
          it corresponds to a ROLLBACK.

        * For a :class:`.NestedTransaction`, it corresponds to a
          "ROLLBACK TO SAVEPOINT" operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
          phase transactions may be used.


        """
        try:
            self._do_rollback()
        finally:
            # whatever _do_rollback() did, we must be inactive afterwards
            assert not self.is_active

    def commit(self):
        """Commit this :class:`.Transaction`.

        What this does depends on the kind of transaction in use:

        * For a simple database transaction (e.g. :class:`.RootTransaction`),
          it corresponds to a COMMIT.

        * For a :class:`.NestedTransaction`, it corresponds to a
          "RELEASE SAVEPOINT" operation.

        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two
          phase transactions may be used.

        """
        try:
            self._do_commit()
        finally:
            # whatever _do_commit() did, we must be inactive afterwards
            assert not self.is_active

    def _get_subject(self):
        return self.connection

    def _transaction_is_active(self):
        return self.is_active

    def _transaction_is_closed(self):
        # literally the negation of _deactivated_from_connection
        return not self._deactivated_from_connection

    def _rollback_can_be_called(self):
        # for RootTransaction / NestedTransaction, it's safe to call
        # rollback() even if the transaction is deactive and no warnings
        # will be emitted.  tested in
        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)?
        return True

2488 

2489 

class MarkerTransaction(Transaction):
    """Placeholder ('marker') transaction returned by a nested begin()
    call made while a transaction is already in progress.

    .. deprecated:: 1.4 future connection for 2.0 won't support this pattern.

    """

    __slots__ = ("connection", "_is_active", "_transaction")

    def __init__(self, connection):
        current = connection._transaction
        assert current is not None
        if not current.is_active:
            raise exc.InvalidRequestError(
                "the current transaction on this connection is inactive.  "
                "Please issue a rollback first."
            )

        assert not connection._is_future
        util.warn_deprecated_20(
            "Calling .begin() when a transaction is already begun, creating "
            "a 'sub' transaction, is deprecated "
            "and will be removed in 2.0.  See the documentation section "
            "'Migrating from the nesting pattern' for background on how "
            "to migrate from this pattern."
        )

        self.connection = connection

        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)

        # the marker tracks the innermost real transaction: the current
        # nested transaction if there is one, otherwise the outer one.
        nested = connection._nested_transaction
        self._transaction = (
            nested if nested is not None else connection._transaction
        )
        self._is_active = True

    @property
    def _deactivated_from_connection(self):
        return not self.is_active

    @property
    def is_active(self):
        # active only while both the marker and its target are active
        return self._is_active and self._transaction.is_active

    def _deactivate(self):
        self._is_active = False

    def _do_close(self):
        # only the marker is deactivated; the tracked transaction is
        # left alone
        self._deactivate()

    def _do_rollback(self):
        # rolling back the marker deactivates the tracked transaction
        if not self._is_active:
            return
        try:
            self._transaction._do_deactivate()
        finally:
            self._deactivate()

    def _do_commit(self):
        self._deactivate()

2552 

2553 

class RootTransaction(Transaction):
    """The "root" transaction of a :class:`_engine.Connection`.

    It stands for the "BEGIN/COMMIT/ROLLBACK" currently taking place on
    the :class:`_engine.Connection`.  A :class:`_engine.RootTransaction`
    is created through the :meth:`_engine.Connection.begin` method and
    stays associated with the :class:`_engine.Connection` for as long as
    it is active.  The :class:`_engine.RootTransaction` currently in use
    is available from the :attr:`_engine.Connection.get_transaction`
    method of :class:`_engine.Connection`.

    In :term:`2.0 style` use, the :class:`_future.Connection` also employs
    "autobegin" behavior that will create a new
    :class:`_engine.RootTransaction` whenever a connection in a
    non-transactional state is used to emit commands on the DBAPI connection.
    The scope of the :class:`_engine.RootTransaction` in 2.0 style
    use can be controlled using the :meth:`_future.Connection.commit` and
    :meth:`_future.Connection.rollback` methods.


    """

    _is_root = True

    __slots__ = ("connection", "is_active")

    def __init__(self, connection):
        assert connection._transaction is None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        # emit the begin first; only register on the connection and
        # become active if that did not raise.
        self._connection_begin_impl()
        connection._transaction = self

        self.is_active = True

    def _deactivate_from_connection(self):
        conn = self.connection
        if self.is_active:
            assert conn._transaction is self
            self.is_active = False

        elif conn._transaction is not self:
            util.warn("transaction already deassociated from connection")

    @property
    def _deactivated_from_connection(self):
        return self.connection._transaction is not self

    def _do_deactivate(self):
        # called from a MarkerTransaction to cancel this root transaction.
        # the transaction stays in place as connection._transaction, but
        # is no longer active and is no longer the reset agent for the
        # pooled connection.   the connection won't support a new begin()
        # until this transaction is explicitly closed, rolled back,
        # or committed.
        conn = self.connection

        assert conn._transaction is self

        if self.is_active:
            self._connection_rollback_impl()

        # handle case where a savepoint was created inside of a marker
        # transaction that refers to a root.  nested has to be cancelled
        # also.
        savepoint = conn._nested_transaction
        if savepoint:
            savepoint._cancel()

        self._deactivate_from_connection()

    def _connection_begin_impl(self):
        self.connection._begin_impl(self)

    def _connection_rollback_impl(self):
        self.connection._rollback_impl()

    def _connection_commit_impl(self):
        self.connection._commit_impl()

    def _close_impl(self, try_deactivate=False):
        conn = self.connection
        try:
            if self.is_active:
                self._connection_rollback_impl()

            savepoint = conn._nested_transaction
            if savepoint:
                savepoint._cancel()
        finally:
            # always detach from the connection, even if the rollback
            # above raised
            if try_deactivate or self.is_active:
                self._deactivate_from_connection()
            if conn._transaction is self:
                conn._transaction = None

        assert not self.is_active
        assert conn._transaction is not self

    def _do_close(self):
        self._close_impl()

    def _do_rollback(self):
        self._close_impl(try_deactivate=True)

    def _do_commit(self):
        conn = self.connection
        if self.is_active:
            assert conn._transaction is self

            try:
                self._connection_commit_impl()
            finally:
                # whether or not commit succeeds, cancel any
                # nested transactions, make this transaction "inactive"
                # and remove it as a reset agent
                savepoint = conn._nested_transaction
                if savepoint:
                    savepoint._cancel()

                self._deactivate_from_connection()

            # ...however only remove as the connection's current transaction
            # if commit succeeded.  otherwise it stays on so that a rollback
            # needs to occur.
            conn._transaction = None
        elif conn._transaction is self:
            conn._invalid_transaction()
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

        assert not self.is_active
        assert conn._transaction is not self

2681 

2682 

class NestedTransaction(Transaction):
    """A 'nested' transaction, i.e. one backed by a SAVEPOINT.

    A :class:`.NestedTransaction` is created through the
    :meth:`_engine.Connection.begin_nested` method of
    :class:`_engine.Connection`.

    With :class:`.NestedTransaction`, "begin" / "commit" / "rollback"
    have the following meaning:

    * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where
      the savepoint is given an explicit name that is part of the state
      of this object.

    * The :meth:`.NestedTransaction.commit` method corresponds to a
      "RELEASE SAVEPOINT" operation, using the savepoint identifier associated
      with this :class:`.NestedTransaction`.

    * The :meth:`.NestedTransaction.rollback` method corresponds to a
      "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier
      associated with this :class:`.NestedTransaction`.

    Savepoints mimic the semantics of an outer transaction so that code
    can work with a "savepoint" transaction and an "outer" transaction
    without needing to tell them apart.

    .. seealso::

        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API.

    """

    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")

    def __init__(self, connection):
        assert connection._transaction is not None
        if connection._trans_context_manager:
            TransactionalContext._trans_ctx_check(connection)
        self.connection = connection
        self._savepoint = connection._savepoint_impl()
        self.is_active = True
        # remember the savepoint that was current before this one and
        # take its place on the connection
        self._previous_nested = connection._nested_transaction
        connection._nested_transaction = self

    def _deactivate_from_connection(self, warn=True):
        conn = self.connection
        if conn._nested_transaction is self:
            # hand the "current savepoint" role back to the previous one
            conn._nested_transaction = self._previous_nested
        elif warn:
            util.warn(
                "nested transaction already deassociated from connection"
            )

    @property
    def _deactivated_from_connection(self):
        return self.connection._nested_transaction is not self

    def _cancel(self):
        # called by RootTransaction when the outer transaction is
        # committed, rolled back, or closed to cancel all savepoints
        # without any action being taken
        self.is_active = False
        self._deactivate_from_connection()
        previous = self._previous_nested
        if previous:
            previous._cancel()

    def _close_impl(self, deactivate_from_connection, warn_already_deactive):
        conn = self.connection
        try:
            # only roll back to the savepoint while both this savepoint
            # and the outer transaction are still active
            if self.is_active and conn._transaction.is_active:
                conn._rollback_to_savepoint_impl(self._savepoint)
        finally:
            self.is_active = False

            if deactivate_from_connection:
                self._deactivate_from_connection(warn=warn_already_deactive)

        assert not self.is_active
        if deactivate_from_connection:
            assert conn._nested_transaction is not self

    def _do_deactivate(self):
        self._close_impl(False, False)

    def _do_close(self):
        self._close_impl(True, False)

    def _do_rollback(self):
        self._close_impl(True, True)

    def _do_commit(self):
        conn = self.connection
        if self.is_active:
            try:
                conn._release_savepoint_impl(self._savepoint)
            finally:
                # nested trans becomes inactive on failed release
                # unconditionally.  this prevents it from trying to
                # emit SQL when it rolls back.
                self.is_active = False

            # but only de-associate from connection if it succeeded
            self._deactivate_from_connection()
        elif conn._nested_transaction is self:
            conn._invalid_transaction()
        else:
            raise exc.InvalidRequestError(
                "This nested transaction is inactive"
            )

2790 

2791 

class TwoPhaseTransaction(RootTransaction):
    """A two-phase transaction.

    :class:`.TwoPhaseTransaction` objects are obtained from the
    :meth:`_engine.Connection.begin_twophase` method.

    It offers the same interface as :class:`.Transaction`, plus the
    :meth:`prepare` method.

    """

    __slots__ = ("connection", "is_active", "xid", "_is_prepared")

    def __init__(self, connection, xid):
        # both attributes must be set before the base constructor runs,
        # since it calls _connection_begin_impl()
        self.xid = xid
        self._is_prepared = False
        super(TwoPhaseTransaction, self).__init__(connection)

    def prepare(self):
        """Prepare this :class:`.TwoPhaseTransaction`.

        Once PREPARE has been issued, the transaction can be committed.

        :raises sqlalchemy.exc.InvalidRequestError: if this transaction
         is not active.

        """
        if not self.is_active:
            raise exc.InvalidRequestError("This transaction is inactive")
        self.connection._prepare_twophase_impl(self.xid)
        self._is_prepared = True

    def _connection_begin_impl(self):
        self.connection._begin_twophase_impl(self)

    def _connection_rollback_impl(self):
        conn = self.connection
        conn._rollback_twophase_impl(self.xid, self._is_prepared)

    def _connection_commit_impl(self):
        conn = self.connection
        conn._commit_twophase_impl(self.xid, self._is_prepared)

2829 

2830 

class Engine(Connectable, log.Identified):
    """
    Connects a :class:`~sqlalchemy.pool.Pool` and
    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
    source of database connectivity and behavior.

    This is the **SQLAlchemy 1.x version** of :class:`_engine.Engine`. For
    the :term:`2.0 style` version, which includes some API differences,
    see :class:`_future.Engine`.

    An :class:`_engine.Engine` object is instantiated publicly using the
    :func:`~sqlalchemy.create_engine` function.

    .. seealso::

        :doc:`/core/engines`

        :ref:`connections_toplevel`

    """

    _execution_options = _EMPTY_EXECUTION_OPTS
    _has_events = False
    _connection_cls = Connection
    _sqla_logger_namespace = "sqlalchemy.engine.Engine"
    _is_future = False

    _schema_translate_map = None

    def __init__(
        self,
        pool,
        dialect,
        url,
        logging_name=None,
        echo=None,
        query_cache_size=500,
        execution_options=None,
        hide_parameters=False,
    ):
        """Assemble an engine from an existing pool, dialect and URL.

        :param pool: stored as ``self.pool``.
        :param dialect: stored as ``self.dialect``.
        :param url: stored as ``self.url``.
        :param logging_name: when truthy, stored as ``self.logging_name``.
        :param echo: assigned to ``self.echo`` and passed to
         ``log.instance_logger()`` as ``echoflag``.
        :param query_cache_size: capacity of the compiled cache; ``0``
         leaves ``self._compiled_cache`` as ``None``.
        :param execution_options: optional mapping applied through
         :meth:`update_execution_options`.
        :param hide_parameters: stored as ``self.hide_parameters``.

        """
        self.pool = pool
        self.url = url
        self.dialect = dialect
        if logging_name:
            self.logging_name = logging_name
        self.echo = echo
        self.hide_parameters = hide_parameters
        if query_cache_size != 0:
            self._compiled_cache = util.LRUCache(
                query_cache_size, size_alert=self._lru_size_alert
            )
        else:
            self._compiled_cache = None
        log.instance_logger(self, echoflag=echo)
        if execution_options:
            self.update_execution_options(**execution_options)

    def _lru_size_alert(self, cache):
        """``size_alert`` callback given to the compiled cache; logs the
        pruning at INFO level when info logging is enabled."""
        if self._should_log_info:
            self.logger.info(
                "Compiled cache size pruning from %d items to %d. "
                "Increase cache size to reduce the frequency of pruning.",
                len(cache),
                cache.capacity,
            )

    @property
    def engine(self):
        """Return this :class:`_engine.Engine` itself."""
        return self

    def clear_compiled_cache(self):
        """Clear the compiled cache associated with this engine.

        This applies **only** to the built-in cache that is established
        via the :paramref:`_engine.create_engine.query_cache_size` parameter.
        It will not impact any dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.query_cache` parameter.

        .. versionadded:: 1.4

        """
        # A missing (None) or empty cache is falsy; neither needs clearing.
        if self._compiled_cache:
            self._compiled_cache.clear()

    def update_execution_options(self, **opt):
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        The given keys/values in \**opt are added to the
        default execution options that will be used for
        all connections. The initial contents of this dictionary
        can be sent via the ``execution_options`` parameter
        to :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`

        """
        self._execution_options = self._execution_options.union(opt)
        self.dispatch.set_engine_execution_options(self, opt)
        self.dialect.set_engine_execution_options(self, opt)

    def execution_options(self, **opt):
        """Return a new :class:`_engine.Engine` that will provide
        :class:`_engine.Connection` objects with the given execution options.

        The returned :class:`_engine.Engine` remains related to the original
        :class:`_engine.Engine` in that it shares the same connection pool and
        other state:

        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine`
          is the same instance. The :meth:`_engine.Engine.dispose`
          method will replace the connection pool instance for the parent
          engine as well as this one.
        * Event listeners are "cascaded" - meaning, the new
          :class:`_engine.Engine` inherits the events of the parent, and
          new events can be associated with the new
          :class:`_engine.Engine` individually.
        * The logging configuration and logging_name is copied from the parent
          :class:`_engine.Engine`.

        The intent of the :meth:`_engine.Engine.execution_options` method is
        to implement "sharding" schemes where multiple :class:`_engine.Engine`
        objects refer to the same connection pool, but are differentiated
        by options that would be consumed by a custom event::

            primary_engine = create_engine("mysql://")
            shard1 = primary_engine.execution_options(shard_id="shard1")
            shard2 = primary_engine.execution_options(shard_id="shard2")

        Above, the ``shard1`` engine serves as a factory for
        :class:`_engine.Connection`
        objects that will contain the execution option
        ``shard_id=shard1``, and ``shard2`` will produce
        :class:`_engine.Connection`
        objects that contain the execution option ``shard_id=shard2``.

        An event handler can consume the above execution option to perform
        a schema switch or other operation, given a connection. Below
        we emit a MySQL ``use`` statement to switch databases, at the same
        time keeping track of which database we've established using the
        :attr:`_engine.Connection.info` dictionary,
        which gives us a persistent
        storage space that follows the DBAPI connection::

            from sqlalchemy import event
            from sqlalchemy.engine import Engine

            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}

            @event.listens_for(Engine, "before_cursor_execute")
            def _switch_shard(conn, cursor, stmt,
                    params, context, executemany):
                shard_id = conn._execution_options.get('shard_id', "default")
                current_shard = conn.info.get("current_shard", None)

                if current_shard != shard_id:
                    cursor.execute("use %s" % shards[shard_id])
                    conn.info["current_shard"] = shard_id

        .. seealso::

            :meth:`_engine.Connection.execution_options`
            - update execution options
            on a :class:`_engine.Connection` object.

            :meth:`_engine.Engine.update_execution_options`
            - update the execution
            options for a given :class:`_engine.Engine` in place.

            :meth:`_engine.Engine.get_execution_options`


        """
        return self._option_cls(self, opt)

    def get_execution_options(self):
        """Get the non-SQL options which will take effect during execution.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`_engine.Engine.execution_options`
        """
        return self._execution_options

    @property
    def name(self):
        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`."""

        return self.dialect.name

    @property
    def driver(self):
        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`."""

        return self.dialect.driver

    # descriptor supplied by the logging module for the ``echo`` flag
    echo = log.echo_property()

    def __repr__(self):
        return "Engine(%r)" % (self.url,)

    def dispose(self, close=True):
        """Dispose of the connection pool used by this
        :class:`_engine.Engine`.

        A new connection pool is created immediately after the old one has been
        disposed. The previous connection pool is disposed either actively, by
        closing out all currently checked-in connections in that pool, or
        passively, by losing references to it but otherwise not closing any
        connections. The latter strategy is more appropriate for an initializer
        in a forked Python process.

        :param close: if left at its default of ``True``, has the
         effect of fully closing all **currently checked in**
         database connections. Connections that are still checked out
         will **not** be closed, however they will no longer be associated
         with this :class:`_engine.Engine`,
         so when they are closed individually, eventually the
         :class:`_pool.Pool` which they are associated with will
         be garbage collected and they will be closed out fully, if
         not already closed on checkin.

         If set to ``False``, the previous connection pool is de-referenced,
         and otherwise not touched in any way.

        .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close`
            parameter to allow the replacement of a connection pool in a child
            process without interfering with the connections used by the parent
            process.


        .. seealso::

            :ref:`engine_disposal`

            :ref:`pooling_multiprocessing`

        """
        if close:
            self.pool.dispose()
        # The pool is replaced whether or not the old one was closed.
        self.pool = self.pool.recreate()
        self.dispatch.engine_disposed(self)

    def _execute_default(
        self, default, multiparams=(), params=util.EMPTY_DICT
    ):
        """Run ``default`` on a connection opened just for this call."""
        with self.connect() as conn:
            return conn._execute_default(default, multiparams, params)

    @contextlib.contextmanager
    def _optional_conn_ctx_manager(self, connection=None):
        """Yield ``connection`` if one was given; otherwise yield a
        connection from :meth:`connect` used as a context manager."""
        if connection is None:
            with self.connect() as conn:
                yield conn
        else:
            # A caller-supplied connection is yielded as-is; nothing is
            # done to it when the block exits.
            yield connection

    class _trans_ctx(object):
        """Context manager returned by :meth:`_engine.Engine.begin`.

        Entering enters the transaction and returns the connection; exiting
        exits the transaction and then closes the connection, unless
        ``close_with_result`` is set.

        """

        def __init__(self, conn, transaction, close_with_result):
            self.conn = conn
            self.transaction = transaction
            self.close_with_result = close_with_result

        def __enter__(self):
            self.transaction.__enter__()
            return self.conn

        def __exit__(self, type_, value, traceback):
            try:
                self.transaction.__exit__(type_, value, traceback)
            finally:
                # Runs even if the transaction's __exit__ raised.
                if not self.close_with_result:
                    self.conn.close()

    def begin(self, close_with_result=False):
        """Return a context manager delivering a :class:`_engine.Connection`
        with a :class:`.Transaction` established.

        E.g.::

            with engine.begin() as conn:
                conn.execute(
                    text("insert into table (x, y, z) values (1, 2, 3)")
                )
                conn.execute(text("my_special_procedure(5)"))

        Upon successful operation, the :class:`.Transaction`
        is committed. If an error is raised, the :class:`.Transaction`
        is rolled back.

        Legacy use only: the ``close_with_result`` flag is normally ``False``,
        and indicates that the :class:`_engine.Connection` will be closed when
        the operation is complete. When set to ``True``, it indicates the
        :class:`_engine.Connection` is in "single use" mode, where the
        :class:`_engine.CursorResult` returned by the first call to
        :meth:`_engine.Connection.execute` will close the
        :class:`_engine.Connection` when that :class:`_engine.CursorResult` has
        exhausted all result rows.

        .. seealso::

            :meth:`_engine.Engine.connect` - procure a
            :class:`_engine.Connection` from
            an :class:`_engine.Engine`.

            :meth:`_engine.Connection.begin` - start a :class:`.Transaction`
            for a particular :class:`_engine.Connection`.

        """
        # A "future" connection class is connected without the
        # close_with_result argument.
        if self._connection_cls._is_future:
            conn = self.connect()
        else:
            conn = self.connect(close_with_result=close_with_result)
        try:
            trans = conn.begin()
        except:
            # The bare except is deliberate: the connection is closed
            # whatever went wrong, inside util.safe_reraise().
            with util.safe_reraise():
                conn.close()
        return Engine._trans_ctx(conn, trans, close_with_result)

    @util.deprecated(
        "1.4",
        "The :meth:`_engine.Engine.transaction` "
        "method is deprecated and will be "
        "removed in a future release. Use the :meth:`_engine.Engine.begin` "
        "context "
        "manager instead.",
    )
    def transaction(self, callable_, *args, **kwargs):
        r"""Execute the given function within a transaction boundary.

        The function is passed a :class:`_engine.Connection` newly procured
        from :meth:`_engine.Engine.connect` as the first argument,
        followed by the given \*args and \**kwargs.

        e.g.::

            def do_something(conn, x, y):
                conn.execute(text("some statement"), {'x':x, 'y':y})

            engine.transaction(do_something, 5, 10)

        The operations inside the function are all invoked within the
        context of a single :class:`.Transaction`.
        Upon success, the transaction is committed. If an
        exception is raised, the transaction is rolled back
        before propagating the exception.

        .. note::

           The :meth:`.transaction` method is superseded by
           the usage of the Python ``with:`` statement, which can
           be used with :meth:`_engine.Engine.begin`::

               with engine.begin() as conn:
                   conn.execute(text("some statement"), {'x':5, 'y':10})

        .. seealso::

            :meth:`_engine.Engine.begin` - engine-level transactional
            context

            :meth:`_engine.Connection.transaction`
            - connection-level version of
            :meth:`_engine.Engine.transaction`

        """
        # Flag forwarded to the connection-level method of the same name.
        kwargs["_sa_skip_warning"] = True
        with self.connect() as conn:
            return conn.transaction(callable_, *args, **kwargs)

    @util.deprecated(
        "1.4",
        "The :meth:`_engine.Engine.run_callable` "
        "method is deprecated and will be "
        "removed in a future release. Use the :meth:`_engine.Engine.begin` "
        "context manager instead.",
    )
    def run_callable(self, callable_, *args, **kwargs):
        r"""Given a callable object or function, execute it, passing
        a :class:`_engine.Connection` as the first argument.

        The given \*args and \**kwargs are passed subsequent
        to the :class:`_engine.Connection` argument.

        This function, along with :meth:`_engine.Connection.run_callable`,
        allows a function to be run with a :class:`_engine.Connection`
        or :class:`_engine.Engine` object without the need to know
        which one is being dealt with.

        """
        # Flag forwarded to the connection-level method of the same name.
        kwargs["_sa_skip_warning"] = True
        with self.connect() as conn:
            return conn.run_callable(callable_, *args, **kwargs)

    def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
        """Run the DDL visitor on a connection from :meth:`begin`."""
        with self.begin() as conn:
            conn._run_ddl_visitor(visitorcallable, element, **kwargs)

    @util.deprecated_20(
        ":meth:`_engine.Engine.execute`",
        alternative="All statement execution in SQLAlchemy 2.0 is performed "
        "by the :meth:`_engine.Connection.execute` method of "
        ":class:`_engine.Connection`, "
        "or in the ORM by the :meth:`.Session.execute` method of "
        ":class:`.Session`.",
    )
    def execute(self, statement, *multiparams, **params):
        """Executes the given construct and returns a
        :class:`_engine.CursorResult`.

        The arguments are the same as those used by
        :meth:`_engine.Connection.execute`.

        Here, a :class:`_engine.Connection` is acquired using the
        :meth:`_engine.Engine.connect` method, and the statement executed
        with that connection. The returned :class:`_engine.CursorResult`
        is flagged
        such that when the :class:`_engine.CursorResult` is exhausted and its
        underlying cursor is closed, the :class:`_engine.Connection`
        created here
        will also be closed, which allows its associated DBAPI connection
        resource to be returned to the connection pool.

        """
        connection = self.connect(close_with_result=True)
        return connection.execute(statement, *multiparams, **params)

    @util.deprecated_20(
        ":meth:`_engine.Engine.scalar`",
        alternative="All statement execution in SQLAlchemy 2.0 is performed "
        "by the :meth:`_engine.Connection.execute` method of "
        ":class:`_engine.Connection`, "
        "or in the ORM by the :meth:`.Session.execute` method of "
        ":class:`.Session`; the :meth:`_future.Result.scalar` "
        "method can then be "
        "used to return a scalar result.",
    )
    def scalar(self, statement, *multiparams, **params):
        """Executes and returns the first column of the first row.

        The underlying result/cursor is closed after execution.
        """
        return self.execute(statement, *multiparams, **params).scalar()

    def _execute_clauseelement(
        self,
        elem,
        multiparams=None,
        params=None,
        execution_options=_EMPTY_EXECUTION_OPTS,
    ):
        """Delegate to ``Connection._execute_clauseelement()`` on a new
        connection opened with ``close_with_result=True``."""
        connection = self.connect(close_with_result=True)
        return connection._execute_clauseelement(
            elem, multiparams, params, execution_options
        )

    def _execute_compiled(
        self,
        compiled,
        multiparams,
        params,
        execution_options=_EMPTY_EXECUTION_OPTS,
    ):
        """Delegate to ``Connection._execute_compiled()`` on a new
        connection opened with ``close_with_result=True``."""
        connection = self.connect(close_with_result=True)
        return connection._execute_compiled(
            compiled, multiparams, params, execution_options
        )

    def connect(self, close_with_result=False):
        """Return a new :class:`_engine.Connection` object.

        The :class:`_engine.Connection` object is a facade that uses a DBAPI
        connection internally in order to communicate with the database. This
        connection is procured from the connection-holding :class:`_pool.Pool`
        referenced by this :class:`_engine.Engine`. When the
        :meth:`_engine.Connection.close` method of the
        :class:`_engine.Connection` object
        is called, the underlying DBAPI connection is then returned to the
        connection pool, where it may be used again in a subsequent call to
        :meth:`_engine.Engine.connect`.

        """

        return self._connection_cls(self, close_with_result=close_with_result)

    @util.deprecated(
        "1.4",
        "The :meth:`_engine.Engine.table_names` "
        "method is deprecated and will be "
        "removed in a future release. Please refer to "
        ":meth:`_reflection.Inspector.get_table_names`.",
    )
    def table_names(self, schema=None, connection=None):
        """Return a list of all table names available in the database.

        :param schema: Optional, retrieve names from a non-default schema.

        :param connection: Optional, use a specified connection.
        """
        with self._optional_conn_ctx_manager(connection) as conn:
            insp = inspection.inspect(conn)
            return insp.get_table_names(schema)

    @util.deprecated(
        "1.4",
        "The :meth:`_engine.Engine.has_table` "
        "method is deprecated and will be "
        "removed in a future release. Please refer to "
        ":meth:`_reflection.Inspector.has_table`.",
    )
    def has_table(self, table_name, schema=None):
        """Return True if the given backend has a table of the given name.

        .. seealso::

            :ref:`metadata_reflection_inspector` - detailed schema inspection
            using the :class:`_reflection.Inspector` interface.

            :class:`.quoted_name` - used to pass quoting information along
            with a schema identifier.

        """
        with self._optional_conn_ctx_manager(None) as conn:
            insp = inspection.inspect(conn)
            return insp.has_table(table_name, schema=schema)

    def _wrap_pool_connect(self, fn, connection):
        """Call ``fn()`` and return its result, handling DBAPI errors.

        When ``connection`` is ``None``, a ``dialect.dbapi.Error`` is passed
        to ``Connection._handle_dbapi_exception_noconnection()``; otherwise
        it is re-raised with its original traceback.

        """
        dialect = self.dialect
        try:
            return fn()
        except dialect.dbapi.Error as e:
            if connection is None:
                Connection._handle_dbapi_exception_noconnection(
                    e, dialect, self
                )
            else:
                util.raise_(
                    sys.exc_info()[1], with_traceback=sys.exc_info()[2]
                )

    def raw_connection(self, _connection=None):
        """Return a "raw" DBAPI connection from the connection pool.

        The returned object is a proxied version of the DBAPI
        connection object used by the underlying driver in use.
        The object will have all the same behavior as the real DBAPI
        connection, except that its ``close()`` method will result in the
        connection being returned to the pool, rather than being closed
        for real.

        This method provides direct DBAPI connection access for
        special situations when the API provided by
        :class:`_engine.Connection`
        is not needed. When a :class:`_engine.Connection` object is already
        present, the DBAPI connection is available using
        the :attr:`_engine.Connection.connection` accessor.

        .. seealso::

            :ref:`dbapi_connections`

        """
        return self._wrap_pool_connect(self.pool.connect, _connection)

3405 

3406 

class OptionEngineMixin(object):
    """Mixin for an engine that wraps a "proxied" parent engine.

    The pool is read from and written to the proxied engine, while the
    execution options start from the parent's and are then updated with
    the options given to the constructor.

    """

    _sa_propagate_class_events = False

    def __init__(self, proxied, execution_options):
        self._proxied = proxied

        # Copy the parent's settings, in this fixed order.
        for attrname in (
            "url",
            "dialect",
            "logging_name",
            "echo",
            "_compiled_cache",
            "hide_parameters",
        ):
            setattr(self, attrname, getattr(proxied, attrname))
        log.instance_logger(self, echoflag=self.echo)

        # note: this will propagate events that are assigned to the parent
        # engine after this OptionEngine is created. Since we share
        # the events of the parent we also disallow class-level events
        # to apply to the OptionEngine class directly.
        #
        # the other way this can work would be to transfer existing
        # events only, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # that might be more appropriate however it would be a behavioral
        # change for logic that assigns events to the parent engine and
        # would like it to take effect for the already-created sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # Start from the parent's options, then layer the new ones on top.
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def _get_pool(self):
        parent = self._proxied
        return parent.pool

    def _set_pool(self, pool):
        # Written through to the proxied engine.
        parent = self._proxied
        parent.pool = pool

    pool = property(fget=_get_pool, fset=_set_pool)

    def _get_has_events(self):
        # The parent's flag wins when truthy; otherwise fall back to the
        # value stored in this instance's own __dict__, if any.
        inherited = self._proxied._has_events
        if inherited:
            return inherited
        return self.__dict__.get("_has_events", False)

    def _set_has_events(self, value):
        # Stored in the instance __dict__ directly.
        self.__dict__["_has_events"] = value

    _has_events = property(fget=_get_has_events, fset=_set_has_events)

3454 

3455 

class OptionEngine(OptionEngineMixin, Engine):
    """Concrete :class:`_engine.Engine` built from
    :class:`OptionEngineMixin`; adds no members of its own."""


# Register the class that Engine.execution_options() instantiates.
Engine._option_cls = OptionEngine