Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/SQLAlchemy-1.3.25.dev0-py3.11-linux-x86_64.egg/sqlalchemy/engine/base.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

783 statements  

1# engine/base.py 

2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: http://www.opensource.org/licenses/mit-license.php 

7from __future__ import with_statement 

8 

9import contextlib 

10import sys 

11 

12from .interfaces import Connectable 

13from .interfaces import ExceptionContext 

14from .util import _distill_params 

15from .. import exc 

16from .. import interfaces 

17from .. import log 

18from .. import util 

19from ..sql import schema 

20from ..sql import util as sql_util 

21 

22 

23"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`. 

24 

25""" 

26 

27 

28class Connection(Connectable): 

29 """Provides high-level functionality for a wrapped DB-API connection. 

30 

31 Provides execution support for string-based SQL statements as well as 

32 :class:`_expression.ClauseElement`, :class:`.Compiled` and 

33 :class:`.DefaultGenerator` 

34 objects. Provides a :meth:`begin` method to return :class:`.Transaction` 

35 objects. 

36 

37 The Connection object is **not** thread-safe. While a Connection can be 

38 shared among threads using properly synchronized access, it is still 

39 possible that the underlying DBAPI connection may not support shared 

40 access between threads. Check the DBAPI documentation for details. 

41 

42 The Connection object represents a single DBAPI connection checked out 

43 from the connection pool. In this state, the connection pool has no effect 

44 upon the connection, including its expiration or timeout state. For the 

45 connection pool to properly manage connections, connections should be 

46 returned to the connection pool (i.e. ``connection.close()``) whenever the 

47 connection is not in use. 

48 

49 .. index:: 

50 single: thread safety; Connection 

51 

52 """ 

53 

54 schema_for_object = schema._schema_getter(None) 

55 """Return the ".schema" attribute for an object. 

56 

57 Used for :class:`_schema.Table`, :class:`.Sequence` and similar objects, 

58 and takes into account 

59 the :paramref:`.Connection.execution_options.schema_translate_map` 

60 parameter. 

61 

62 .. versionadded:: 1.1 

63 

64 .. seealso:: 

65 

66 :ref:`schema_translating` 

67 

68 """ 

69 

def __init__(
    self,
    engine,
    connection=None,
    close_with_result=False,
    _branch_from=None,
    _execution_options=None,
    _dispatch=None,
    _has_events=None,
):
    """Construct a new Connection.

    The constructor here is not public and is only called by an
    :class:`_engine.Engine`. See :meth:`_engine.Engine.connect` and
    :meth:`_engine.Engine.contextual_connect` methods.

    :param engine: the owning engine; its ``dialect`` is copied onto
     this object and, for non-branch connections, its
     ``raw_connection()``, ``dispatch``, ``_has_events`` and
     ``_execution_options`` are consulted.
    :param connection: an existing DBAPI connection to wrap.  When
     ``None`` and this is not a branch, one is obtained from
     ``engine.raw_connection()``.
    :param close_with_result: stored as ``should_close_with_result``
     for non-branch connections; branches always store ``False``.
    :param _branch_from: the parent :class:`_engine.Connection` when
     constructing a branch, else ``None``.
    :param _execution_options: execution options used by a branch; it
     must be empty or ``None`` for a non-branch connection, which takes
     its options from the engine.
    :param _dispatch: event dispatcher assigned to a branch.
    :param _has_events: ``None`` to inherit the engine's event state;
     an explicit ``False`` keeps a non-branch connection from joining
     the engine's dispatch.

    """
    self.engine = engine
    self.dialect = engine.dialect
    self.__branch_from = _branch_from
    self.__branch = _branch_from is not None

    if _branch_from:
        # branch: reuse the connection, options and dispatcher handed
        # in, and copy logging / event / schema state from the parent
        self.__connection = connection
        self._execution_options = _execution_options
        self._echo = _branch_from._echo
        self.should_close_with_result = False
        self.dispatch = _dispatch
        self._has_events = _branch_from._has_events
        self.schema_for_object = _branch_from.schema_for_object
    else:
        # non-branch: wrap the given DBAPI connection or check one out,
        # then initialize transaction and invalidation bookkeeping
        self.__connection = (
            connection
            if connection is not None
            else engine.raw_connection()
        )
        self.__transaction = None
        self.__savepoint_seq = 0
        self.should_close_with_result = close_with_result
        self.__invalid = False
        self.__can_reconnect = True
        self._echo = self.engine._should_log_info()

        if _has_events is None:
            # if _has_events is sent explicitly as False,
            # then don't join the dispatch of the engine; we don't
            # want to handle any of the engine's events in that case.
            self.dispatch = self.dispatch._join(engine.dispatch)
        self._has_events = _has_events or (
            _has_events is None and engine._has_events
        )

        assert not _execution_options
        self._execution_options = engine._execution_options

    # announce the new connection (or branch) to event listeners
    if self._has_events or self.engine._has_events:
        self.dispatch.engine_connect(self, self.__branch)

127 

128 def _branch(self): 

129 """Return a new Connection which references this Connection's 

130 engine and connection; but does not have close_with_result enabled, 

131 and also whose close() method does nothing. 

132 

133 The Core uses this very sparingly, only in the case of 

134 custom SQL default functions that are to be INSERTed as the 

135 primary key of a row where we need to get the value back, so we have 

136 to invoke it distinctly - this is a very uncommon case. 

137 

138 Userland code accesses _branch() when the connect() or 

139 contextual_connect() methods are called. The branched connection 

140 acts as much as possible like the parent, except that it stays 

141 connected when a close() event occurs. 

142 

143 """ 

144 if self.__branch_from: 

145 return self.__branch_from._branch() 

146 else: 

147 return self.engine._connection_cls( 

148 self.engine, 

149 self.__connection, 

150 _branch_from=self, 

151 _execution_options=self._execution_options, 

152 _has_events=self._has_events, 

153 _dispatch=self.dispatch, 

154 ) 

155 

156 @property 

157 def _root(self): 

158 """Return the 'root' connection. 

159 

160 Returns 'self' if this connection is not a branch, else 

161 returns the root connection from which we ultimately branched. 

162 

163 """ 

164 

165 if self.__branch_from: 

166 return self.__branch_from 

167 else: 

168 return self 

169 

170 def _clone(self): 

171 """Create a shallow copy of this Connection.""" 

172 

173 c = self.__class__.__new__(self.__class__) 

174 c.__dict__ = self.__dict__.copy() 

175 return c 

176 

177 def __enter__(self): 

178 return self 

179 

180 def __exit__(self, type_, value, traceback): 

181 self.close() 

182 

def execution_options(self, **opt):
    r""" Set non-SQL options for the connection which take effect
    during execution.

    The method returns a copy of this :class:`_engine.Connection`
    which references
    the same underlying DBAPI connection, but also defines the given
    execution options which will take effect for a call to
    :meth:`execute`. As the new :class:`_engine.Connection`
    references the same
    underlying resource, it's usually a good idea to ensure that the copies
    will be discarded immediately, which is implicit if used as in::

        result = connection.execution_options(stream_results=True).\
                                execute(stmt)

    Note that any key/value can be passed to
    :meth:`_engine.Connection.execution_options`,
    and it will be stored in the
    ``_execution_options`` dictionary of the :class:`_engine.Connection`.
    It
    is suitable for usage by end-user schemes to communicate with
    event listeners, for example.

    The keywords that are currently recognized by SQLAlchemy itself
    include all those listed under :meth:`.Executable.execution_options`,
    as well as others that are specific to :class:`_engine.Connection`.

    :param autocommit: Available on: Connection, statement.
      When True, a COMMIT will be invoked after execution
      when executed in 'autocommit' mode, i.e. when an explicit
      transaction is not begun on the connection.   Note that this
      is **library level, not DBAPI level autocommit**.  The DBAPI
      connection will remain in a real transaction unless the
      "AUTOCOMMIT" isolation level is used.

      .. deprecated:: 1.4  The library-level "autocommit" feature is being
         removed in favor of database driver "autocommit" which is
         now widely available.  See the section :ref:`dbapi_autocommit`.

    :param compiled_cache: Available on: Connection.
      A dictionary where :class:`.Compiled` objects
      will be cached when the :class:`_engine.Connection`
      compiles a clause
      expression into a :class:`.Compiled` object.
      It is the user's responsibility to
      manage the size of this dictionary, which will have keys
      corresponding to the dialect, clause element, the column
      names within the VALUES or SET clause of an INSERT or UPDATE,
      as well as the "batch" mode for an INSERT or UPDATE statement.
      The format of this dictionary is not guaranteed to stay the
      same in future releases.

      Note that the ORM makes use of its own "compiled" caches for
      some operations, including flush operations.  The caching
      used by the ORM internally supersedes a cache dictionary
      specified here.

    :param isolation_level: Available on: :class:`_engine.Connection`.

      Set the transaction isolation level for the lifespan of this
      :class:`_engine.Connection` object.
      Valid values include those string
      values accepted by the :paramref:`_sa.create_engine.isolation_level`
      parameter passed to :func:`_sa.create_engine`.  These levels are
      semi-database specific; see individual dialect documentation for
      valid levels.

      The isolation level option applies the isolation level by emitting
      statements on the DBAPI connection, and **necessarily affects the
      original Connection object overall**, not just the copy that is
      returned by the call to :meth:`_engine.Connection.execution_options`
      method.  The isolation level will remain at the given setting until
      the DBAPI connection itself is returned to the connection pool, i.e.
      the :meth:`_engine.Connection.close` method on the original
      :class:`_engine.Connection` is called,
      where an event handler will emit
      additional statements on the DBAPI connection in order to revert the
      isolation level change.

      .. warning::  The ``isolation_level`` execution option should
         **not** be used when a transaction is already established, that
         is, the :meth:`_engine.Connection.begin`
         method or similar has been
         called.  A database cannot change the isolation level on a
         transaction in progress, and different DBAPIs and/or
         SQLAlchemy dialects may implicitly roll back or commit
         the transaction, or not affect the connection at all.

      .. note:: The ``isolation_level`` execution option is implicitly
         reset if the :class:`_engine.Connection` is invalidated, e.g. via
         the :meth:`_engine.Connection.invalidate` method, or if a
         disconnection error occurs.  The new connection produced after
         the invalidation will not have the isolation level re-applied
         to it automatically.

      .. seealso::

            :paramref:`_sa.create_engine.isolation_level`
            - set per :class:`_engine.Engine` isolation level

            :meth:`_engine.Connection.get_isolation_level`
            - view current level

            :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`

            :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`

            :ref:`MySQL Transaction Isolation <mysql_isolation_level>`

            :ref:`SQL Server Transaction Isolation <mssql_isolation_level>`

            :ref:`session_transaction_isolation` - for the ORM

    :param no_parameters: When ``True``, if the final parameter
      list or dictionary is totally empty, will invoke the
      statement on the cursor as ``cursor.execute(statement)``,
      not passing the parameter collection at all.
      Some DBAPIs such as psycopg2 and mysql-python consider
      percent signs as significant only when parameters are
      present; this option allows code to generate SQL
      containing percent signs (and possibly other characters)
      that is neutral regarding whether it's executed by the DBAPI
      or piped into a script that's later invoked by
      command line tools.

    :param stream_results: Available on: Connection, statement.
      Indicate to the dialect that results should be
      "streamed" and not pre-buffered, if possible.  This is a limitation
      of many DBAPIs.  The flag is currently understood only by the
      psycopg2, mysqldb and pymysql dialects.

    :param schema_translate_map: Available on: Connection, Engine.
      A dictionary mapping schema names to schema names, that will be
      applied to the :paramref:`_schema.Table.schema` element of each
      :class:`_schema.Table`
      encountered when SQL or DDL expression elements
      are compiled into strings; the resulting schema name will be
      converted based on presence in the map of the original name.

      .. versionadded:: 1.1

      .. seealso::

        :ref:`schema_translating`

    :return: a shallow copy of this :class:`_engine.Connection` whose
      ``_execution_options`` is the union of the current options and
      ``opt``.

    .. seealso::

        :meth:`_engine.Engine.execution_options`

        :meth:`.Executable.execution_options`

        :meth:`_engine.Connection.get_execution_options`


    """  # noqa
    # work on a shallow copy so this object's own options are untouched
    c = self._clone()
    c._execution_options = c._execution_options.union(opt)
    # event listeners are notified first, then the dialect; both receive
    # the copy together with only the newly passed options
    if self._has_events or self.engine._has_events:
        self.dispatch.set_connection_execution_options(c, opt)
    self.dialect.set_connection_execution_options(c, opt)
    return c

345 

346 def get_execution_options(self): 

347 """Get the non-SQL options which will take effect during execution. 

348 

349 .. versionadded:: 1.3 

350 

351 .. seealso:: 

352 

353 :meth:`_engine.Connection.execution_options` 

354 """ 

355 return self._execution_options 

356 

357 @property 

358 def closed(self): 

359 """Return True if this connection is closed.""" 

360 

361 return ( 

362 "_Connection__connection" not in self.__dict__ 

363 and not self.__can_reconnect 

364 ) 

365 

366 @property 

367 def invalidated(self): 

368 """Return True if this connection was invalidated.""" 

369 

370 return self._root.__invalid 

371 

@property
def connection(self):
    """The underlying DB-API connection managed by this Connection.

    If no DBAPI connection is currently held, an attempt is made to
    obtain one via ``_revalidate_connection()``; any error raised
    there is passed to ``_handle_dbapi_exception()``.

    .. seealso::


        :ref:`dbapi_connections`

    """

    try:
        return self.__connection
    except AttributeError:
        # escape "except AttributeError" before revalidating
        # to prevent misleading stacktraces in Py3K
        pass
    try:
        return self._revalidate_connection()
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

393 

def get_isolation_level(self):
    """Return the current isolation level assigned to this
    :class:`_engine.Connection`.

    This will typically be the default isolation level as determined
    by the dialect, unless the
    :paramref:`.Connection.execution_options.isolation_level`
    feature has been used to alter the isolation level on a
    per-:class:`_engine.Connection` basis.

    This method will typically perform a live SQL operation in order
    to procure the current isolation level, so the value returned is the
    actual level on the underlying DBAPI connection regardless of how
    this state was set.  Compare to the
    :attr:`_engine.Connection.default_isolation_level` accessor
    which returns the dialect-level setting without performing a SQL
    query.

    .. versionadded:: 0.9.9

    .. seealso::

        :attr:`_engine.Connection.default_isolation_level`
        - view default level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level

    """
    try:
        return self.dialect.get_isolation_level(self.connection)
    except BaseException as e:
        # errors from the dialect or from acquiring the connection are
        # handed to the common DBAPI exception handler
        self._handle_dbapi_exception(e, None, None, None, None)

430 

431 @property 

432 def default_isolation_level(self): 

433 """The default isolation level assigned to this 

434 :class:`_engine.Connection`. 

435 

436 This is the isolation level setting that the 

437 :class:`_engine.Connection` 

438 has when first procured via the :meth:`_engine.Engine.connect` method. 

439 This level stays in place until the 

440 :paramref:`.Connection.execution_options.isolation_level` is used 

441 to change the setting on a per-:class:`_engine.Connection` basis. 

442 

443 Unlike :meth:`_engine.Connection.get_isolation_level`, 

444 this attribute is set 

445 ahead of time from the first connection procured by the dialect, 

446 so SQL query is not invoked when this accessor is called. 

447 

448 .. versionadded:: 0.9.9 

449 

450 .. seealso:: 

451 

452 :meth:`_engine.Connection.get_isolation_level` 

453 - view current level 

454 

455 :paramref:`_sa.create_engine.isolation_level` 

456 - set per :class:`_engine.Engine` isolation level 

457 

458 :paramref:`.Connection.execution_options.isolation_level` 

459 - set per :class:`_engine.Connection` isolation level 

460 

461 """ 

462 return self.dialect.default_isolation_level 

463 

464 def _revalidate_connection(self): 

465 if self.__branch_from: 

466 return self.__branch_from._revalidate_connection() 

467 if self.__can_reconnect and self.__invalid: 

468 if self.__transaction is not None: 

469 raise exc.InvalidRequestError( 

470 "Can't reconnect until invalid " 

471 "transaction is rolled back" 

472 ) 

473 self.__connection = self.engine.raw_connection(_connection=self) 

474 self.__invalid = False 

475 return self.__connection 

476 raise exc.ResourceClosedError("This Connection is closed") 

477 

478 @property 

479 def _connection_is_valid(self): 

480 # use getattr() for is_valid to support exceptions raised in 

481 # dialect initializer, where the connection is not wrapped in 

482 # _ConnectionFairy 

483 

484 return getattr(self.__connection, "is_valid", False) 

485 

486 @property 

487 def _still_open_and_connection_is_valid(self): 

488 return ( 

489 not self.closed 

490 and not self.invalidated 

491 and getattr(self.__connection, "is_valid", False) 

492 ) 

493 

494 @property 

495 def info(self): 

496 """Info dictionary associated with the underlying DBAPI connection 

497 referred to by this :class:`_engine.Connection`, allowing user-defined 

498 data to be associated with the connection. 

499 

500 The data here will follow along with the DBAPI connection including 

501 after it is returned to the connection pool and used again 

502 in subsequent instances of :class:`_engine.Connection`. 

503 

504 """ 

505 

506 return self.connection.info 

507 

508 def connect(self): 

509 """Returns a branched version of this :class:`_engine.Connection`. 

510 

511 The :meth:`_engine.Connection.close` method on the returned 

512 :class:`_engine.Connection` can be called and this 

513 :class:`_engine.Connection` will remain open. 

514 

515 This method provides usage symmetry with 

516 :meth:`_engine.Engine.connect`, including for usage 

517 with context managers. 

518 

519 """ 

520 

521 return self._branch() 

522 

523 def _contextual_connect(self, **kwargs): 

524 return self._branch() 

525 

526 def invalidate(self, exception=None): 

527 """Invalidate the underlying DBAPI connection associated with 

528 this :class:`_engine.Connection`. 

529 

530 The underlying DBAPI connection is literally closed (if 

531 possible), and is discarded. Its source connection pool will 

532 typically lazily create a new connection to replace it. 

533 

534 Upon the next use (where "use" typically means using the 

535 :meth:`_engine.Connection.execute` method or similar), 

536 this :class:`_engine.Connection` will attempt to 

537 procure a new DBAPI connection using the services of the 

538 :class:`_pool.Pool` as a source of connectivity (e.g. 

539 a "reconnection"). 

540 

541 If a transaction was in progress (e.g. the 

542 :meth:`_engine.Connection.begin` method has been called) when 

543 :meth:`_engine.Connection.invalidate` method is called, at the DBAPI 

544 level all state associated with this transaction is lost, as 

545 the DBAPI connection is closed. The :class:`_engine.Connection` 

546 will not allow a reconnection to proceed until the 

547 :class:`.Transaction` object is ended, by calling the 

548 :meth:`.Transaction.rollback` method; until that point, any attempt at 

549 continuing to use the :class:`_engine.Connection` will raise an 

550 :class:`~sqlalchemy.exc.InvalidRequestError`. 

551 This is to prevent applications from accidentally 

552 continuing an ongoing transactional operations despite the 

553 fact that the transaction has been lost due to an 

554 invalidation. 

555 

556 The :meth:`_engine.Connection.invalidate` method, 

557 just like auto-invalidation, 

558 will at the connection pool level invoke the 

559 :meth:`_events.PoolEvents.invalidate` event. 

560 

561 .. seealso:: 

562 

563 :ref:`pool_connection_invalidation` 

564 

565 """ 

566 

567 if self.invalidated: 

568 return 

569 

570 if self.closed: 

571 raise exc.ResourceClosedError("This Connection is closed") 

572 

573 if self._root._connection_is_valid: 

574 self._root.__connection.invalidate(exception) 

575 del self._root.__connection 

576 self._root.__invalid = True 

577 

578 def detach(self): 

579 """Detach the underlying DB-API connection from its connection pool. 

580 

581 E.g.:: 

582 

583 with engine.connect() as conn: 

584 conn.detach() 

585 conn.execute("SET search_path TO schema1, schema2") 

586 

587 # work with connection 

588 

589 # connection is fully closed (since we used "with:", can 

590 # also call .close()) 

591 

592 This :class:`_engine.Connection` instance will remain usable. 

593 When closed 

594 (or exited from a context manager context as above), 

595 the DB-API connection will be literally closed and not 

596 returned to its originating pool. 

597 

598 This method can be used to insulate the rest of an application 

599 from a modified state on a connection (such as a transaction 

600 isolation level or similar). 

601 

602 """ 

603 

604 self.__connection.detach() 

605 

606 def begin(self): 

607 """Begin a transaction and return a transaction handle. 

608 

609 The returned object is an instance of :class:`.Transaction`. 

610 This object represents the "scope" of the transaction, 

611 which completes when either the :meth:`.Transaction.rollback` 

612 or :meth:`.Transaction.commit` method is called. 

613 

614 Nested calls to :meth:`.begin` on the same :class:`_engine.Connection` 

615 will return new :class:`.Transaction` objects that represent 

616 an emulated transaction within the scope of the enclosing 

617 transaction, that is:: 

618 

619 trans = conn.begin() # outermost transaction 

620 trans2 = conn.begin() # "nested" 

621 trans2.commit() # does nothing 

622 trans.commit() # actually commits 

623 

624 Calls to :meth:`.Transaction.commit` only have an effect 

625 when invoked via the outermost :class:`.Transaction` object, though the 

626 :meth:`.Transaction.rollback` method of any of the 

627 :class:`.Transaction` objects will roll back the 

628 transaction. 

629 

630 .. seealso:: 

631 

632 :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT 

633 

634 :meth:`_engine.Connection.begin_twophase` - 

635 use a two phase /XID transaction 

636 

637 :meth:`_engine.Engine.begin` - context manager available from 

638 :class:`_engine.Engine` 

639 

640 """ 

641 if self.__branch_from: 

642 return self.__branch_from.begin() 

643 

644 if self.__transaction is None: 

645 self.__transaction = RootTransaction(self) 

646 return self.__transaction 

647 else: 

648 return Transaction(self, self.__transaction) 

649 

650 def begin_nested(self): 

651 """Begin a nested transaction and return a transaction handle. 

652 

653 The returned object is an instance of :class:`.NestedTransaction`. 

654 

655 Nested transactions require SAVEPOINT support in the 

656 underlying database. Any transaction in the hierarchy may 

657 ``commit`` and ``rollback``, however the outermost transaction 

658 still controls the overall ``commit`` or ``rollback`` of the 

659 transaction of a whole. 

660 

661 .. seealso:: 

662 

663 :meth:`_engine.Connection.begin` 

664 

665 :meth:`_engine.Connection.begin_twophase` 

666 

667 """ 

668 if self.__branch_from: 

669 return self.__branch_from.begin_nested() 

670 

671 if self.__transaction is None: 

672 self.__transaction = RootTransaction(self) 

673 else: 

674 self.__transaction = NestedTransaction(self, self.__transaction) 

675 return self.__transaction 

676 

677 def begin_twophase(self, xid=None): 

678 """Begin a two-phase or XA transaction and return a transaction 

679 handle. 

680 

681 The returned object is an instance of :class:`.TwoPhaseTransaction`, 

682 which in addition to the methods provided by 

683 :class:`.Transaction`, also provides a 

684 :meth:`~.TwoPhaseTransaction.prepare` method. 

685 

686 :param xid: the two phase transaction id. If not supplied, a 

687 random id will be generated. 

688 

689 .. seealso:: 

690 

691 :meth:`_engine.Connection.begin` 

692 

693 :meth:`_engine.Connection.begin_twophase` 

694 

695 """ 

696 

697 if self.__branch_from: 

698 return self.__branch_from.begin_twophase(xid=xid) 

699 

700 if self.__transaction is not None: 

701 raise exc.InvalidRequestError( 

702 "Cannot start a two phase transaction when a transaction " 

703 "is already in progress." 

704 ) 

705 if xid is None: 

706 xid = self.engine.dialect.create_xid() 

707 self.__transaction = TwoPhaseTransaction(self, xid) 

708 return self.__transaction 

709 

710 def recover_twophase(self): 

711 return self.engine.dialect.do_recover_twophase(self) 

712 

713 def rollback_prepared(self, xid, recover=False): 

714 self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) 

715 

716 def commit_prepared(self, xid, recover=False): 

717 self.engine.dialect.do_commit_twophase(self, xid, recover=recover) 

718 

719 def in_transaction(self): 

720 """Return True if a transaction is in progress.""" 

721 return self._root.__transaction is not None 

722 

def _begin_impl(self, transaction):
    """Begin a transaction on the DBAPI connection.

    Logs "BEGIN (implicit)" when echo is enabled, dispatches the
    ``begin`` event, then calls the dialect's ``do_begin()``.  Must be
    called on a non-branch connection.

    :param transaction: the transaction object being started; it is
     stored as the DBAPI connection's ``_reset_agent`` when none is
     set yet.

    """
    assert not self.__branch_from

    if self._echo:
        self.engine.logger.info("BEGIN (implicit)")

    if self._has_events or self.engine._has_events:
        self.dispatch.begin(self)

    try:
        self.engine.dialect.do_begin(self.connection)
        # an already-registered reset agent is left in place
        if self.connection._reset_agent is None:
            self.connection._reset_agent = transaction
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)

738 

def _rollback_impl(self):
    """Roll back the current transaction on the DBAPI connection.

    Dispatches the ``rollback`` event, then, if the connection is
    still open and valid, calls the dialect's ``do_rollback()``;
    errors are passed to ``_handle_dbapi_exception()``.  On every path
    the connection's current transaction is cleared afterwards.  Must
    be called on a non-branch connection.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.rollback(self)

    if self._still_open_and_connection_is_valid:
        if self._echo:
            self.engine.logger.info("ROLLBACK")
        try:
            self.engine.dialect.do_rollback(self.connection)
        except BaseException as e:
            self._handle_dbapi_exception(e, None, None, None, None)
        finally:
            # clear the DBAPI connection's reset agent if it is this
            # transaction; not attempted once the connection has been
            # invalidated
            if (
                not self.__invalid
                and self.connection._reset_agent is self.__transaction
            ):
                self.connection._reset_agent = None
            self.__transaction = None
    else:
        # nothing to emit on a closed / invalid connection; only the
        # transaction reference is dropped
        self.__transaction = None

761 

def _commit_impl(self, autocommit=False):
    """Commit the current transaction on the DBAPI connection.

    Dispatches the ``commit`` event, logs "COMMIT" when echo is
    enabled and calls the dialect's ``do_commit()``; errors are passed
    to ``_handle_dbapi_exception()``.  The connection's current
    transaction is cleared afterwards whether or not the commit
    succeeded.  Must be called on a non-branch connection.

    :param autocommit: accepted but not used within this method.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.commit(self)

    if self._echo:
        self.engine.logger.info("COMMIT")
    try:
        self.engine.dialect.do_commit(self.connection)
    except BaseException as e:
        self._handle_dbapi_exception(e, None, None, None, None)
    finally:
        # clear the DBAPI connection's reset agent if it is this
        # transaction; not attempted once the connection has been
        # invalidated
        if (
            not self.__invalid
            and self.connection._reset_agent is self.__transaction
        ):
            self.connection._reset_agent = None
        self.__transaction = None

781 

def _savepoint_impl(self, name=None):
    """Establish a savepoint via the dialect and return its name.

    The ``savepoint`` event is dispatched with the name exactly as
    passed in, i.e. before a default name is generated.  Must be
    called on a non-branch connection.

    :param name: savepoint name; when ``None`` a name of the form
     ``sa_savepoint_<n>`` is generated from a per-connection counter.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.savepoint(self, name)

    if name is None:
        self.__savepoint_seq += 1
        name = "sa_savepoint_%s" % self.__savepoint_seq
    if self._still_open_and_connection_is_valid:
        self.engine.dialect.do_savepoint(self, name)
        # NOTE(review): indentation was not recoverable in this view;
        # the return is presumed to sit inside this ``if``, making the
        # result None for a closed / invalid connection - confirm
        return name

794 

795 def _discard_transaction(self, trans): 

796 if trans is self.__transaction: 

797 if trans._parent is trans: 

798 self.__transaction = None 

799 else: 

800 self.__transaction = trans._parent 

801 

802 if self._still_open_and_connection_is_valid: 

803 if self.__connection._reset_agent is trans: 

804 self.__connection._reset_agent = None 

805 

def _rollback_to_savepoint_impl(self, name, context):
    """Roll back to the savepoint ``name``.

    Dispatches the ``rollback_savepoint`` event and, if the connection
    is still open and valid, calls the dialect's
    ``do_rollback_to_savepoint()``.  Must be called on a non-branch
    connection.

    :param name: name of the savepoint to roll back to.
    :param context: passed to the event and then installed as this
     connection's current transaction.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.rollback_savepoint(self, name, context)

    if self._still_open_and_connection_is_valid:
        self.engine.dialect.do_rollback_to_savepoint(self, name)
    # the current transaction is replaced regardless of whether the
    # dialect call was made
    self.__transaction = context

815 

def _release_savepoint_impl(self, name, context):
    """Release the savepoint ``name``.

    Dispatches the ``release_savepoint`` event and, if the connection
    is still open and valid, calls the dialect's
    ``do_release_savepoint()``.  Must be called on a non-branch
    connection.

    :param name: name of the savepoint to release.
    :param context: passed to the event and then installed as this
     connection's current transaction.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.release_savepoint(self, name, context)

    if self._still_open_and_connection_is_valid:
        self.engine.dialect.do_release_savepoint(self, name)
    # the current transaction is replaced regardless of whether the
    # dialect call was made
    self.__transaction = context

825 

def _begin_twophase_impl(self, transaction):
    """Begin a two-phase transaction on the DBAPI connection.

    Logs "BEGIN TWOPHASE (implicit)" when echo is enabled, dispatches
    the ``begin_twophase`` event with the transaction's ``xid`` and,
    if the connection is still open and valid, calls the dialect's
    ``do_begin_twophase()``.  Must be called on a non-branch
    connection.

    :param transaction: the two-phase transaction object; its ``xid``
     attribute is read here.

    """
    assert not self.__branch_from

    if self._echo:
        self.engine.logger.info("BEGIN TWOPHASE (implicit)")
    if self._has_events or self.engine._has_events:
        self.dispatch.begin_twophase(self, transaction.xid)

    if self._still_open_and_connection_is_valid:
        self.engine.dialect.do_begin_twophase(self, transaction.xid)

        # NOTE(review): indentation was not recoverable in this view;
        # this check is presumed to be nested under the ``if`` above -
        # confirm.  An already-registered reset agent is left in place.
        if self.connection._reset_agent is None:
            self.connection._reset_agent = transaction

839 

def _prepare_twophase_impl(self, xid):
    """Prepare the two-phase transaction identified by ``xid``.

    Dispatches the ``prepare_twophase`` event and, if the connection
    is still open and valid, calls the dialect's
    ``do_prepare_twophase()``.  Must be called on a non-branch
    connection.

    :param xid: the two-phase transaction id.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.prepare_twophase(self, xid)

    if self._still_open_and_connection_is_valid:
        # the current transaction is expected to be the two-phase one
        assert isinstance(self.__transaction, TwoPhaseTransaction)
        self.engine.dialect.do_prepare_twophase(self, xid)

849 

def _rollback_twophase_impl(self, xid, is_prepared):
    """Roll back the two-phase transaction identified by ``xid``.

    Dispatches the ``rollback_twophase`` event and, if the connection
    is still open and valid, calls the dialect's
    ``do_rollback_twophase()``.  On every path the connection's
    current transaction is cleared afterwards.  Must be called on a
    non-branch connection.

    :param xid: the two-phase transaction id.
    :param is_prepared: forwarded to the event and to the dialect.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.rollback_twophase(self, xid, is_prepared)

    if self._still_open_and_connection_is_valid:
        assert isinstance(self.__transaction, TwoPhaseTransaction)
        try:
            self.engine.dialect.do_rollback_twophase(
                self, xid, is_prepared
            )
        finally:
            # runs even if the dialect call raised; the exception
            # still propagates after this cleanup
            if self.connection._reset_agent is self.__transaction:
                self.connection._reset_agent = None
            self.__transaction = None
    else:
        self.__transaction = None

868 

def _commit_twophase_impl(self, xid, is_prepared):
    """Commit the two-phase transaction identified by ``xid``.

    Dispatches the ``commit_twophase`` event and, if the connection is
    still open and valid, calls the dialect's ``do_commit_twophase()``.
    On every path the connection's current transaction is cleared
    afterwards.  Must be called on a non-branch connection.

    :param xid: the two-phase transaction id.
    :param is_prepared: forwarded to the event and to the dialect.

    """
    assert not self.__branch_from

    if self._has_events or self.engine._has_events:
        self.dispatch.commit_twophase(self, xid, is_prepared)

    if self._still_open_and_connection_is_valid:
        assert isinstance(self.__transaction, TwoPhaseTransaction)
        try:
            self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
        finally:
            # runs even if the dialect call raised; the exception
            # still propagates after this cleanup
            if self.connection._reset_agent is self.__transaction:
                self.connection._reset_agent = None
            self.__transaction = None
    else:
        self.__transaction = None

885 

886 def _autorollback(self): 

887 if not self._root.in_transaction(): 

888 self._root._rollback_impl() 

889 

def close(self):
    """Close this :class:`_engine.Connection`.

    This results in a release of the underlying database
    resources, that is, the DBAPI connection referenced
    internally. The DBAPI connection is typically restored
    back to the connection-holding :class:`_pool.Pool` referenced
    by the :class:`_engine.Engine` that produced this
    :class:`_engine.Connection`. Any transactional state present on
    the DBAPI connection is also unconditionally released via
    the DBAPI connection's ``rollback()`` method, regardless
    of any :class:`.Transaction` object that may be
    outstanding with regards to this :class:`_engine.Connection`.

    After :meth:`_engine.Connection.close` is called, the
    :class:`_engine.Connection` is permanently in a closed state,
    and will allow no further operations.

    When this connection is a branch of another connection, only the
    branch's own reference to the DBAPI connection is dropped; the
    pooled connection itself is not closed here.

    """
    if self.__branch_from:
        try:
            del self.__connection
        except AttributeError:
            # already closed; nothing to release
            pass
        finally:
            self.__can_reconnect = False
        # the return lives outside of ``finally`` on purpose: returning
        # from inside ``finally`` would silently discard any exception
        # other than the AttributeError handled above
        return

    try:
        conn = self.__connection
    except AttributeError:
        # already closed
        pass
    else:
        conn.close()
        if conn._reset_agent is self.__transaction:
            conn._reset_agent = None

        # the close() process can end up invalidating us,
        # as the pool will call our transaction as the "reset_agent"
        # for rollback(), which can then cause an invalidation
        if not self.__invalid:
            del self.__connection

    self.__can_reconnect = False
    self.__transaction = None

934 

def scalar(self, object_, *multiparams, **params):
    """Execute ``object_`` and return the first column of the first row.

    All arguments are forwarded unchanged to :meth:`execute`; the value
    of ``scalar()`` on the returned result is handed back.

    The underlying result/cursor is closed after execution.
    """
    result = self.execute(object_, *multiparams, **params)
    return result.scalar()

942 

def execute(self, object_, *multiparams, **params):
    r"""Execute a SQL statement construct and return a
    :class:`_engine.ResultProxy`.

    :param object\_: The statement to be executed.  May be
     one of:

     * a plain string
     * any :class:`_expression.ClauseElement` construct that is also
       a subclass of :class:`.Executable`, such as a
       :func:`_expression.select` construct
     * a :class:`.FunctionElement`, such as that generated
       by :data:`.func`, will be automatically wrapped in
       a SELECT statement, which is then executed.
     * a :class:`.DDLElement` object
     * a :class:`.DefaultGenerator` object
     * a :class:`.Compiled` object

    :param \*multiparams/\**params: bound parameter values to be used
     in the execution.  Typically, the format is either a collection
     of one or more dictionaries passed to \*multiparams::

         conn.execute(
             table.insert(),
             {"id":1, "value":"v1"},
             {"id":2, "value":"v2"}
         )

     ...or individual key/values interpreted by \**params::

         conn.execute(
             table.insert(), id=1, value="v1"
         )

     In the case that a plain SQL string is passed, and the underlying
     DBAPI accepts positional bind parameters, a collection of tuples
     or individual values in \*multiparams may be passed::

         conn.execute(
             "INSERT INTO table (id, value) VALUES (?, ?)",
             (1, "v1"), (2, "v2")
         )

         conn.execute(
             "INSERT INTO table (id, value) VALUES (?, ?)",
             1, "v1"
         )

     Note above, the usage of a question mark "?" or other
     symbol is contingent upon the "paramstyle" accepted by the DBAPI
     in use, which may be any of "qmark", "named", "pyformat", "format",
     "numeric".   See `pep-249 <http://www.python.org/dev/peps/pep-0249/>`_
     for details on paramstyle.

     To execute a textual SQL statement which uses bound parameters in a
     DBAPI-agnostic way, use the :func:`_expression.text` construct.

    :raises: :class:`.ObjectNotExecutableError` when ``object_`` is not a
     string and has no ``_execute_on_connection`` attribute.

    """
    # plain strings take the textual execution path
    if isinstance(object_, util.string_types[0]):
        return self._execute_text(object_, multiparams, params)

    # everything else must know how to run itself on a connection
    try:
        run_on_connection = object_._execute_on_connection
    except AttributeError as err:
        util.raise_(
            exc.ObjectNotExecutableError(object_), replace_context=err
        )

    return run_on_connection(self, multiparams, params)

1012 

1013 def _execute_function(self, func, multiparams, params): 

1014 """Execute a sql.FunctionElement object.""" 

1015 

1016 return self._execute_clauseelement(func.select(), multiparams, params) 

1017 

1018 def _execute_default(self, default, multiparams, params): 

1019 """Execute a schema.ColumnDefault object.""" 

1020 

1021 if self._has_events or self.engine._has_events: 

1022 for fn in self.dispatch.before_execute: 

1023 default, multiparams, params = fn( 

1024 self, default, multiparams, params 

1025 ) 

1026 

1027 try: 

1028 try: 

1029 conn = self.__connection 

1030 except AttributeError: 

1031 # escape "except AttributeError" before revalidating 

1032 # to prevent misleading stacktraces in Py3K 

1033 conn = None 

1034 if conn is None: 

1035 conn = self._revalidate_connection() 

1036 

1037 dialect = self.dialect 

1038 ctx = dialect.execution_ctx_cls._init_default(dialect, self, conn) 

1039 except BaseException as e: 

1040 self._handle_dbapi_exception(e, None, None, None, None) 

1041 

1042 ret = ctx._exec_default(None, default, None) 

1043 if self.should_close_with_result: 

1044 self.close() 

1045 

1046 if self._has_events or self.engine._has_events: 

1047 self.dispatch.after_execute( 

1048 self, default, multiparams, params, ret 

1049 ) 

1050 

1051 return ret 

1052 

1053 def _execute_ddl(self, ddl, multiparams, params): 

1054 """Execute a schema.DDL object.""" 

1055 

1056 if self._has_events or self.engine._has_events: 

1057 for fn in self.dispatch.before_execute: 

1058 ddl, multiparams, params = fn(self, ddl, multiparams, params) 

1059 

1060 dialect = self.dialect 

1061 

1062 compiled = ddl.compile( 

1063 dialect=dialect, 

1064 schema_translate_map=self.schema_for_object 

1065 if not self.schema_for_object.is_default 

1066 else None, 

1067 ) 

1068 ret = self._execute_context( 

1069 dialect, 

1070 dialect.execution_ctx_cls._init_ddl, 

1071 compiled, 

1072 None, 

1073 compiled, 

1074 ) 

1075 if self._has_events or self.engine._has_events: 

1076 self.dispatch.after_execute(self, ddl, multiparams, params, ret) 

1077 return ret 

1078 

def _execute_clauseelement(self, elem, multiparams, params):
    """Execute a sql.ClauseElement object.

    Runs the ``before_execute`` listeners, distills the parameters,
    compiles the element (consulting the ``compiled_cache`` execution
    option when one is configured), executes the compiled form and
    fires ``after_execute``.
    """
    if self._has_events or self.engine._has_events:
        for listener in self.dispatch.before_execute:
            elem, multiparams, params = listener(
                self, elem, multiparams, params
            )

    distilled_params = _distill_params(multiparams, params)
    # copy into a list so we don't retain a link to the view object for
    # keys(), which links to the values, which we don't want to cache
    keys = list(distilled_params[0].keys()) if distilled_params else []

    dialect = self.dialect
    is_multi = len(distilled_params) > 1

    def compile_elem():
        # one compile recipe shared by the cached and uncached paths
        schema_getter = self.schema_for_object
        return elem.compile(
            dialect=dialect,
            column_keys=keys,
            inline=is_multi,
            schema_translate_map=None
            if schema_getter.is_default
            else schema_getter,
        )

    if "compiled_cache" in self._execution_options:
        cache = self._execution_options["compiled_cache"]
        cache_key = (
            dialect,
            elem,
            tuple(sorted(keys)),
            self.schema_for_object.hash_key,
            is_multi,
        )
        compiled_sql = cache.get(cache_key)
        if compiled_sql is None:
            compiled_sql = compile_elem()
            cache[cache_key] = compiled_sql
    else:
        compiled_sql = compile_elem()

    result = self._execute_context(
        dialect,
        dialect.execution_ctx_cls._init_compiled,
        compiled_sql,
        distilled_params,
        compiled_sql,
        distilled_params,
    )
    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(self, elem, multiparams, params, result)
    return result

1135 

def _execute_compiled(self, compiled, multiparams, params):
    """Execute a sql.Compiled object.

    Runs the ``before_execute`` listeners, distills the parameters,
    executes the compiled construct and fires ``after_execute``.
    """
    if self._has_events or self.engine._has_events:
        for listener in self.dispatch.before_execute:
            compiled, multiparams, params = listener(
                self, compiled, multiparams, params
            )

    dialect = self.dialect
    distilled = _distill_params(multiparams, params)
    init_compiled = dialect.execution_ctx_cls._init_compiled
    result = self._execute_context(
        dialect, init_compiled, compiled, distilled, compiled, distilled
    )

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self, compiled, multiparams, params, result
        )
    return result

1160 

def _execute_text(self, statement, multiparams, params):
    """Execute a string SQL statement.

    Runs the ``before_execute`` listeners, distills the parameters,
    executes the string through a statement-based execution context
    and fires ``after_execute``.
    """
    if self._has_events or self.engine._has_events:
        for listener in self.dispatch.before_execute:
            statement, multiparams, params = listener(
                self, statement, multiparams, params
            )

    dialect = self.dialect
    distilled = _distill_params(multiparams, params)
    init_statement = dialect.execution_ctx_cls._init_statement
    result = self._execute_context(
        dialect, init_statement, statement, distilled, statement, distilled
    )

    if self._has_events or self.engine._has_events:
        self.dispatch.after_execute(
            self, statement, multiparams, params, result
        )
    return result

1185 

def _execute_context(
    self, dialect, constructor, statement, parameters, *args
):
    """Create an :class:`.ExecutionContext` and execute, returning
    a :class:`_engine.ResultProxy`.

    :param dialect: dialect handed to ``constructor``.
    :param constructor: callable invoked as
     ``constructor(dialect, self, dbapi_connection, *args)`` to build
     the execution context.
    :param statement: statement used for error reporting until the
     context supplies its own.
    :param parameters: parameters used for error reporting until the
     context supplies its own.

    Any exception raised while building the context or while executing
    is routed through ``_handle_dbapi_exception()``.
    """
    try:
        try:
            dbapi_conn = self.__connection
        except AttributeError:
            # escape "except AttributeError" before revalidating
            # to prevent misleading stacktraces in Py3K
            dbapi_conn = None
        if dbapi_conn is None:
            dbapi_conn = self._revalidate_connection()

        context = constructor(dialect, self, dbapi_conn, *args)
    except BaseException as err:
        self._handle_dbapi_exception(
            err, util.text_type(statement), parameters, None, None
        )

    if context.compiled:
        context.pre_exec()

    # from here on the context is the source of truth
    cursor = context.cursor
    statement = context.statement
    parameters = context.parameters

    if not context.executemany:
        parameters = parameters[0]

    if self._has_events or self.engine._has_events:
        for listener in self.dispatch.before_cursor_execute:
            statement, parameters = listener(
                self,
                cursor,
                statement,
                parameters,
                context,
                context.executemany,
            )

    if self._echo:
        self.engine.logger.info(statement)
        if self.engine.hide_parameters:
            self.engine.logger.info(
                "[SQL parameters hidden due to hide_parameters=True]"
            )
        else:
            self.engine.logger.info(
                "%r",
                sql_util._repr_params(
                    parameters, batches=10, ismulti=context.executemany
                ),
            )

    evt_handled = False
    try:
        # choose the dialect hook; the same name identifies both the
        # dialect-level event and the dialect's default implementation
        if context.executemany:
            hook_name = "do_executemany"
            hook_args = (cursor, statement, parameters, context)
        elif not parameters and context.no_parameters:
            hook_name = "do_execute_no_params"
            hook_args = (cursor, statement, context)
        else:
            hook_name = "do_execute"
            hook_args = (cursor, statement, parameters, context)

        if self.dialect._has_events:
            for listener in getattr(self.dialect.dispatch, hook_name):
                # a truthy return means the listener did the execution
                if listener(*hook_args):
                    evt_handled = True
                    break
        if not evt_handled:
            getattr(self.dialect, hook_name)(*hook_args)

        if self._has_events or self.engine._has_events:
            self.dispatch.after_cursor_execute(
                self,
                cursor,
                statement,
                parameters,
                context,
                context.executemany,
            )

        if context.compiled:
            context.post_exec()

        if context.is_crud or context.is_text:
            result = context._setup_crud_result_proxy()
        else:
            result = context.get_result_proxy()
            if result._metadata is None:
                result._soft_close()

        if context.should_autocommit and self._root.__transaction is None:
            self._root._commit_impl(autocommit=True)

        # for "connectionless" execution, we have to close this
        # Connection after the statement is complete.
        if self.should_close_with_result:
            if result._soft_closed:
                # ResultProxy already exhausted rows / has no rows.
                # close us now
                self.close()
            else:
                # ResultProxy will close this Connection when no more
                # rows to fetch.
                result._autoclose_connection = True

    except BaseException as err:
        self._handle_dbapi_exception(
            err, statement, parameters, cursor, context
        )

    return result

1321 

1322 def _cursor_execute(self, cursor, statement, parameters, context=None): 

1323 """Execute a statement + params on the given cursor. 

1324 

1325 Adds appropriate logging and exception handling. 

1326 

1327 This method is used by DefaultDialect for special-case 

1328 executions, such as for sequences and column defaults. 

1329 The path of statement execution in the majority of cases 

1330 terminates at _execute_context(). 

1331 

1332 """ 

1333 if self._has_events or self.engine._has_events: 

1334 for fn in self.dispatch.before_cursor_execute: 

1335 statement, parameters = fn( 

1336 self, cursor, statement, parameters, context, False 

1337 ) 

1338 

1339 if self._echo: 

1340 self.engine.logger.info(statement) 

1341 self.engine.logger.info("%r", parameters) 

1342 try: 

1343 for fn in ( 

1344 () 

1345 if not self.dialect._has_events 

1346 else self.dialect.dispatch.do_execute 

1347 ): 

1348 if fn(cursor, statement, parameters, context): 

1349 break 

1350 else: 

1351 self.dialect.do_execute(cursor, statement, parameters, context) 

1352 except BaseException as e: 

1353 self._handle_dbapi_exception( 

1354 e, statement, parameters, cursor, context 

1355 ) 

1356 

1357 if self._has_events or self.engine._has_events: 

1358 self.dispatch.after_cursor_execute( 

1359 self, cursor, statement, parameters, context, False 

1360 ) 

1361 

1362 def _safe_close_cursor(self, cursor): 

1363 """Close the given cursor, catching exceptions 

1364 and turning into log warnings. 

1365 

1366 """ 

1367 try: 

1368 cursor.close() 

1369 except Exception: 

1370 # log the error through the connection pool's logger. 

1371 self.engine.pool.logger.error( 

1372 "Error closing cursor", exc_info=True 

1373 ) 

1374 

# Class-level defaults.  _handle_dbapi_exception() sets these on the
# instance while it runs and deletes the instance values afterwards so
# that lookups fall back to ``False`` here.
_reentrant_error = False
_is_disconnect = False


def _handle_dbapi_exception(
    self, e, statement, parameters, cursor, context
):
    """Process an exception raised during execution and re-raise it.

    Determines whether the error is a disconnect, wraps DBAPI errors
    (and non-exit errors raised before a context exists for a known
    statement) in a :class:`.DBAPIError`, runs the ``dbapi_error`` and
    ``handle_error`` listeners, closes the cursor and autorollbacks
    when the error is not a disconnect, and finally raises: a
    listener-supplied exception if there is one, else the wrapped
    exception, else the original.

    On the way out, a disconnect invalidates this connection (and the
    pool, unless a listener or an exit exception turned that off), and
    the connection is closed if ``should_close_with_result`` is set.

    :param e: the exception being handled.
    :param statement: statement in progress, or ``None``.
    :param parameters: parameters in progress, or ``None``.
    :param cursor: DBAPI cursor in use, or ``None``.
    :param context: execution context in use, or ``None``.
    """
    exc_info = sys.exc_info()

    if context and context.exception is None:
        context.exception = e

    # GeneratorExit, KeyboardInterrupt and friends
    is_exit_exception = not isinstance(e, Exception)

    def wrap_error(**extra):
        # single recipe for the DBAPIError wrapper used below
        return exc.DBAPIError.instance(
            statement,
            parameters,
            e,
            self.dialect.dbapi.Error,
            hide_parameters=self.engine.hide_parameters,
            dialect=self.dialect,
            ismulti=context.executemany if context is not None else None,
            **extra
        )

    if not self._is_disconnect:
        self._is_disconnect = (
            isinstance(e, self.dialect.dbapi.Error)
            and not self.closed
            and self.dialect.is_disconnect(
                e,
                self.__connection if not self.invalidated else None,
                cursor,
            )
        ) or (is_exit_exception and not self.closed)

        if context:
            context.is_disconnect = self._is_disconnect

    invalidate_pool_on_disconnect = not is_exit_exception

    if self._reentrant_error:
        # we are already handling an error on this connection; raise a
        # wrapped error right away without running the handlers again
        util.raise_(wrap_error(), with_traceback=exc_info[2], from_=e)

    self._reentrant_error = True
    try:
        # non-DBAPI error - if we already got a context,
        # or there's no string statement, don't wrap it
        should_wrap = isinstance(e, self.dialect.dbapi.Error) or (
            statement is not None
            and context is None
            and not is_exit_exception
        )

        sqlalchemy_exception = (
            wrap_error(connection_invalidated=self._is_disconnect)
            if should_wrap
            else None
        )

        replacement = None

        if (
            self._has_events or self.engine._has_events
        ) and not self._execution_options.get(
            "skip_user_error_events", False
        ):
            # legacy dbapi_error event
            if should_wrap and context:
                self.dispatch.dbapi_error(
                    self, cursor, statement, parameters, context, e
                )

            # new handle_error event
            error_ctx = ExceptionContextImpl(
                e,
                sqlalchemy_exception,
                self.engine,
                self,
                cursor,
                statement,
                parameters,
                context,
                self._is_disconnect,
                invalidate_pool_on_disconnect,
            )

            for handler in self.dispatch.handle_error:
                try:
                    # handler returns an exception;
                    # call next handler in a chain
                    returned = handler(error_ctx)
                    if returned is not None:
                        error_ctx.chained_exception = replacement = returned
                except Exception as raised:
                    # handler raises an exception - stop processing
                    replacement = raised
                    break

            # a handler may have flipped the disconnect flag
            if self._is_disconnect != error_ctx.is_disconnect:
                self._is_disconnect = error_ctx.is_disconnect
                if sqlalchemy_exception:
                    sqlalchemy_exception.connection_invalidated = (
                        error_ctx.is_disconnect
                    )

            # set up potentially user-defined value for
            # invalidate pool.
            invalidate_pool_on_disconnect = (
                error_ctx.invalidate_pool_on_disconnect
            )

        if should_wrap and context:
            context.handle_dbapi_exception(e)

        if not self._is_disconnect:
            if cursor:
                self._safe_close_cursor(cursor)
            with util.safe_reraise(warn_only=True):
                self._autorollback()

        if replacement:
            util.raise_(replacement, with_traceback=exc_info[2], from_=e)
        elif should_wrap:
            util.raise_(
                sqlalchemy_exception, with_traceback=exc_info[2], from_=e
            )
        else:
            util.raise_(exc_info[1], with_traceback=exc_info[2])

    finally:
        # drop the instance-level flags so the class defaults apply again
        del self._reentrant_error
        if self._is_disconnect:
            del self._is_disconnect
            if not self.invalidated:
                dbapi_conn_wrapper = self.__connection
                if invalidate_pool_on_disconnect:
                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
                self.invalidate(e)
        if self.should_close_with_result:
            self.close()

1527 

@classmethod
def _handle_dbapi_exception_noconnection(cls, e, dialect, engine):
    """Process an exception raised when no connection exists, then
    re-raise it.

    DBAPI errors are wrapped in a :class:`.DBAPIError`; the engine's
    ``handle_error`` listeners run when the engine has events.  Raises
    a listener-supplied exception if there is one, else the wrapped
    exception, else the original.

    :param e: the exception being handled.
    :param dialect: dialect used to classify the error.
    :param engine: engine whose listeners and ``hide_parameters``
     setting apply.
    """
    exc_info = sys.exc_info()

    is_disconnect = dialect.is_disconnect(e, None, None)
    should_wrap = isinstance(e, dialect.dbapi.Error)

    sqlalchemy_exception = None
    if should_wrap:
        sqlalchemy_exception = exc.DBAPIError.instance(
            None,
            None,
            e,
            dialect.dbapi.Error,
            hide_parameters=engine.hide_parameters,
            connection_invalidated=is_disconnect,
        )

    replacement = None

    if engine._has_events:
        error_ctx = ExceptionContextImpl(
            e,
            sqlalchemy_exception,
            engine,
            None,
            None,
            None,
            None,
            None,
            is_disconnect,
            True,
        )
        for handler in engine.dispatch.handle_error:
            try:
                # handler returns an exception;
                # call next handler in a chain
                returned = handler(error_ctx)
                if returned is not None:
                    error_ctx.chained_exception = replacement = returned
            except Exception as raised:
                # handler raises an exception - stop processing
                replacement = raised
                break

        # a handler may have flipped the disconnect flag
        if sqlalchemy_exception and is_disconnect != error_ctx.is_disconnect:
            is_disconnect = error_ctx.is_disconnect
            sqlalchemy_exception.connection_invalidated = is_disconnect

    if replacement:
        util.raise_(replacement, with_traceback=exc_info[2], from_=e)
    elif should_wrap:
        util.raise_(
            sqlalchemy_exception, with_traceback=exc_info[2], from_=e
        )
    else:
        util.raise_(exc_info[1], with_traceback=exc_info[2])

1588 

def transaction(self, callable_, *args, **kwargs):
    r"""Run ``callable_`` inside a transaction on this connection.

    A transaction is begun, then ``callable_`` is invoked through
    :meth:`run_callable` with this :class:`_engine.Connection` as the
    first argument, followed by the given \*args and \**kwargs,
    e.g.::

        def do_something(conn, x, y):
            conn.execute("some statement", {'x':x, 'y':y})

        conn.transaction(do_something, 5, 10)

    The operations inside the function are all invoked within the
    context of a single :class:`.Transaction`.
    Upon success, the transaction is committed and the function's
    return value is returned.  If an exception is raised, the
    transaction is rolled back before propagating the exception.

    .. note::

       The :meth:`.transaction` method is superseded by
       the usage of the Python ``with:`` statement, which can
       be used with :meth:`_engine.Connection.begin`::

           with conn.begin():
               conn.execute("some statement", {'x':5, 'y':10})

       As well as with :meth:`_engine.Engine.begin`::

           with engine.begin() as conn:
               conn.execute("some statement", {'x':5, 'y':10})

    .. seealso::

        :meth:`_engine.Engine.begin` - engine-level transactional
        context

        :meth:`_engine.Engine.transaction` - engine-level version of
        :meth:`_engine.Connection.transaction`

    """
    trans = self.begin()
    try:
        outcome = self.run_callable(callable_, *args, **kwargs)
        trans.commit()
    except BaseException:
        # roll back, then let safe_reraise propagate the original error
        with util.safe_reraise():
            trans.rollback()
    else:
        return outcome

1639 

def run_callable(self, callable_, *args, **kwargs):
    r"""Invoke ``callable_`` with this :class:`_engine.Connection` as
    its first argument and return whatever it returns.

    The given \*args and \**kwargs are passed subsequent
    to the :class:`_engine.Connection` argument.

    This function, along with :meth:`_engine.Engine.run_callable`,
    allows a function to be run with a :class:`_engine.Connection`
    or :class:`_engine.Engine` object without the need to know
    which one is being dealt with.

    """
    outcome = callable_(self, *args, **kwargs)
    return outcome

1654 

1655 def _run_visitor(self, visitorcallable, element, **kwargs): 

1656 visitorcallable(self.dialect, self, **kwargs).traverse_single(element) 

1657 

1658 

class ExceptionContextImpl(ExceptionContext):
    """Implement the :class:`.ExceptionContext` interface.

    A plain holder for the state describing one error, as handed to
    ``handle_error`` listeners.
    """

    def __init__(
        self,
        exception,
        sqlalchemy_exception,
        engine,
        connection,
        cursor,
        statement,
        parameters,
        context,
        is_disconnect,
        invalidate_pool_on_disconnect,
    ):
        """Record the error state.

        :param exception: the original exception; stored as
         ``original_exception``.
        :param sqlalchemy_exception: the wrapping exception, or ``None``.
        :param engine: the engine in use.
        :param connection: the connection in use, or ``None``.
        :param cursor: the DBAPI cursor in use, or ``None``.
        :param statement: the statement in progress, or ``None``.
        :param parameters: the parameters in progress, or ``None``.
        :param context: the execution context, or ``None``; stored as
         ``execution_context``.
        :param is_disconnect: whether the error was classified as a
         disconnect.
        :param invalidate_pool_on_disconnect: whether a disconnect
         should also invalidate the pool.
        """
        self.engine = engine
        self.connection = connection
        # previously this argument was accepted and then dropped, so
        # listeners could never reach the cursor through the context.
        # NOTE(review): the base interface is expected to declare a
        # ``cursor`` attribute - confirm against ExceptionContext.
        self.cursor = cursor
        self.sqlalchemy_exception = sqlalchemy_exception
        self.original_exception = exception
        self.execution_context = context
        self.statement = statement
        self.parameters = parameters
        self.is_disconnect = is_disconnect
        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect

1684 

1685 

class Transaction(object):
    """Represent a database transaction in progress.

    A :class:`.Transaction` is obtained by calling
    :meth:`_engine.Connection.begin` on a
    :class:`_engine.Connection`::

        from sqlalchemy import create_engine
        engine = create_engine("postgresql://scott:tiger@localhost/test")
        connection = engine.connect()
        trans = connection.begin()
        connection.execute("insert into x (a, b) values (1, 2)")
        trans.commit()

    Transaction boundaries are controlled with :meth:`.rollback` and
    :meth:`.commit`.  The object is also a context manager, so the
    Python ``with`` statement can be used with
    :meth:`_engine.Connection.begin`::

        with connection.begin():
            connection.execute("insert into x (a, b) values (1, 2)")

    The Transaction object is **not** threadsafe.

    .. seealso::

        :meth:`_engine.Connection.begin`

        :meth:`_engine.Connection.begin_twophase`

        :meth:`_engine.Connection.begin_nested`

    .. index::
      single: thread safety; Transaction
    """

    def __init__(self, connection, parent):
        """Record the owning connection and the enclosing transaction.

        :param connection: the connection this transaction belongs to.
        :param parent: the enclosing transaction, or a false value when
         this transaction is its own parent.
        """
        self.is_active = True
        self.connection = connection
        self._actual_parent = parent

    @property
    def _parent(self):
        """The enclosing transaction, or this one when there is none."""
        enclosing = self._actual_parent
        return enclosing if enclosing else self

    def close(self):
        """Close this :class:`.Transaction`.

        If this transaction is the base transaction in a begin/commit
        nesting, the transaction will rollback().  Otherwise, the
        method returns.

        This is used to cancel a Transaction without affecting the scope of
        an enclosing transaction.

        """
        parent = self._parent
        if parent.is_active and parent is self:
            self.rollback()
        self.connection._discard_transaction(self)

    def rollback(self):
        """Roll back this :class:`.Transaction`."""
        if not self._parent.is_active:
            return
        self._do_rollback()
        self.is_active = False

    def _do_rollback(self):
        # the base implementation delegates to the enclosing transaction
        self._parent.rollback()

    def commit(self):
        """Commit this :class:`.Transaction`.

        :raises: :class:`.InvalidRequestError` if the parent transaction
         is no longer active.
        """
        if self._parent.is_active:
            self._do_commit()
            self.is_active = False
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

    def _do_commit(self):
        # nothing to emit at this level; subclasses override
        pass

    def __enter__(self):
        return self

    def __exit__(self, type_, value, traceback):
        # anything other than a clean exit of an active transaction
        # rolls back
        if type_ is not None or not self.is_active:
            self.rollback()
            return

        try:
            self.commit()
        except BaseException:
            # commit failed: roll back, then re-raise the commit error
            with util.safe_reraise():
                self.rollback()

1780 

1781 

class RootTransaction(Transaction):
    """The outermost transaction on a connection.

    Construction calls the connection's ``_begin_impl()``; rollback and
    commit call the connection's ``_rollback_impl()`` /
    ``_commit_impl()`` while the transaction is still active.
    """

    def __init__(self, connection):
        super(RootTransaction, self).__init__(connection, None)
        self.connection._begin_impl(self)

    def _do_rollback(self):
        if not self.is_active:
            return
        self.connection._rollback_impl()

    def _do_commit(self):
        if not self.is_active:
            return
        self.connection._commit_impl()

1794 

1795 

class NestedTransaction(Transaction):
    """Represent a 'nested', or SAVEPOINT transaction.

    A new :class:`.NestedTransaction` object may be procured
    using the :meth:`_engine.Connection.begin_nested` method.

    The interface is the same as that of :class:`.Transaction`.

    """

    def __init__(self, connection, parent):
        super(NestedTransaction, self).__init__(connection, parent)
        # whatever the connection returns identifies the savepoint and
        # is handed back on rollback / release
        self._savepoint = self.connection._savepoint_impl()

    def _do_rollback(self):
        if not self.is_active:
            return
        self.connection._rollback_to_savepoint_impl(
            self._savepoint, self._parent
        )

    def _do_commit(self):
        if not self.is_active:
            return
        self.connection._release_savepoint_impl(
            self._savepoint, self._parent
        )

1821 

1822 

class TwoPhaseTransaction(Transaction):
    """Represent a two-phase transaction.

    A new :class:`.TwoPhaseTransaction` object may be procured
    using the :meth:`_engine.Connection.begin_twophase` method.

    The interface is the same as that of :class:`.Transaction`
    with the addition of the :meth:`prepare` method.

    """

    def __init__(self, connection, xid):
        """Begin a two-phase transaction identified by ``xid``."""
        super(TwoPhaseTransaction, self).__init__(connection, None)
        self._is_prepared = False
        self.xid = xid
        self.connection._begin_twophase_impl(self)

    def prepare(self):
        """Prepare this :class:`.TwoPhaseTransaction`.

        After a PREPARE, the transaction can be committed.

        :raises: :class:`.InvalidRequestError` if the transaction is
         no longer active.

        """
        if self._parent.is_active:
            self.connection._prepare_twophase_impl(self.xid)
            self._is_prepared = True
        else:
            raise exc.InvalidRequestError("This transaction is inactive")

    def _do_rollback(self):
        prepared = self._is_prepared
        self.connection._rollback_twophase_impl(self.xid, prepared)

    def _do_commit(self):
        prepared = self._is_prepared
        self.connection._commit_twophase_impl(self.xid, prepared)

1856 

1857 

1858class Engine(Connectable, log.Identified): 

1859 """ 

1860 Connects a :class:`~sqlalchemy.pool.Pool` and 

1861 :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a 

1862 source of database connectivity and behavior. 

1863 

1864 An :class:`_engine.Engine` object is instantiated publicly using the 

1865 :func:`~sqlalchemy.create_engine` function. 

1866 

1867 .. seealso:: 

1868 

1869 :doc:`/core/engines` 

1870 

1871 :ref:`connections_toplevel` 

1872 

1873 """ 

1874 

1875 _execution_options = util.immutabledict() 

1876 _has_events = False 

1877 _connection_cls = Connection 

1878 

1879 schema_for_object = schema._schema_getter(None) 

1880 """Return the ".schema" attribute for an object. 

1881 

1882 Used for :class:`_schema.Table`, :class:`.Sequence` and similar objects, 

1883 and takes into account 

1884 the :paramref:`.Connection.execution_options.schema_translate_map` 

1885 parameter. 

1886 

1887 .. versionadded:: 1.1 

1888 

1889 .. seealso:: 

1890 

1891 :ref:`schema_translating` 

1892 

1893 """ 

1894 

def __init__(
    self,
    pool,
    dialect,
    url,
    logging_name=None,
    echo=None,
    proxy=None,
    execution_options=None,
    hide_parameters=False,
):
    """Store the engine's collaborators and configure logging.

    :param pool: connection pool, stored as ``self.pool``.
    :param dialect: dialect, stored as ``self.dialect``.
    :param url: database URL, stored as ``self.url``.
    :param logging_name: optional name; only set on the instance when
     truthy.
    :param echo: echo flag, stored and passed to the instance logger.
    :param proxy: optional object adapted as a listener through
     ``ConnectionProxy._adapt_listener``.
    :param execution_options: optional mapping applied through
     :meth:`update_execution_options`.
    :param hide_parameters: stored as ``self.hide_parameters``.
    """
    self.dialect = dialect
    self.url = url
    self.pool = pool

    # logging setup; the order of these statements is kept as-is
    if logging_name:
        self.logging_name = logging_name
    self.echo = echo
    self.hide_parameters = hide_parameters
    log.instance_logger(self, echoflag=echo)

    if proxy:
        interfaces.ConnectionProxy._adapt_listener(self, proxy)

    if execution_options:
        self.update_execution_options(**execution_options)

1918 

1919 @property 

1920 def engine(self): 

1921 return self 

1922 

1923 def update_execution_options(self, **opt): 

1924 r"""Update the default execution_options dictionary 

1925 of this :class:`_engine.Engine`. 

1926 

1927 The given keys/values in \**opt are added to the 

1928 default execution options that will be used for 

1929 all connections. The initial contents of this dictionary 

1930 can be sent via the ``execution_options`` parameter 

1931 to :func:`_sa.create_engine`. 

1932 

1933 .. seealso:: 

1934 

1935 :meth:`_engine.Connection.execution_options` 

1936 

1937 :meth:`_engine.Engine.execution_options` 

1938 

1939 """ 

1940 self._execution_options = self._execution_options.union(opt) 

1941 self.dispatch.set_engine_execution_options(self, opt) 

1942 self.dialect.set_engine_execution_options(self, opt) 

1943 

1944 def execution_options(self, **opt): 

1945 """Return a new :class:`_engine.Engine` that will provide 

1946 :class:`_engine.Connection` objects with the given execution options. 

1947 

1948 The returned :class:`_engine.Engine` remains related to the original 

1949 :class:`_engine.Engine` in that it shares the same connection pool and 

1950 other state: 

1951 

1952 * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` 

1953 is the 

1954 same instance. The :meth:`_engine.Engine.dispose` 

1955 method will replace 

1956 the connection pool instance for the parent engine as well 

1957 as this one. 

1958 * Event listeners are "cascaded" - meaning, the new 

1959 :class:`_engine.Engine` 

1960 inherits the events of the parent, and new events can be associated 

1961 with the new :class:`_engine.Engine` individually. 

1962 * The logging configuration and logging_name is copied from the parent 

1963 :class:`_engine.Engine`. 

1964 

1965 The intent of the :meth:`_engine.Engine.execution_options` method is 

1966 to implement "sharding" schemes where multiple :class:`_engine.Engine` 

1967 objects refer to the same connection pool, but are differentiated 

1968 by options that would be consumed by a custom event:: 

1969 

1970 primary_engine = create_engine("mysql://") 

1971 shard1 = primary_engine.execution_options(shard_id="shard1") 

1972 shard2 = primary_engine.execution_options(shard_id="shard2") 

1973 

1974 Above, the ``shard1`` engine serves as a factory for 

1975 :class:`_engine.Connection` 

1976 objects that will contain the execution option 

1977 ``shard_id=shard1``, and ``shard2`` will produce 

1978 :class:`_engine.Connection` 

1979 objects that contain the execution option ``shard_id=shard2``. 

1980 

1981 An event handler can consume the above execution option to perform 

1982 a schema switch or other operation, given a connection. Below 

1983 we emit a MySQL ``use`` statement to switch databases, at the same 

1984 time keeping track of which database we've established using the 

1985 :attr:`_engine.Connection.info` dictionary, 

1986 which gives us a persistent 

1987 storage space that follows the DBAPI connection:: 

1988 

1989 from sqlalchemy import event 

1990 from sqlalchemy.engine import Engine 

1991 

1992 shards = {"default": "base", shard_1: "db1", "shard_2": "db2"} 

1993 

1994 @event.listens_for(Engine, "before_cursor_execute") 

1995 def _switch_shard(conn, cursor, stmt, 

1996 params, context, executemany): 

1997 shard_id = conn._execution_options.get('shard_id', "default") 

1998 current_shard = conn.info.get("current_shard", None) 

1999 

2000 if current_shard != shard_id: 

2001 cursor.execute("use %s" % shards[shard_id]) 

2002 conn.info["current_shard"] = shard_id 

2003 

2004 .. seealso:: 

2005 

2006 :meth:`_engine.Connection.execution_options` 

2007 - update execution options 

2008 on a :class:`_engine.Connection` object. 

2009 

2010 :meth:`_engine.Engine.update_execution_options` 

2011 - update the execution 

2012 options for a given :class:`_engine.Engine` in place. 

2013 

2014 :meth:`_engine.Engine.get_execution_options` 

2015 

2016 

2017 """ 

2018 return OptionEngine(self, opt) 

2019 

2020 def get_execution_options(self): 

2021 """Get the non-SQL options which will take effect during execution. 

2022 

2023 .. versionadded: 1.3 

2024 

2025 .. seealso:: 

2026 

2027 :meth:`_engine.Engine.execution_options` 

2028 """ 

2029 return self._execution_options 

2030 

2031 @property 

2032 def name(self): 

2033 """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

2034 in use by this :class:`Engine`.""" 

2035 

2036 return self.dialect.name 

2037 

2038 @property 

2039 def driver(self): 

2040 """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

2041 in use by this :class:`Engine`.""" 

2042 

2043 return self.dialect.driver 

2044 

2045 echo = log.echo_property() 

2046 

2047 def __repr__(self): 

2048 return "Engine(%r)" % self.url 

2049 

2050 def dispose(self): 

2051 """Dispose of the connection pool used by this 

2052 :class:`_engine.Engine`. 

2053 

2054 This has the effect of fully closing all **currently checked in** 

2055 database connections. Connections that are still checked out 

2056 will **not** be closed, however they will no longer be associated 

2057 with this :class:`_engine.Engine`, 

2058 so when they are closed individually, 

2059 eventually the :class:`_pool.Pool` which they are associated with will 

2060 be garbage collected and they will be closed out fully, if 

2061 not already closed on checkin. 

2062 

2063 A new connection pool is created immediately after the old one has 

2064 been disposed. This new pool, like all SQLAlchemy connection pools, 

2065 does not make any actual connections to the database until one is 

2066 first requested, so as long as the :class:`_engine.Engine` 

2067 isn't used again, 

2068 no new connections will be made. 

2069 

2070 .. seealso:: 

2071 

2072 :ref:`engine_disposal` 

2073 

2074 """ 

2075 self.pool.dispose() 

2076 self.pool = self.pool.recreate() 

2077 self.dispatch.engine_disposed(self) 

2078 

2079 def _execute_default(self, default): 

2080 with self._contextual_connect() as conn: 

2081 return conn._execute_default(default, (), {}) 

2082 

2083 @contextlib.contextmanager 

2084 def _optional_conn_ctx_manager(self, connection=None): 

2085 if connection is None: 

2086 with self._contextual_connect() as conn: 

2087 yield conn 

2088 else: 

2089 yield connection 

2090 

2091 def _run_visitor( 

2092 self, visitorcallable, element, connection=None, **kwargs 

2093 ): 

2094 with self._optional_conn_ctx_manager(connection) as conn: 

2095 conn._run_visitor(visitorcallable, element, **kwargs) 

2096 

2097 class _trans_ctx(object): 

2098 def __init__(self, conn, transaction, close_with_result): 

2099 self.conn = conn 

2100 self.transaction = transaction 

2101 self.close_with_result = close_with_result 

2102 

2103 def __enter__(self): 

2104 return self.conn 

2105 

2106 def __exit__(self, type_, value, traceback): 

2107 if type_ is not None: 

2108 self.transaction.rollback() 

2109 else: 

2110 self.transaction.commit() 

2111 if not self.close_with_result: 

2112 self.conn.close() 

2113 

2114 def begin(self, close_with_result=False): 

2115 """Return a context manager delivering a :class:`_engine.Connection` 

2116 with a :class:`.Transaction` established. 

2117 

2118 E.g.:: 

2119 

2120 with engine.begin() as conn: 

2121 conn.execute("insert into table (x, y, z) values (1, 2, 3)") 

2122 conn.execute("my_special_procedure(5)") 

2123 

2124 Upon successful operation, the :class:`.Transaction` 

2125 is committed. If an error is raised, the :class:`.Transaction` 

2126 is rolled back. 

2127 

2128 The ``close_with_result`` flag is normally ``False``, and indicates 

2129 that the :class:`_engine.Connection` will be closed when the operation 

2130 is complete. When set to ``True``, it indicates the 

2131 :class:`_engine.Connection` is in "single use" mode, where the 

2132 :class:`_engine.ResultProxy` returned by the first call to 

2133 :meth:`_engine.Connection.execute` will close the 

2134 :class:`_engine.Connection` when 

2135 that :class:`_engine.ResultProxy` has exhausted all result rows. 

2136 

2137 .. seealso:: 

2138 

2139 :meth:`_engine.Engine.connect` - procure a 

2140 :class:`_engine.Connection` from 

2141 an :class:`_engine.Engine`. 

2142 

2143 :meth:`_engine.Connection.begin` - start a :class:`.Transaction` 

2144 for a particular :class:`_engine.Connection`. 

2145 

2146 """ 

2147 conn = self._contextual_connect(close_with_result=close_with_result) 

2148 try: 

2149 trans = conn.begin() 

2150 except: 

2151 with util.safe_reraise(): 

2152 conn.close() 

2153 return Engine._trans_ctx(conn, trans, close_with_result) 

2154 

2155 def transaction(self, callable_, *args, **kwargs): 

2156 r"""Execute the given function within a transaction boundary. 

2157 

2158 The function is passed a :class:`_engine.Connection` newly procured 

2159 from :meth:`_engine.Engine.contextual_connect` as the first argument, 

2160 followed by the given \*args and \**kwargs. 

2161 

2162 e.g.:: 

2163 

2164 def do_something(conn, x, y): 

2165 conn.execute("some statement", {'x':x, 'y':y}) 

2166 

2167 engine.transaction(do_something, 5, 10) 

2168 

2169 The operations inside the function are all invoked within the 

2170 context of a single :class:`.Transaction`. 

2171 Upon success, the transaction is committed. If an 

2172 exception is raised, the transaction is rolled back 

2173 before propagating the exception. 

2174 

2175 .. note:: 

2176 

2177 The :meth:`.transaction` method is superseded by 

2178 the usage of the Python ``with:`` statement, which can 

2179 be used with :meth:`_engine.Engine.begin`:: 

2180 

2181 with engine.begin() as conn: 

2182 conn.execute("some statement", {'x':5, 'y':10}) 

2183 

2184 .. seealso:: 

2185 

2186 :meth:`_engine.Engine.begin` - engine-level transactional 

2187 context 

2188 

2189 :meth:`_engine.Connection.transaction` 

2190 - connection-level version of 

2191 :meth:`_engine.Engine.transaction` 

2192 

2193 """ 

2194 

2195 with self._contextual_connect() as conn: 

2196 return conn.transaction(callable_, *args, **kwargs) 

2197 

2198 def run_callable(self, callable_, *args, **kwargs): 

2199 r"""Given a callable object or function, execute it, passing 

2200 a :class:`_engine.Connection` as the first argument. 

2201 

2202 The given \*args and \**kwargs are passed subsequent 

2203 to the :class:`_engine.Connection` argument. 

2204 

2205 This function, along with :meth:`_engine.Connection.run_callable`, 

2206 allows a function to be run with a :class:`_engine.Connection` 

2207 or :class:`_engine.Engine` object without the need to know 

2208 which one is being dealt with. 

2209 

2210 """ 

2211 with self._contextual_connect() as conn: 

2212 return conn.run_callable(callable_, *args, **kwargs) 

2213 

2214 def execute(self, statement, *multiparams, **params): 

2215 """Executes the given construct and returns a 

2216 :class:`_engine.ResultProxy`. 

2217 

2218 The arguments are the same as those used by 

2219 :meth:`_engine.Connection.execute`. 

2220 

2221 Here, a :class:`_engine.Connection` is acquired using the 

2222 :meth:`_engine.Engine.contextual_connect` method, 

2223 and the statement executed 

2224 with that connection. The returned :class:`_engine.ResultProxy` 

2225 is flagged 

2226 such that when the :class:`_engine.ResultProxy` is exhausted and its 

2227 underlying cursor is closed, the :class:`_engine.Connection` 

2228 created here 

2229 will also be closed, which allows its associated DBAPI connection 

2230 resource to be returned to the connection pool. 

2231 

2232 """ 

2233 

2234 connection = self._contextual_connect(close_with_result=True) 

2235 return connection.execute(statement, *multiparams, **params) 

2236 

2237 def scalar(self, statement, *multiparams, **params): 

2238 return self.execute(statement, *multiparams, **params).scalar() 

2239 

2240 def _execute_clauseelement(self, elem, multiparams=None, params=None): 

2241 connection = self._contextual_connect(close_with_result=True) 

2242 return connection._execute_clauseelement(elem, multiparams, params) 

2243 

2244 def _execute_compiled(self, compiled, multiparams, params): 

2245 connection = self._contextual_connect(close_with_result=True) 

2246 return connection._execute_compiled(compiled, multiparams, params) 

2247 

2248 def connect(self, **kwargs): 

2249 """Return a new :class:`_engine.Connection` object. 

2250 

2251 The :class:`_engine.Connection` object is a facade that uses a DBAPI 

2252 connection internally in order to communicate with the database. This 

2253 connection is procured from the connection-holding :class:`_pool.Pool` 

2254 referenced by this :class:`_engine.Engine`. When the 

2255 :meth:`_engine.Connection.close` method of the 

2256 :class:`_engine.Connection` object 

2257 is called, the underlying DBAPI connection is then returned to the 

2258 connection pool, where it may be used again in a subsequent call to 

2259 :meth:`_engine.Engine.connect`. 

2260 

2261 """ 

2262 

2263 return self._connection_cls(self, **kwargs) 

2264 

2265 @util.deprecated( 

2266 "1.3", 

2267 "The :meth:`_engine.Engine.contextual_connect` method is deprecated. " 

2268 "This " 

2269 "method is an artifact of the threadlocal engine strategy which is " 

2270 "also to be deprecated. For explicit connections from an " 

2271 ":class:`_engine.Engine`, use the :meth:`_engine.Engine.connect` " 

2272 "method.", 

2273 ) 

2274 def contextual_connect(self, close_with_result=False, **kwargs): 

2275 """Return a :class:`_engine.Connection` 

2276 object which may be part of some 

2277 ongoing context. 

2278 

2279 By default, this method does the same thing as 

2280 :meth:`_engine.Engine.connect`. 

2281 Subclasses of :class:`_engine.Engine` may override this method 

2282 to provide contextual behavior. 

2283 

2284 :param close_with_result: When True, the first 

2285 :class:`_engine.ResultProxy` 

2286 created by the :class:`_engine.Connection` will call the 

2287 :meth:`_engine.Connection.close` 

2288 method of that connection as soon as any 

2289 pending result rows are exhausted. This is used to supply the 

2290 "connectionless execution" behavior provided by the 

2291 :meth:`_engine.Engine.execute` method. 

2292 

2293 """ 

2294 

2295 return self._contextual_connect( 

2296 close_with_result=close_with_result, **kwargs 

2297 ) 

2298 

2299 def _contextual_connect(self, close_with_result=False, **kwargs): 

2300 return self._connection_cls( 

2301 self, 

2302 self._wrap_pool_connect(self.pool.connect, None), 

2303 close_with_result=close_with_result, 

2304 **kwargs 

2305 ) 

2306 

2307 def table_names(self, schema=None, connection=None): 

2308 """Return a list of all table names available in the database. 

2309 

2310 :param schema: Optional, retrieve names from a non-default schema. 

2311 

2312 :param connection: Optional, use a specified connection. Default is 

2313 the ``contextual_connect`` for this ``Engine``. 

2314 """ 

2315 

2316 with self._optional_conn_ctx_manager(connection) as conn: 

2317 return self.dialect.get_table_names(conn, schema) 

2318 

2319 def has_table(self, table_name, schema=None): 

2320 """Return True if the given backend has a table of the given name. 

2321 

2322 .. seealso:: 

2323 

2324 :ref:`metadata_reflection_inspector` - detailed schema inspection 

2325 using the :class:`_reflection.Inspector` interface. 

2326 

2327 :class:`.quoted_name` - used to pass quoting information along 

2328 with a schema identifier. 

2329 

2330 """ 

2331 return self.run_callable(self.dialect.has_table, table_name, schema) 

2332 

2333 def _wrap_pool_connect(self, fn, connection): 

2334 dialect = self.dialect 

2335 try: 

2336 return fn() 

2337 except dialect.dbapi.Error as e: 

2338 if connection is None: 

2339 Connection._handle_dbapi_exception_noconnection( 

2340 e, dialect, self 

2341 ) 

2342 else: 

2343 util.raise_( 

2344 sys.exc_info()[1], with_traceback=sys.exc_info()[2] 

2345 ) 

2346 

2347 def raw_connection(self, _connection=None): 

2348 """Return a "raw" DBAPI connection from the connection pool. 

2349 

2350 The returned object is a proxied version of the DBAPI 

2351 connection object used by the underlying driver in use. 

2352 The object will have all the same behavior as the real DBAPI 

2353 connection, except that its ``close()`` method will result in the 

2354 connection being returned to the pool, rather than being closed 

2355 for real. 

2356 

2357 This method provides direct DBAPI connection access for 

2358 special situations when the API provided by 

2359 :class:`_engine.Connection` 

2360 is not needed. When a :class:`_engine.Connection` object is already 

2361 present, the DBAPI connection is available using 

2362 the :attr:`_engine.Connection.connection` accessor. 

2363 

2364 .. seealso:: 

2365 

2366 :ref:`dbapi_connections` 

2367 

2368 """ 

2369 return self._wrap_pool_connect( 

2370 self.pool.unique_connection, _connection 

2371 ) 

2372 

2373 

class OptionEngine(Engine):
    """An :class:`Engine` that wraps another engine and layers additional
    execution options on top of it.

    Instances are what :meth:`Engine.execution_options` returns.  The
    connection pool is not copied: reads and writes of ``pool`` are
    forwarded to the wrapped engine.
    """

    _sa_propagate_class_events = False

    def __init__(self, proxied, execution_options):
        self._proxied = proxied

        # Mirror the wrapped engine's basic configuration, one attribute
        # at a time and in this fixed order.
        for attr in (
            "url",
            "dialect",
            "logging_name",
            "echo",
            "hide_parameters",
        ):
            setattr(self, attr, getattr(proxied, attr))
        log.instance_logger(self, echoflag=self.echo)

        # Joining dispatchers means events attached to the parent engine
        # *after* this OptionEngine exists still propagate here.  Because
        # the parent's events are shared, class-level events are not
        # allowed to apply to the OptionEngine class directly.
        #
        # The alternative would be to copy over only the events that
        # exist right now, using:
        # self.dispatch._update(proxied.dispatch)
        #
        # That may be the more appropriate approach, but it would change
        # behavior for code that attaches events to the parent engine and
        # expects them to take effect on an already-created sub-engine.
        self.dispatch = self.dispatch._join(proxied.dispatch)

        # Start from the parent's options, then apply the overrides.
        self._execution_options = proxied._execution_options
        self.update_execution_options(**execution_options)

    def _get_pool(self):
        """Read the pool from the wrapped engine."""
        parent = self._proxied
        return parent.pool

    def _set_pool(self, pool):
        """Assign the pool on the wrapped engine."""
        parent = self._proxied
        parent.pool = pool

    pool = property(fget=_get_pool, fset=_set_pool)

    def _get_has_events(self):
        """Return the wrapped engine's flag when truthy, else the value
        stored on this instance (``False`` if none was stored)."""
        inherited = self._proxied._has_events
        if inherited:
            return inherited
        return vars(self).get("_has_events", False)

    def _set_has_events(self, value):
        """Store the flag in the instance dictionary."""
        vars(self)["_has_events"] = value

    _has_events = property(fget=_get_has_events, fset=_set_has_events)

2415 

2416 def _set_has_events(self, value): 

2417 self.__dict__["_has_events"] = value 

2418 

2419 _has_events = property(_get_has_events, _set_has_events)