Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/base.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

565 statements  

1# pool/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Base constructs for connection pools.""" 

10 

11from __future__ import annotations 

12 

13from collections import deque 

14import dataclasses 

15from enum import Enum 

16import threading 

17import time 

18import typing 

19from typing import Any 

20from typing import Callable 

21from typing import cast 

22from typing import Deque 

23from typing import Dict 

24from typing import List 

25from typing import Literal 

26from typing import Optional 

27from typing import Protocol 

28from typing import Tuple 

29from typing import TYPE_CHECKING 

30from typing import Union 

31import weakref 

32 

33from .. import event 

34from .. import exc 

35from .. import log 

36from .. import util 

37from ..util.typing import Self 

38 

39if TYPE_CHECKING: 

40 from ..engine.interfaces import DBAPIConnection 

41 from ..engine.interfaces import DBAPICursor 

42 from ..engine.interfaces import Dialect 

43 from ..event import _DispatchCommon 

44 from ..event import _ListenerFnType 

45 from ..event import dispatcher 

46 from ..sql._typing import _InfoType 

47 

48 

@dataclasses.dataclass(frozen=True)
class PoolResetState:
    """Immutable description of a DBAPI connection's state, handed to the
    :meth:`.PoolEvents.reset` connection pool event.

    .. versionadded:: 2.0.0b3

    """

    __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")

    transaction_was_reset: bool
    """True when the :class:`.Connection` object has already effectively
    "reset" the transaction on the DBAPI connection.

    The flag is set if the :class:`.Connection` carried transactional
    state that was not ended through :meth:`.Connection.rollback` or
    :meth:`.Connection.commit`, but was instead closed inline by
    :meth:`.Connection.close`; the transaction is therefore guaranteed to
    remain non-present by the time this event is reached.

    """

    terminate_only: bool
    """True when the connection will be terminated right away rather than
    being checked in to the pool.

    That is the case for invalidated connections, and also for asyncio
    connections which the calling code did not handle cleanly and which are
    being garbage collected instead.  For the latter, operations can't be
    run safely on the asyncio connection during garbage collection because
    an event loop is not necessarily present.

    """

    asyncio_safe: bool
    """True when the reset takes place in a scope where asyncio applications
    can expect an enclosing event loop to be present.

    This is False when the connection is being garbage collected.

    """

92 

93 

class ResetStyle(Enum):
    """Enumeration of the supported "reset on return" behaviors."""

    # member order matters: module-level code unpacks the members
    # positionally
    reset_rollback = 0
    reset_commit = 1
    reset_none = 2

100 

101 

# values accepted wherever a "reset on return" style can be passed: either
# an enum member or one of the literal shorthands
_ResetStyleArgType = Union[
    ResetStyle, Literal[True, None, False, "commit", "rollback"]
]

# module-level shortcuts for the enum members, unpacked in definition order
reset_rollback, reset_commit, reset_none = tuple(ResetStyle)

107 

108 

class _ConnDialect:
    """Stub that implements only the DBAPI connection methods of
    :class:`.Dialect`.

    Once a :class:`_pool.Pool` is paired with an :class:`_engine.Engine`,
    the :class:`_engine.Engine` swaps this object out for its own
    :class:`.Dialect`.

    """

    is_async = False
    has_terminate = False

    def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
        """Delegate to the connection's own ``rollback()``."""
        dbapi_connection.rollback()

    def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
        """Delegate to the connection's own ``commit()``."""
        dbapi_connection.commit()

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Terminate the connection; for this stub that is a plain
        ``close()``."""
        dbapi_connection.close()

    def do_close(self, dbapi_connection: DBAPIConnection) -> None:
        """Delegate to the connection's own ``close()``."""
        dbapi_connection.close()

    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
        """Always raise; the stub has no means of pinging a connection."""
        message = (
            "The ping feature requires that a dialect is "
            "passed to the connection pool."
        )
        raise NotImplementedError(message)

    def get_driver_connection(self, connection: DBAPIConnection) -> Any:
        """Return the given connection itself, unchanged."""
        return connection

142 

143 

# Variant of the stub dialect that reports itself as asyncio-based; the only
# thing it overrides from _ConnDialect is the ``is_async`` flag.
class _AsyncConnDialect(_ConnDialect):
    is_async = True

146 

147 

class _CreatorFnType(Protocol):
    """Call signature of a creator that takes no arguments and returns a
    DBAPI connection."""

    def __call__(self) -> DBAPIConnection:
        ...

150 

151 

class _CreatorWRecFnType(Protocol):
    """Call signature of a creator that receives the
    :class:`.ConnectionPoolEntry` as ``rec`` and returns a DBAPI
    connection."""

    def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection:
        ...

154 

155 

class Pool(log.Identified, event.EventTarget):
    """Abstract base class for connection pools."""

    dispatch: dispatcher[Pool]
    echo: log._EchoFlagType

    _orig_logging_name: Optional[str]
    # class-level default is the stub dialect; __init__ replaces it per
    # instance when a ``dialect`` argument is given
    _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
    _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
    _invoke_creator: _CreatorWRecFnType
    _invalidate_time: float

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        recycle: int = -1,
        echo: log._EchoFlagType = None,
        logging_name: Optional[str] = None,
        reset_on_return: _ResetStyleArgType = True,
        events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
        dialect: Optional[Union[_ConnDialect, Dialect]] = None,
        pre_ping: bool = False,
        _dispatch: Optional[_DispatchCommon[Pool]] = None,
    ):
        """
        Construct a Pool.

        :param creator: a callable function that returns a DB-API
          connection object.  The callable may either accept no arguments,
          or accept a single positional argument, in which case it is
          passed the :class:`.ConnectionPoolEntry` for which the
          connection is being created.

        :param recycle: If set to a value other than -1, number of
          seconds between connection recycling, which means upon
          checkout, if this timeout is surpassed the connection will be
          closed and replaced with a newly opened connection. Defaults to -1.

        :param logging_name:  String identifier which will be used within
          the "name" field of logging records generated within the
          "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
          id.

        :param echo: if True, the connection pool will log
         informational output such as when connections are invalidated
         as well as when connections are recycled to the default log handler,
         which defaults to ``sys.stdout`` for output. If set to the string
         ``"debug"``, the logging will include pool checkouts and checkins.

         The :paramref:`_pool.Pool.echo` parameter can also be set from the
         :func:`_sa.create_engine` call by using the
         :paramref:`_sa.create_engine.echo_pool` parameter.

         .. seealso::

             :ref:`dbengine_logging` - further detail on how to configure
             logging.

        :param reset_on_return: Determine steps to take on
         connections as they are returned to the pool, which were
         not otherwise handled by a :class:`_engine.Connection`.
         Available from :func:`_sa.create_engine` via the
         :paramref:`_sa.create_engine.pool_reset_on_return` parameter.

         :paramref:`_pool.Pool.reset_on_return` can have any of these values:

         * ``"rollback"`` - call rollback() on the connection,
           to release locks and transaction resources.
           This is the default value.  The vast majority
           of use cases should leave this value set.
         * ``"commit"`` - call commit() on the connection,
           to release locks and transaction resources.
           A commit here may be desirable for databases that
           cache query plans if a commit is emitted,
           such as Microsoft SQL Server.  However, this
           value is more dangerous than 'rollback' because
           any data changes present on the transaction
           are committed unconditionally.
         * ``None`` - don't do anything on the connection.
           This setting may be appropriate if the database / DBAPI
           works in pure "autocommit" mode at all times, or if
           a custom reset handler is established using the
           :meth:`.PoolEvents.reset` event handler.

         * ``True`` - same as 'rollback', this is here for
           backwards compatibility.
         * ``False`` - same as None, this is here for
           backwards compatibility.

         For further customization of reset on return, the
         :meth:`.PoolEvents.reset` event hook may be used which can perform
         any connection activity desired on reset.

         .. seealso::

            :ref:`pool_reset_on_return`

            :meth:`.PoolEvents.reset`

        :param events: a list of 2-tuples, each of the form
         ``(callable, target)`` which will be passed to :func:`.event.listen`
         upon construction.   Provided here so that event listeners
         can be assigned via :func:`_sa.create_engine` before dialect-level
         listeners are applied.

        :param dialect: a :class:`.Dialect` that will handle the job
         of calling rollback(), close(), or commit() on DBAPI connections.
         If omitted, a built-in "stub" dialect is used.   Applications that
         make use of :func:`_sa.create_engine` should not use this parameter
         as it is handled by the engine creation strategy.

        :param pre_ping: if True, the pool will emit a "ping" (typically
         "SELECT 1", but is dialect-specific) on the connection
         upon checkout, to test if the connection is alive or not.   If not,
         the connection is transparently re-connected and upon success, all
         other pooled connections established prior to that timestamp are
         invalidated.     Requires that a dialect is passed as well to
         interpret the disconnection error.

        """
        if logging_name:
            self.logging_name = self._orig_logging_name = logging_name
        else:
            self._orig_logging_name = None

        log.instance_logger(self, echoflag=echo)
        # goes through the ``_creator`` property setter, which also
        # establishes ``_invoke_creator``
        self._creator = creator
        self._recycle = recycle
        self._invalidate_time = 0
        self._pre_ping = pre_ping
        self._reset_on_return = util.parse_user_argument_for_enum(
            reset_on_return,
            {
                ResetStyle.reset_rollback: ["rollback", True],
                ResetStyle.reset_none: ["none", None, False],
                ResetStyle.reset_commit: ["commit"],
            },
            "reset_on_return",
        )

        self.echo = echo

        if _dispatch:
            self.dispatch._update(_dispatch, only_propagate=False)
        if dialect:
            self._dialect = dialect
        if events:
            for fn, target in events:
                event.listen(self, target, fn)

    @util.hybridproperty
    def _is_asyncio(self) -> bool:
        """Report the ``is_async`` flag of the dialect in use."""
        return self._dialect.is_async

    @property
    def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
        """The creator callable exactly as it was given to the pool."""
        return self._creator_arg

    @_creator.setter
    def _creator(
        self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
    ) -> None:
        self._creator_arg = creator

        # mypy seems to get super confused assigning functions to
        # attributes
        self._invoke_creator = self._should_wrap_creator(creator)

    @_creator.deleter
    def _creator(self) -> None:
        # needed for mock testing
        del self._creator_arg
        del self._invoke_creator

    def _should_wrap_creator(
        self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
    ) -> _CreatorWRecFnType:
        """Detect if creator accepts a single argument, or is sent
        as a legacy style no-arg function.

        :param creator: the user-supplied creator callable to inspect.

        :return: a callable that accepts the connection record; this is
         either ``creator`` itself, or a wrapper that discards the record
         and calls ``creator`` with no arguments.

        """

        try:
            # inspect the callable that was passed in, rather than relying
            # on ``self._creator`` having already been assigned
            argspec = util.get_callable_argspec(creator, no_self=True)
        except TypeError:
            # signature can't be inspected; assume the no-arg form
            creator_fn = cast(_CreatorFnType, creator)
            return lambda rec: creator_fn()

        if argspec.defaults is not None:
            defaulted = len(argspec.defaults)
        else:
            defaulted = 0
        # number of positional arguments that have no default value
        positionals = len(argspec[0]) - defaulted

        # look for the exact arg signature that DefaultStrategy
        # sends us
        if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
            return cast(_CreatorWRecFnType, creator)
        # or just a single positional
        elif positionals == 1:
            return cast(_CreatorWRecFnType, creator)
        # all other cases, just wrap and assume legacy "creator" callable
        # thing
        else:
            creator_fn = cast(_CreatorFnType, creator)
            return lambda rec: creator_fn()

    def _close_connection(
        self, connection: DBAPIConnection, *, terminate: bool = False
    ) -> None:
        """Close or terminate the given DBAPI connection via the dialect.

        :param connection: the DBAPI connection to dispose of.

        :param terminate: if True, the dialect's ``do_terminate()`` is
         used in place of ``do_close()``.

        Any failure is logged at error level.  Exceptions deriving from
        ``Exception`` are then suppressed; other ``BaseException``
        subclasses are re-raised.

        """
        self.logger.debug(
            "%s connection %r",
            "Hard-closing" if terminate else "Closing",
            connection,
        )
        try:
            if terminate:
                self._dialect.do_terminate(connection)
            else:
                self._dialect.do_close(connection)
        except BaseException as e:
            self.logger.error(
                f"Exception {'terminating' if terminate else 'closing'} "
                f"connection %r",
                connection,
                exc_info=True,
            )
            if not isinstance(e, Exception):
                raise

    def _create_connection(self) -> ConnectionPoolEntry:
        """Called by subclasses to create a new ConnectionRecord."""

        return _ConnectionRecord(self)

    def _invalidate(
        self,
        connection: PoolProxiedConnection,
        exception: Optional[BaseException] = None,
        _checkin: bool = True,
    ) -> None:
        """Mark all connections established within the generation
        of the given connection as invalidated.

        If the given connection has no connection record, or if this
        pool's last invalidate time is before when the given connection
        was created, update the timestamp to now.  Otherwise, the timestamp
        is left unchanged.

        Connections with a start time prior to this pool's invalidation
        time will be recycled upon next checkout.

        :param connection: the proxied connection whose generation is to
         be invalidated.

        :param exception: optional exception, passed along to the
         connection's own ``invalidate()`` method.

        :param _checkin: if True and the connection reports ``is_valid``,
         the connection itself is invalidated as well.

        """
        rec = getattr(connection, "_connection_record", None)
        if not rec or self._invalidate_time < rec.starttime:
            self._invalidate_time = time.time()
        if _checkin and getattr(connection, "is_valid", False):
            connection.invalidate(exception)

    def recreate(self) -> Pool:
        """Return a new :class:`_pool.Pool`, of the same class as this one
        and configured with identical creation arguments.

        This method is used in conjunction with :meth:`dispose`
        to close out an entire :class:`_pool.Pool` and create a new one in
        its place.

        """

        raise NotImplementedError()

    def dispose(self) -> None:
        """Dispose of this pool.

        This method leaves the possibility of checked-out connections
        remaining open, as it only affects connections that are
        idle in the pool.

        .. seealso::

            :meth:`Pool.recreate`

        """

        raise NotImplementedError()

    def connect(self) -> PoolProxiedConnection:
        """Return a DBAPI connection from the pool.

        The connection is instrumented such that when its
        ``close()`` method is called, the connection will be returned to
        the pool.

        """
        return _ConnectionFairy._checkout(self)

    def _return_conn(self, record: ConnectionPoolEntry) -> None:
        """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.

        This method is called when an instrumented DBAPI connection
        has its ``close()`` method called.

        """
        self._do_return_conn(record)

    def _do_get(self) -> ConnectionPoolEntry:
        """Implementation for :meth:`get`, supplied by subclasses."""

        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Implementation for :meth:`return_conn`, supplied by subclasses."""

        raise NotImplementedError()

    def status(self) -> str:
        """Returns a brief description of the state of this pool."""
        raise NotImplementedError()

470 

471 

class ManagesConnection:
    """Shared base of the two connection-management interfaces,
    :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.

    Public-facing code usually encounters these two objects through the
    connection pool event hooks documented at :class:`.PoolEvents`.

    .. versionadded:: 2.0

    """

    __slots__ = ()

    dbapi_connection: Optional[DBAPIConnection]
    """The actual DBAPI connection that is being tracked.

    For traditional sync-style dialects this is the :pep:`249`-compliant
    object supplied by the third-party DBAPI implementation in use.  For
    asyncio dialects it is typically an adapter object supplied by the
    SQLAlchemy dialect itself, with the underlying asyncio object being
    reachable through the :attr:`.ManagesConnection.driver_connection`
    attribute.

    SQLAlchemy's interface for the DBAPI connection is based on the
    :class:`.DBAPIConnection` protocol object

    .. seealso::

        :attr:`.ManagesConnection.driver_connection`

        :ref:`faq_dbapi_connection`

    """

    driver_connection: Optional[Any]
    """The "driver level" connection object used by the Python DBAPI or
    database driver.

    With a traditional :pep:`249` DBAPI implementation this is the very
    same object as :attr:`.ManagesConnection.dbapi_connection`.  With an
    asyncio database driver it is the ultimate "connection" object used by
    that driver, for example the ``asyncpg.Connection`` object, which does
    not have the standard pep-249 methods.

    .. versionadded:: 1.4.24

    .. seealso::

        :attr:`.ManagesConnection.dbapi_connection`

        :ref:`faq_dbapi_connection`

    """

    @util.ro_memoized_property
    def info(self) -> _InfoType:
        """Dictionary of user-defined data tied to the underlying DBAPI
        connection that this :class:`.ManagesConnection` refers to.

        The contents live as long as the DBAPI connection itself does,
        which includes surviving pool checkins and checkouts.  The
        dictionary is cleared when the connection is invalidated and
        replaced with a new one.

        A :class:`.PoolProxiedConnection` that is not associated with a
        :class:`.ConnectionPoolEntry`, for example because it was
        detached, still provides a dictionary of its own here.
        :attr:`.ManagesConnection.info` therefore always yields a Python
        dictionary.

        .. seealso::

            :attr:`.ManagesConnection.record_info`


        """
        raise NotImplementedError()

    @util.ro_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        """Persistent info dictionary belonging to this
        :class:`.ManagesConnection`.

        In contrast to :attr:`.ManagesConnection.info`, this dictionary
        lives as long as the :class:`.ConnectionPoolEntry` that owns it,
        so it is retained across reconnects and connection invalidation
        for a particular entry in the connection pool.

        A :class:`.PoolProxiedConnection` that is not associated with a
        :class:`.ConnectionPoolEntry`, for example because it was
        detached, has None for this attribute, whereas the
        :attr:`.ManagesConnection.info` dictionary is never None.


        .. seealso::

            :attr:`.ManagesConnection.info`

        """
        raise NotImplementedError()

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Flag the managed connection as invalidated.

        :param e: exception object giving the reason for the invalidation.

        :param soft: when True the connection is not closed; it is
         recycled on its next checkout instead.

        .. seealso::

            :ref:`pool_connection_invalidation`


        """
        raise NotImplementedError()

594 

595 

class ConnectionPoolEntry(ManagesConnection):
    """Interface of the object that looks after a single database
    connection on behalf of a :class:`_pool.Pool` instance.

    A :class:`.ConnectionPoolEntry` stands for the long term maintenance
    of one connection for a pool.  That includes expiring or invalidating
    the connection so that a new one takes its place, with the replacement
    continuing to be maintained by the same :class:`.ConnectionPoolEntry`
    instance.  Where :class:`.PoolProxiedConnection` is the short-term,
    per-checkout connection manager, this object lasts for the lifespan of
    a particular "slot" within a connection pool.

    Public-facing API code mostly sees the :class:`.ConnectionPoolEntry`
    object when it is delivered to connection pool event hooks such as
    :meth:`_events.PoolEvents.connect` and
    :meth:`_events.PoolEvents.checkout`.

    .. versionadded:: 2.0  :class:`.ConnectionPoolEntry` provides the public
       facing interface for the :class:`._ConnectionRecord` internal class.

    """

    __slots__ = ()

    @property
    def in_use(self) -> bool:
        """Return True if the connection is currently checked out"""

        raise NotImplementedError()

    def close(self) -> None:
        """Close the DBAPI connection that this connection pool entry
        manages."""
        raise NotImplementedError()

628 

629 

class _ConnectionRecord(ConnectionPoolEntry):
    """Maintains a position in a connection pool which references a pooled
    connection.

    This is an internal object used by the :class:`_pool.Pool` implementation
    to provide context management to a DBAPI connection maintained by
    that :class:`_pool.Pool`.   The public facing interface for this class
    is described by the :class:`.ConnectionPoolEntry` class.  See that
    class for public API details.

    .. seealso::

        :class:`.ConnectionPoolEntry`

        :class:`.PoolProxiedConnection`

    """

    __slots__ = (
        "__pool",
        "fairy_ref",
        "finalize_callback",
        "fresh",
        "starttime",
        "dbapi_connection",
        "__weakref__",
        "__dict__",
    )

    # callables run against the DBAPI connection at checkin time
    finalize_callback: Deque[Callable[[DBAPIConnection], None]]
    # set to True each time a new DBAPI connection has been created
    fresh: bool
    # weak reference to the fairy of the current checkout; None when the
    # record is not checked out
    fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
    # time.time() value taken just before the current connection was created
    starttime: float

    def __init__(self, pool: Pool, connect: bool = True):
        """Construct a record for the given pool.

        :param pool: the owning :class:`_pool.Pool`.

        :param connect: if True, a DBAPI connection is established right
         away; otherwise it is created by the first call to
         :meth:`get_connection`.

        """
        self.fresh = False
        self.fairy_ref = None
        self.starttime = 0
        self.dbapi_connection = None
        # must exist before __connect() runs: the connect events receive
        # this record, and a listener calling invalidate() / close() on it
        # reaches __close(), which clears this deque
        self.finalize_callback = deque()

        self.__pool = pool
        if connect:
            self.__connect()

    dbapi_connection: Optional[DBAPIConnection]

    @property
    def driver_connection(self) -> Optional[Any]:  # type: ignore[override]  # mypy#4125  # noqa: E501
        """The driver-level connection as reported by the pool's dialect,
        or None when there is no DBAPI connection."""
        if self.dbapi_connection is None:
            return None
        else:
            return self.__pool._dialect.get_driver_connection(
                self.dbapi_connection
            )

    @property
    @util.deprecated(
        "2.0",
        "The _ConnectionRecord.connection attribute is deprecated; "
        "please use 'driver_connection'",
    )
    def connection(self) -> Optional[DBAPIConnection]:
        return self.dbapi_connection

    # time of the most recent soft invalidation; the class-level default of
    # 0 applies until invalidate(soft=True) is called on the instance
    _soft_invalidate_time: float = 0

    @util.ro_memoized_property
    def info(self) -> _InfoType:
        return {}

    @util.ro_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        return {}

    @classmethod
    def checkout(cls, pool: Pool) -> _ConnectionFairy:
        """Obtain a record from the pool and wrap its DBAPI connection in
        a new :class:`._ConnectionFairy`.

        If acquiring the DBAPI connection fails, the record is invalidated
        and checked back in, and the error is re-raised.

        """
        if TYPE_CHECKING:
            rec = cast(_ConnectionRecord, pool._do_get())
        else:
            rec = pool._do_get()

        try:
            dbapi_connection = rec.get_connection()
        except BaseException as err:
            with util.safe_reraise():
                rec._checkin_failed(err, _fairy_was_created=False)

            # not reached, for code linters only
            raise

        echo = pool._should_log_debug()
        fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)

        # the weakref callback runs the fairy finalizer when the fairy goes
        # away without having been explicitly finalized
        # NOTE(review): the "is not None" guard presumably covers module
        # teardown, where the global may already be cleared - confirm
        rec.fairy_ref = ref = weakref.ref(
            fairy,
            lambda ref: (
                _finalize_fairy(
                    None, rec, pool, ref, echo, transaction_was_reset=False
                )
                if _finalize_fairy is not None
                else None
            ),
        )
        # hold a strong reference to the record, keyed by the weakref
        _strong_ref_connection_records[ref] = rec
        if echo:
            pool.logger.debug(
                "Connection %r checked out from pool", dbapi_connection
            )
        return fairy

    def _checkin_failed(
        self, err: BaseException, _fairy_was_created: bool = True
    ) -> None:
        """Invalidate this record because of ``err``, then check it in."""
        self.invalidate(e=err)
        self.checkin(
            _fairy_was_created=_fairy_was_created,
        )

    def checkin(self, _fairy_was_created: bool = True) -> None:
        """Return this record to its pool.

        Pending finalize callbacks are run against the DBAPI connection,
        if one is present, then the pool's ``checkin`` event is dispatched
        and the record is handed back to the pool.  A checkin of a record
        that has no fairy reference emits a warning and does nothing else,
        unless ``_fairy_was_created`` is False.

        """
        if self.fairy_ref is None and _fairy_was_created:
            # _fairy_was_created is False for the initial get connection phase;
            # meaning there was no _ConnectionFairy and we must unconditionally
            # do a checkin.
            #
            # otherwise, if fairy_was_created==True, if fairy_ref is None here
            # that means we were checked in already, so this looks like
            # a double checkin.
            util.warn("Double checkin attempted on %s" % self)
            return
        self.fairy_ref = None
        connection = self.dbapi_connection
        pool = self.__pool
        # callbacks are consumed most-recently-added first; they are
        # discarded without being called when there is no connection
        while self.finalize_callback:
            finalizer = self.finalize_callback.pop()
            if connection is not None:
                finalizer(connection)
        if pool.dispatch.checkin:
            pool.dispatch.checkin(connection, self)

        pool._return_conn(self)

    @property
    def in_use(self) -> bool:
        return self.fairy_ref is not None

    @property
    def last_connect_time(self) -> float:
        """The ``starttime`` recorded for the current connection."""
        return self.starttime

    def close(self) -> None:
        if self.dbapi_connection is not None:
            self.__close()

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Invalidate the DBAPI connection held by this record.

        Does nothing if there is no DBAPI connection.  Otherwise the
        ``soft_invalidate`` or ``invalidate`` pool event is dispatched and
        the invalidation is logged.  A soft invalidation only records the
        current time, so that the connection is recycled by the next
        :meth:`get_connection`; a hard invalidation terminates the
        connection immediately.

        :param e: optional exception describing the reason.

        :param soft: if True, perform a soft invalidation.

        """
        # already invalidated
        if self.dbapi_connection is None:
            return
        if soft:
            self.__pool.dispatch.soft_invalidate(
                self.dbapi_connection, self, e
            )
        else:
            self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
        if e is not None:
            self.__pool.logger.info(
                "%sInvalidate connection %r (reason: %s:%s)",
                "Soft " if soft else "",
                self.dbapi_connection,
                e.__class__.__name__,
                e,
            )
        else:
            self.__pool.logger.info(
                "%sInvalidate connection %r",
                "Soft " if soft else "",
                self.dbapi_connection,
            )

        if soft:
            self._soft_invalidate_time = time.time()
        else:
            self.__close(terminate=True)
            self.dbapi_connection = None

    def get_connection(self) -> DBAPIConnection:
        """Return the DBAPI connection for this record, connecting or
        reconnecting first if required.

        A new connection is made when none is present.  An existing
        connection is terminated and replaced when it is older than the
        pool's recycle setting, or when its start time precedes either the
        pool-wide invalidation time or this record's soft invalidation
        time.  The ``info`` dictionary is cleared whenever a new connection
        is made.

        """
        recycle = False

        # NOTE: the various comparisons here are assuming that measurable time
        # passes between these state changes.  however, time.time() is not
        # guaranteed to have sub-second precision.  comparisons of
        # "invalidation time" to "starttime" should perhaps use >= so that the
        # state change can take place assuming no measurable time has passed,
        # however this does not guarantee correct behavior here as if time
        # continues to not pass, it will try to reconnect repeatedly until
        # these timestamps diverge, so in that sense using > is safer.  Per
        # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
        # within 16 milliseconds accuracy, so unit tests for connection
        # invalidation need a sleep of at least this long between initial start
        # time and invalidation for the logic below to work reliably.

        if self.dbapi_connection is None:
            self.info.clear()
            self.__connect()
        elif (
            self.__pool._recycle > -1
            and time.time() - self.starttime > self.__pool._recycle
        ):
            self.__pool.logger.info(
                "Connection %r exceeded timeout; recycling",
                self.dbapi_connection,
            )
            recycle = True
        elif self.__pool._invalidate_time > self.starttime:
            self.__pool.logger.info(
                "Connection %r invalidated due to pool invalidation; "
                + "recycling",
                self.dbapi_connection,
            )
            recycle = True
        elif self._soft_invalidate_time > self.starttime:
            self.__pool.logger.info(
                "Connection %r invalidated due to local soft invalidation; "
                + "recycling",
                self.dbapi_connection,
            )
            recycle = True

        if recycle:
            self.__close(terminate=True)
            self.info.clear()

            self.__connect()

        assert self.dbapi_connection is not None
        return self.dbapi_connection

    def _is_hard_or_soft_invalidated(self) -> bool:
        """Return True if there is no DBAPI connection, or if the current
        one predates the pool-wide or the local soft invalidation time."""
        return (
            self.dbapi_connection is None
            or self.__pool._invalidate_time > self.starttime
            or (self._soft_invalidate_time > self.starttime)
        )

    def __close(self, *, terminate: bool = False) -> None:
        """Discard pending finalize callbacks, dispatch the pool's
        ``close`` event if it has listeners, then have the pool close (or
        terminate) the DBAPI connection and drop the reference to it."""
        self.finalize_callback.clear()
        if self.__pool.dispatch.close:
            self.__pool.dispatch.close(self.dbapi_connection, self)
        assert self.dbapi_connection is not None
        self.__pool._close_connection(
            self.dbapi_connection, terminate=terminate
        )
        self.dbapi_connection = None

    def __connect(self) -> None:
        """Create a new DBAPI connection through the pool's creator and
        dispatch the ``first_connect`` / ``connect`` pool events.

        If the creator raises, the error is logged at debug level and
        re-raised, leaving ``dbapi_connection`` as None.

        """
        pool = self.__pool

        # ensure any existing connection is removed, so that if
        # creator fails, this attribute stays None
        self.dbapi_connection = None
        try:
            self.starttime = time.time()
            self.dbapi_connection = connection = pool._invoke_creator(self)
            pool.logger.debug("Created new connection %r", connection)
            self.fresh = True
        except BaseException as e:
            with util.safe_reraise():
                pool.logger.debug("Error on connect(): %s", e)
        else:
            # in SQLAlchemy 1.4 the first_connect event is not used by
            # the engine, so this will usually not be set
            if pool.dispatch.first_connect:
                pool.dispatch.first_connect.for_modify(
                    pool.dispatch
                ).exec_once_unless_exception(self.dbapi_connection, self)

            # init of the dialect now takes place within the connect
            # event, so ensure a mutex is used on the first run
            pool.dispatch.connect.for_modify(
                pool.dispatch
            )._exec_w_sync_on_first_run(self.dbapi_connection, self)

913 

914 

def _finalize_fairy(
    dbapi_connection: Optional[DBAPIConnection],
    connection_record: Optional[_ConnectionRecord],
    pool: Pool,
    ref: Optional[
        weakref.ref[_ConnectionFairy]
    ],  # this is None when called directly, not by the gc
    echo: Optional[log._EchoFlagType],
    transaction_was_reset: bool = False,
    fairy: Optional[_ConnectionFairy] = None,
) -> None:
    """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
    been garbage collected.

    When using an async dialect no IO can happen here (without using
    a dedicated thread), since this is called outside the greenlet
    context and with an already running loop. In this case the function
    will only log a message and emit a warning.

    :param dbapi_connection: the DBAPI connection being finalized.  Must
     be ``None`` when ``ref`` is given; it is then read from
     ``connection_record``.
    :param connection_record: the record owning the connection, or
     ``None``, in which case the connection is treated as detached.
    :param pool: the pool used for logging, event dispatch and closing.
    :param ref: the weakref to the fairy when invoked as a gc callback;
     ``None`` when called directly.
    :param echo: echo flag; enables the "being returned to pool" debug
     message and is passed to a fairy created here.
    :param transaction_was_reset: forwarded to ``fairy._reset()``.
    :param fairy: the fairy being finalized, if it still exists; when
     omitted a temporary :class:`._ConnectionFairy` is built for the
     reset step.
    """

    is_gc_cleanup = ref is not None

    if is_gc_cleanup:
        assert ref is not None
        _strong_ref_connection_records.pop(ref, None)
        assert connection_record is not None
        # the record now points at a different fairy (or none); this
        # callback has nothing left to do
        if connection_record.fairy_ref is not ref:
            return
        assert dbapi_connection is None
        dbapi_connection = connection_record.dbapi_connection

    elif fairy:
        _strong_ref_connection_records.pop(weakref.ref(fairy), None)

    # null pool is not _is_asyncio but can be used also with async dialects
    dont_restore_gced = pool._dialect.is_async

    if dont_restore_gced:
        # async dialect: a gc'ed connection is detached rather than
        # returned, and is not manipulated during gc cleanup
        detach = connection_record is None or is_gc_cleanup
        can_manipulate_connection = not is_gc_cleanup
        can_close_or_terminate_connection = (
            not pool._dialect.is_async or pool._dialect.has_terminate
        )
        requires_terminate_for_close = (
            pool._dialect.is_async and pool._dialect.has_terminate
        )

    else:
        detach = connection_record is None
        can_manipulate_connection = can_close_or_terminate_connection = True
        requires_terminate_for_close = False

    if dbapi_connection is not None:
        if connection_record and echo:
            pool.logger.debug(
                "Connection %r being returned to pool", dbapi_connection
            )

        try:
            if not fairy:
                assert connection_record is not None
                fairy = _ConnectionFairy(
                    pool,
                    dbapi_connection,
                    connection_record,
                    echo,
                )
            assert fairy.dbapi_connection is dbapi_connection

            fairy._reset(
                pool,
                transaction_was_reset=transaction_was_reset,
                terminate_only=detach,
                asyncio_safe=can_manipulate_connection,
            )

            if detach:
                if connection_record:
                    fairy._pool = pool
                    fairy.detach()

                if can_close_or_terminate_connection:
                    if pool.dispatch.close_detached:
                        pool.dispatch.close_detached(dbapi_connection)

                    pool._close_connection(
                        dbapi_connection,
                        terminate=requires_terminate_for_close,
                    )

        except BaseException as e:
            pool.logger.error(
                "Exception during reset or similar", exc_info=True
            )
            if connection_record:
                connection_record.invalidate(e=e)
            # Exception subclasses are swallowed after logging and
            # invalidation; any other BaseException is re-raised
            if not isinstance(e, Exception):
                raise
        finally:
            if detach and is_gc_cleanup and dont_restore_gced:
                message = (
                    "The garbage collector is trying to clean up "
                    f"non-checked-in connection {dbapi_connection!r}, "
                    f"""which will be {
                        'dropped, as it cannot be safely terminated'
                        if not can_close_or_terminate_connection
                        else 'terminated'
                    }.  """
                    "Please ensure that SQLAlchemy pooled connections are "
                    "returned to "
                    "the pool explicitly, either by calling ``close()`` "
                    "or by using appropriate context managers to manage "
                    "their lifecycle."
                )
                pool.logger.error(message)
                util.warn(message)

    # a record that still has a fairy_ref has not been detached above
    if connection_record and connection_record.fairy_ref is not None:
        connection_record.checkin()

    # give gc some help.  See
    # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
    # which actually started failing when pytest warnings plugin was
    # turned on, due to util.warn() above
    if fairy is not None:
        fairy.dbapi_connection = None  # type: ignore
        fairy._connection_record = None
    del dbapi_connection
    del connection_record
    del fairy

1045 

1046 

# a dictionary mapping _ConnectionFairy weakrefs to their _ConnectionRecord,
# so that GC under pypy will call ConnectionFairy finalizers.  Entries are
# keyed directly on the weakref and are popped by _finalize_fairy(), so
# that the mapping should not create any unmanaged memory references.
_strong_ref_connection_records: Dict[
    weakref.ref[_ConnectionFairy], _ConnectionRecord
] = {}

1054 

1055 

class PoolProxiedConnection(ManagesConnection):
    """Connection-like adapter around a :pep:`249` DBAPI connection that
    adds methods specific to the :class:`.Pool` implementation.

    This class is the public-facing interface of the internal
    :class:`._ConnectionFairy` object; users who already know
    :class:`._ConnectionFairy` may treat the two as equivalent.

    .. versionadded:: 2.0  :class:`.PoolProxiedConnection` provides the public-
       facing interface for the :class:`._ConnectionFairy` internal class.

    """

    __slots__ = ()

    if TYPE_CHECKING:
        # typing-only declarations of the proxied DBAPI surface

        def commit(self) -> None: ...

        def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: ...

        def rollback(self) -> None: ...

        def __getattr__(self, key: str) -> Any: ...

    @property
    def is_valid(self) -> bool:
        """True while this :class:`.PoolProxiedConnection` still refers
        to an active DBAPI connection."""

        raise NotImplementedError

    @property
    def is_detached(self) -> bool:
        """True when this :class:`.PoolProxiedConnection` has been
        detached from its pool."""

        raise NotImplementedError

    def detach(self) -> None:
        """Separate this connection from its Pool.

        After detaching, closing the connection no longer returns it to
        the pool; it is literally closed instead.  The associated
        :class:`.ConnectionPoolEntry` is de-associated from this DBAPI
        connection.

        Note that any overall connection limiting constraints imposed by
        a Pool implementation may be violated after a detach, since the
        detached connection leaves the pool's knowledge and control.

        """

        raise NotImplementedError

    def close(self) -> None:
        """Release this connection back to the pool.

        :meth:`.PoolProxiedConnection.close` shadows the :pep:`249`
        ``.close()`` method, changing its behavior so that the proxied
        connection is instead :term:`released <release>` to the
        connection pool.

        Whether the released connection stays "opened" and pooled in the
        Python process, or is actually closed out and removed from the
        Python process, depends on the pool implementation in use and on
        its configuration and current state.

        """
        raise NotImplementedError

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        # leaving the ``with`` block releases the connection
        self.close()

1132 

1133 

class _AdhocProxiedConnection(PoolProxiedConnection):
    """provides the :class:`.PoolProxiedConnection` interface for cases where
    the DBAPI connection is not actually proxied.

    This is used by the engine internals to pass a consistent
    :class:`.PoolProxiedConnection` object to consuming dialects in response to
    pool events that may not always have the :class:`._ConnectionFairy`
    available.

    """

    __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")

    dbapi_connection: DBAPIConnection
    _connection_record: ConnectionPoolEntry
    _is_valid: bool

    def __init__(
        self,
        dbapi_connection: DBAPIConnection,
        connection_record: ConnectionPoolEntry,
    ):
        """Wrap ``dbapi_connection`` together with its pool entry.

        The new object starts out valid.
        """
        self.dbapi_connection = dbapi_connection
        self._connection_record = connection_record
        self._is_valid = True

    @property
    def driver_connection(self) -> Any:  # type: ignore[override]  # mypy#4125
        """The driver-level connection, as reported by the pool entry."""
        return self._connection_record.driver_connection

    @property
    def connection(self) -> DBAPIConnection:
        """The wrapped DBAPI connection."""
        return self.dbapi_connection

    @property
    def is_valid(self) -> bool:
        """Implement is_valid state attribute.

        The adhoc proxied connection is considered valid from construction
        until :meth:`.invalidate` is called on it; no other state is
        consulted.

        """
        return self._is_valid

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Mark this object as no longer valid.

        Only the local flag read by :attr:`.is_valid` is changed; the
        ``e`` and ``soft`` arguments are accepted for interface
        compatibility and are not used.
        """
        self._is_valid = False

    @util.ro_non_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        """The ``record_info`` dictionary of the pool entry."""
        return self._connection_record.record_info

    def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
        """Return a new cursor from the wrapped DBAPI connection."""
        return self.dbapi_connection.cursor(*args, **kwargs)

    def __getattr__(self, key: str) -> Any:
        """Proxy unknown attributes to the wrapped DBAPI connection."""
        if key == "dbapi_connection":
            # __getattr__ is only reached for this name when the slot has
            # not been populated (e.g. an instance built without
            # __init__); looking it up again on self would recurse until
            # RecursionError, so report the missing attribute instead.
            raise AttributeError(key)
        return getattr(self.dbapi_connection, key)

1191 

1192 

1193class _ConnectionFairy(PoolProxiedConnection): 

1194 """Proxies a DBAPI connection and provides return-on-dereference 

1195 support. 

1196 

1197 This is an internal object used by the :class:`_pool.Pool` implementation 

1198 to provide context management to a DBAPI connection delivered by 

1199 that :class:`_pool.Pool`. The public facing interface for this class 

1200 is described by the :class:`.PoolProxiedConnection` class. See that 

1201 class for public API details. 

1202 

1203 The name "fairy" is inspired by the fact that the 

1204 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts 

1205 only for the length of a specific DBAPI connection being checked out from 

1206 the pool, and additionally that as a transparent proxy, it is mostly 

1207 invisible. 

1208 

1209 .. seealso:: 

1210 

1211 :class:`.PoolProxiedConnection` 

1212 

1213 :class:`.ConnectionPoolEntry` 

1214 

1215 

1216 """ 

1217 

1218 __slots__ = ( 

1219 "dbapi_connection", 

1220 "_connection_record", 

1221 "_echo", 

1222 "_pool", 

1223 "_counter", 

1224 "__weakref__", 

1225 "__dict__", 

1226 ) 

1227 

1228 pool: Pool 

1229 dbapi_connection: DBAPIConnection 

1230 _echo: log._EchoFlagType 

1231 

def __init__(
    self,
    pool: Pool,
    dbapi_connection: DBAPIConnection,
    connection_record: _ConnectionRecord,
    echo: log._EchoFlagType,
):
    """Bind the fairy to its pool, DBAPI connection and record.

    The checkout counter starts at zero.
    """
    # the proxied connection and the record that owns it
    self.dbapi_connection = dbapi_connection
    self._connection_record = connection_record

    # pool-level context
    self._pool = pool
    self._echo = echo

    # nothing has checked this fairy out yet
    self._counter = 0

1244 

1245 _connection_record: Optional[_ConnectionRecord] 

1246 

@property
def driver_connection(self) -> Optional[Any]:  # type: ignore[override]  # mypy#4125  # noqa: E501
    """The record's driver-level connection, or ``None`` when this
    fairy has no connection record."""
    record = self._connection_record
    return None if record is None else record.driver_connection

1252 

@property
@util.deprecated(
    "2.0",
    "The _ConnectionFairy.connection attribute is deprecated; "
    "please use 'driver_connection'",
)
def connection(self) -> DBAPIConnection:
    # deprecated accessor; returns the proxied DBAPI connection as-is
    return self.dbapi_connection

1261 

@classmethod
def _checkout(
    cls,
    pool: Pool,
    threadconns: Optional[threading.local] = None,
    fairy: Optional[_ConnectionFairy] = None,
) -> _ConnectionFairy:
    """Check out a fairy from ``pool``, or re-enter an existing one.

    When ``fairy`` is not given, a new one is obtained from
    ``_ConnectionRecord.checkout(pool)`` and, if ``threadconns`` is
    given, a weakref to it is stored on ``threadconns.current``.  The
    fairy's counter is incremented; pre-ping and the ``checkout`` event
    only run when the counter becomes 1 and at least one of them is
    configured.

    :raises InvalidRequestError: when the reconnect attempts are used up.
    """
    if not fairy:
        fairy = _ConnectionRecord.checkout(pool)

        if threadconns is not None:
            threadconns.current = weakref.ref(fairy)

    assert (
        fairy._connection_record is not None
    ), "can't 'checkout' a detached connection fairy"
    assert (
        fairy.dbapi_connection is not None
    ), "can't 'checkout' an invalidated connection fairy"

    fairy._counter += 1
    # nested checkout, or nothing to ping / dispatch: hand it back as-is
    if (
        not pool.dispatch.checkout and not pool._pre_ping
    ) or fairy._counter != 1:
        return fairy

    # Pool listeners can trigger a reconnection on checkout, as well
    # as the pre-pinger.
    # at most two passes are made through the loop below, but note that
    # if the database is not accessible from a connection standpoint,
    # those won't proceed here.

    attempts = 2

    while attempts > 0:
        connection_is_fresh = fairy._connection_record.fresh
        fairy._connection_record.fresh = False
        try:
            if pool._pre_ping:
                if not connection_is_fresh:
                    if fairy._echo:
                        pool.logger.debug(
                            "Pool pre-ping on connection %s",
                            fairy.dbapi_connection,
                        )
                    result = pool._dialect._do_ping_w_event(
                        fairy.dbapi_connection
                    )
                    if not result:
                        if fairy._echo:
                            pool.logger.debug(
                                "Pool pre-ping on connection %s failed, "
                                "will invalidate pool",
                                fairy.dbapi_connection,
                            )
                        raise exc.InvalidatePoolError()
                elif fairy._echo:
                    pool.logger.debug(
                        "Connection %s is fresh, skipping pre-ping",
                        fairy.dbapi_connection,
                    )

            pool.dispatch.checkout(
                fairy.dbapi_connection, fairy._connection_record, fairy
            )
            return fairy
        except exc.DisconnectionError as e:
            if e.invalidate_pool:
                pool.logger.info(
                    "Disconnection detected on checkout, "
                    "invalidating all pooled connections prior to "
                    "current timestamp (reason: %r)",
                    e,
                )
                fairy._connection_record.invalidate(e)
                pool._invalidate(fairy, e, _checkin=False)
            else:
                pool.logger.info(
                    "Disconnection detected on checkout, "
                    "invalidating individual connection %s (reason: %r)",
                    fairy.dbapi_connection,
                    e,
                )
                fairy._connection_record.invalidate(e)
            # obtain a replacement connection from the record before
            # the next pass
            try:
                fairy.dbapi_connection = (
                    fairy._connection_record.get_connection()
                )
            except BaseException as err:
                with util.safe_reraise():
                    fairy._connection_record._checkin_failed(
                        err,
                        _fairy_was_created=True,
                    )

                    # prevent _ConnectionFairy from being carried
                    # in the stack trace.  Do this after the
                    # connection record has been checked in, so that
                    # if the del triggers a finalize fairy, it won't
                    # try to checkin a second time.
                    del fairy

                # never called, this is for code linters
                raise

            attempts -= 1
        except BaseException as be_outer:
            with util.safe_reraise():
                rec = fairy._connection_record
                if rec is not None:
                    rec._checkin_failed(
                        be_outer,
                        _fairy_was_created=True,
                    )

                # prevent _ConnectionFairy from being carried
                # in the stack trace, see above
                del fairy

            # never called, this is for code linters
            raise

    pool.logger.info("Reconnection attempts exhausted on checkout")
    fairy.invalidate()
    raise exc.InvalidRequestError("This connection is closed")

1387 

def _checkout_existing(self) -> _ConnectionFairy:
    """Run ``_checkout()`` again for this already-existing fairy."""
    return _ConnectionFairy._checkout(self._pool, fairy=self)

1390 

def _checkin(self, transaction_was_reset: bool = False) -> None:
    """Finalize this fairy directly (not via the garbage collector).

    Delegates to ``_finalize_fairy()`` with no weakref, passing this
    fairy's connection, record, pool and echo flag.
    """
    _finalize_fairy(
        dbapi_connection=self.dbapi_connection,
        connection_record=self._connection_record,
        pool=self._pool,
        ref=None,  # direct call, not a gc callback
        echo=self._echo,
        transaction_was_reset=transaction_was_reset,
        fairy=self,
    )

1401 

def _close(self) -> None:
    """Check this fairy in regardless of the checkout counter."""
    self._checkin()

1404 

def _reset(
    self,
    pool: Pool,
    transaction_was_reset: bool,
    terminate_only: bool,
    asyncio_safe: bool,
) -> None:
    """Emit the ``reset`` event and apply the pool's reset-on-return.

    The ``reset`` event, when it has listeners, receives a
    :class:`.PoolResetState` built from the three flags.  Nothing
    further is done unless ``asyncio_safe`` is true; otherwise the
    connection is rolled back or committed according to
    ``pool._reset_on_return``, skipping the rollback when
    ``transaction_was_reset`` is set.
    """
    if pool.dispatch.reset:
        state = PoolResetState(
            transaction_was_reset=transaction_was_reset,
            terminate_only=terminate_only,
            asyncio_safe=asyncio_safe,
        )
        pool.dispatch.reset(
            self.dbapi_connection, self._connection_record, state
        )

    if not asyncio_safe:
        return

    on_return = pool._reset_on_return

    if on_return is reset_rollback:
        if transaction_was_reset:
            # already reset by the caller; only report it
            if self._echo:
                pool.logger.debug(
                    "Connection %s reset, transaction already reset",
                    self.dbapi_connection,
                )
            return

        if self._echo:
            pool.logger.debug(
                "Connection %s rollback-on-return",
                self.dbapi_connection,
            )
        pool._dialect.do_rollback(self)
        return

    if on_return is reset_commit:
        if self._echo:
            pool.logger.debug(
                "Connection %s commit-on-return",
                self.dbapi_connection,
            )
        pool._dialect.do_commit(self)

1447 

@property
def _logger(self) -> log._IdentifiedLoggerType:
    """The logger of the owning pool."""
    return self._pool.logger

1451 

@property
def is_valid(self) -> bool:
    """True while ``dbapi_connection`` is not ``None``."""
    return self.dbapi_connection is not None

1455 

@property
def is_detached(self) -> bool:
    """True when this fairy no longer has a connection record."""
    return self._connection_record is None

1459 

@util.ro_memoized_property
def info(self) -> _InfoType:
    """The ``info`` dictionary of the connection record, or a new empty
    dictionary when there is no record."""
    record = self._connection_record
    return {} if record is None else record.info

1466 

@util.ro_non_memoized_property
def record_info(self) -> Optional[_InfoType]:
    """The ``record_info`` dictionary of the connection record, or
    ``None`` when there is no record."""
    record = self._connection_record
    return None if record is None else record.record_info

1473 

def invalidate(
    self, e: Optional[BaseException] = None, soft: bool = False
) -> None:
    """Invalidate the connection record and, unless ``soft``, this fairy.

    A warning is emitted and nothing else happens if the fairy has no
    DBAPI connection.  Otherwise the record (when present) is
    invalidated with the same ``e`` and ``soft`` values; for a hard
    invalidation the fairy then drops its connection and checks in.
    """
    if self.dbapi_connection is None:
        util.warn("Can't invalidate an already-closed connection.")
        return

    record = self._connection_record
    if record:
        record.invalidate(e=e, soft=soft)

    if soft:
        return

    # prevent any rollback / reset actions etc. on
    # the connection
    self.dbapi_connection = None  # type: ignore

    # finalize
    self._checkin()

1489 

def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
    """Return a cursor from the proxied DBAPI connection.

    All arguments are passed through to the DBAPI ``cursor()`` call.
    """
    assert self.dbapi_connection is not None
    return self.dbapi_connection.cursor(*args, **kwargs)

1493 

def __getattr__(self, key: str) -> Any:
    """Proxy attributes not found on the fairy to the DBAPI connection."""
    if key == "dbapi_connection":
        # __getattr__ is only reached for this name when the slot has
        # not been populated (e.g. an instance built without __init__);
        # looking it up again on self would recurse until
        # RecursionError, so report the missing attribute instead.
        raise AttributeError(key)
    return getattr(self.dbapi_connection, key)

1496 

def detach(self) -> None:
    """Separate this fairy and its DBAPI connection from the pool.

    Does nothing if the fairy has no connection record.  Otherwise the
    record is cleared of its fairy reference and connection and handed
    back to the pool, the fairy keeps a copy of ``info``, drops its
    record, and the ``detach`` event is emitted if it has listeners.
    """
    rec = self._connection_record
    if rec is None:
        return

    rec.fairy_ref = None
    rec.dbapi_connection = None
    # TODO: should this be _return_conn?
    self._pool._do_return_conn(rec)

    # take the copy while the record is still attached.
    # can't get the descriptor assignment to work here
    # in pylance.  mypy is OK w/ it
    self.info = self.info.copy()  # type: ignore

    self._connection_record = None

    on_detach = self._pool.dispatch.detach
    if on_detach:
        on_detach(self.dbapi_connection, rec)

1513 

def close(self) -> None:
    """Release one checkout of this fairy.

    The checkout counter is decremented; the fairy is checked in only
    when the counter reaches zero.
    """
    self._counter -= 1
    if self._counter != 0:
        # other checkouts of this fairy are still outstanding
        return
    self._checkin()

1518 

def _close_special(self, transaction_reset: bool = False) -> None:
    """Release one checkout, reporting whether the transaction was reset.

    Works like ``close()``, except that ``transaction_reset`` is passed
    to ``_checkin()`` as ``transaction_was_reset`` when the counter
    reaches zero.
    """
    self._counter -= 1
    if self._counter != 0:
        # other checkouts of this fairy are still outstanding
        return
    self._checkin(transaction_was_reset=transaction_reset)