Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

364 statements  

1# ext/asyncio/engine.py 

2# Copyright (C) 2020-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7from __future__ import annotations 

8 

9import asyncio 

10import contextlib 

11from typing import Any 

12from typing import AsyncIterator 

13from typing import Callable 

14from typing import Dict 

15from typing import Generator 

16from typing import NoReturn 

17from typing import Optional 

18from typing import overload 

19from typing import Tuple 

20from typing import Type 

21from typing import TYPE_CHECKING 

22from typing import TypeVar 

23from typing import Union 

24 

25from . import exc as async_exc 

26from .base import asyncstartablecontext 

27from .base import GeneratorStartableContext 

28from .base import ProxyComparable 

29from .base import StartableContext 

30from .result import _ensure_sync_result 

31from .result import AsyncResult 

32from .result import AsyncScalarResult 

33from ... import exc 

34from ... import inspection 

35from ... import util 

36from ...engine import Connection 

37from ...engine import create_engine as _create_engine 

38from ...engine import create_pool_from_url as _create_pool_from_url 

39from ...engine import Engine 

40from ...engine.base import NestedTransaction 

41from ...engine.base import Transaction 

42from ...exc import ArgumentError 

43from ...util.concurrency import greenlet_spawn 

44from ...util.typing import Concatenate 

45from ...util.typing import ParamSpec 

46 

47if TYPE_CHECKING: 

48 from ...engine.cursor import CursorResult 

49 from ...engine.interfaces import _CoreAnyExecuteParams 

50 from ...engine.interfaces import _CoreSingleExecuteParams 

51 from ...engine.interfaces import _DBAPIAnyExecuteParams 

52 from ...engine.interfaces import _ExecuteOptions 

53 from ...engine.interfaces import CompiledCacheType 

54 from ...engine.interfaces import CoreExecuteOptionsParameter 

55 from ...engine.interfaces import Dialect 

56 from ...engine.interfaces import IsolationLevel 

57 from ...engine.interfaces import SchemaTranslateMapType 

58 from ...engine.result import ScalarResult 

59 from ...engine.url import URL 

60 from ...pool import Pool 

61 from ...pool import PoolProxiedConnection 

62 from ...sql._typing import _InfoType 

63 from ...sql.base import Executable 

64 from ...sql.selectable import TypedReturnsRows 

65 

66_P = ParamSpec("_P") 

67_T = TypeVar("_T", bound=Any) 

68 

69 

70def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine: 

71 """Create a new async engine instance. 

72 

73 Arguments passed to :func:`_asyncio.create_async_engine` are mostly 

74 identical to those passed to the :func:`_sa.create_engine` function. 

75 The specified dialect must be an asyncio-compatible dialect 

76 such as :ref:`dialect-postgresql-asyncpg`. 

77 

78 .. versionadded:: 1.4 

79 

80 :param async_creator: an async callable which returns a driver-level 

81 asyncio connection. If given, the function should take no arguments, 

82 and return a new asyncio connection from the underlying asyncio 

83 database driver; the connection will be wrapped in the appropriate 

84 structures to be used with the :class:`.AsyncEngine`. Note that the 

85 parameters specified in the URL are not applied here, and the creator 

86 function should use its own connection parameters. 

87 

88 This parameter is the asyncio equivalent of the 

89 :paramref:`_sa.create_engine.creator` parameter of the 

90 :func:`_sa.create_engine` function. 

91 

92 .. versionadded:: 2.0.16 

93 

94 """ 

95 

96 if kw.get("server_side_cursors", False): 

97 raise async_exc.AsyncMethodRequired( 

98 "Can't set server_side_cursors for async engine globally; " 

99 "use the connection.stream() method for an async " 

100 "streaming result set" 

101 ) 

102 kw["_is_async"] = True 

103 async_creator = kw.pop("async_creator", None) 

104 if async_creator: 

105 if kw.get("creator", None): 

106 raise ArgumentError( 

107 "Can only specify one of 'async_creator' or 'creator', " 

108 "not both." 

109 ) 

110 

111 def creator() -> Any: 

112 # note that to send adapted arguments like 

113 # prepared_statement_cache_size, user would use 

114 # "creator" and emulate this form here 

115 return sync_engine.dialect.dbapi.connect( # type: ignore 

116 async_creator_fn=async_creator 

117 ) 

118 

119 kw["creator"] = creator 

120 sync_engine = _create_engine(url, **kw) 

121 return AsyncEngine(sync_engine) 

122 

123 

124def async_engine_from_config( 

125 configuration: Dict[str, Any], prefix: str = "sqlalchemy.", **kwargs: Any 

126) -> AsyncEngine: 

127 """Create a new AsyncEngine instance using a configuration dictionary. 

128 

129 This function is analogous to the :func:`_sa.engine_from_config` function 

130 in SQLAlchemy Core, except that the requested dialect must be an 

131 asyncio-compatible dialect such as :ref:`dialect-postgresql-asyncpg`. 

132 The argument signature of the function is identical to that 

133 of :func:`_sa.engine_from_config`. 

134 

135 .. versionadded:: 1.4.29 

136 

137 """ 

138 options = { 

139 key[len(prefix) :]: value 

140 for key, value in configuration.items() 

141 if key.startswith(prefix) 

142 } 

143 options["_coerce_config"] = True 

144 options.update(kwargs) 

145 url = options.pop("url") 

146 return create_async_engine(url, **options) 

147 

148 

149def create_async_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool: 

150 """Create a new async engine instance. 

151 

152 Arguments passed to :func:`_asyncio.create_async_pool_from_url` are mostly 

153 identical to those passed to the :func:`_sa.create_pool_from_url` function. 

154 The specified dialect must be an asyncio-compatible dialect 

155 such as :ref:`dialect-postgresql-asyncpg`. 

156 

157 .. versionadded:: 2.0.10 

158 

159 """ 

160 kwargs["_is_async"] = True 

161 return _create_pool_from_url(url, **kwargs) 

162 

163 

164class AsyncConnectable: 

165 __slots__ = "_slots_dispatch", "__weakref__" 

166 

167 @classmethod 

168 def _no_async_engine_events(cls) -> NoReturn: 

169 raise NotImplementedError( 

170 "asynchronous events are not implemented at this time. Apply " 

171 "synchronous listeners to the AsyncEngine.sync_engine or " 

172 "AsyncConnection.sync_connection attributes." 

173 ) 

174 

175 

176@util.create_proxy_methods( 

177 Connection, 

178 ":class:`_engine.Connection`", 

179 ":class:`_asyncio.AsyncConnection`", 

180 classmethods=[], 

181 methods=[], 

182 attributes=[ 

183 "closed", 

184 "invalidated", 

185 "dialect", 

186 "default_isolation_level", 

187 ], 

188) 

189# "Class has incompatible disjoint bases" - no idea 

190class AsyncConnection( # type:ignore[misc] 

191 ProxyComparable[Connection], 

192 StartableContext["AsyncConnection"], 

193 AsyncConnectable, 

194): 

195 """An asyncio proxy for a :class:`_engine.Connection`. 

196 

197 :class:`_asyncio.AsyncConnection` is acquired using the 

198 :meth:`_asyncio.AsyncEngine.connect` 

199 method of :class:`_asyncio.AsyncEngine`:: 

200 

201 from sqlalchemy.ext.asyncio import create_async_engine 

202 

203 engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") 

204 

205 async with engine.connect() as conn: 

206 result = await conn.execute(select(table)) 

207 

208 .. versionadded:: 1.4 

209 

210 """ # noqa 

211 

212 # AsyncConnection is a thin proxy; no state should be added here 

213 # that is not retrievable from the "sync" engine / connection, e.g. 

214 # current transaction, info, etc. It should be possible to 

215 # create a new AsyncConnection that matches this one given only the 

216 # "sync" elements. 

217 __slots__ = ( 

218 "engine", 

219 "sync_engine", 

220 "sync_connection", 

221 ) 

222 

223 def __init__( 

224 self, 

225 async_engine: AsyncEngine, 

226 sync_connection: Optional[Connection] = None, 

227 ): 

228 self.engine = async_engine 

229 self.sync_engine = async_engine.sync_engine 

230 self.sync_connection = self._assign_proxied(sync_connection) 

231 

232 sync_connection: Optional[Connection] 

233 """Reference to the sync-style :class:`_engine.Connection` this 

234 :class:`_asyncio.AsyncConnection` proxies requests towards. 

235 

236 This instance can be used as an event target. 

237 

238 .. seealso:: 

239 

240 :ref:`asyncio_events` 

241 

242 """ 

243 

244 sync_engine: Engine 

245 """Reference to the sync-style :class:`_engine.Engine` this 

246 :class:`_asyncio.AsyncConnection` is associated with via its underlying 

247 :class:`_engine.Connection`. 

248 

249 This instance can be used as an event target. 

250 

251 .. seealso:: 

252 

253 :ref:`asyncio_events` 

254 

255 """ 

256 

257 @classmethod 

258 def _regenerate_proxy_for_target( 

259 cls, target: Connection, **additional_kw: Any # noqa: U100 

260 ) -> AsyncConnection: 

261 return AsyncConnection( 

262 AsyncEngine._retrieve_proxy_for_target(target.engine), target 

263 ) 

264 

265 async def start( 

266 self, is_ctxmanager: bool = False # noqa: U100 

267 ) -> AsyncConnection: 

268 """Start this :class:`_asyncio.AsyncConnection` object's context 

269 outside of using a Python ``with:`` block. 

270 

271 """ 

272 if self.sync_connection: 

273 raise exc.InvalidRequestError("connection is already started") 

274 self.sync_connection = self._assign_proxied( 

275 await greenlet_spawn(self.sync_engine.connect) 

276 ) 

277 return self 

278 

279 @property 

280 def connection(self) -> NoReturn: 

281 """Not implemented for async; call 

282 :meth:`_asyncio.AsyncConnection.get_raw_connection`. 

283 """ 

284 raise exc.InvalidRequestError( 

285 "AsyncConnection.connection accessor is not implemented as the " 

286 "attribute may need to reconnect on an invalidated connection. " 

287 "Use the get_raw_connection() method." 

288 ) 

289 

290 async def get_raw_connection(self) -> PoolProxiedConnection: 

291 """Return the pooled DBAPI-level connection in use by this 

292 :class:`_asyncio.AsyncConnection`. 

293 

294 This is a SQLAlchemy connection-pool proxied connection 

295 which then has the attribute 

296 :attr:`_pool._ConnectionFairy.driver_connection` that refers to the 

297 actual driver connection. Its 

298 :attr:`_pool._ConnectionFairy.dbapi_connection` refers instead 

299 to an :class:`_engine.AdaptedConnection` instance that 

300 adapts the driver connection to the DBAPI protocol. 

301 

302 """ 

303 

304 return await greenlet_spawn(getattr, self._proxied, "connection") 

305 

306 @util.ro_non_memoized_property 

307 def info(self) -> _InfoType: 

308 """Return the :attr:`_engine.Connection.info` dictionary of the 

309 underlying :class:`_engine.Connection`. 

310 

311 This dictionary is freely writable for user-defined state to be 

312 associated with the database connection. 

313 

314 This attribute is only available if the :class:`.AsyncConnection` is 

315 currently connected. If the :attr:`.AsyncConnection.closed` attribute 

316 is ``True``, then accessing this attribute will raise 

317 :class:`.ResourceClosedError`. 

318 

319 .. versionadded:: 1.4.0b2 

320 

321 """ 

322 return self._proxied.info 

323 

324 @util.ro_non_memoized_property 

325 def _proxied(self) -> Connection: 

326 if not self.sync_connection: 

327 self._raise_for_not_started() 

328 return self.sync_connection 

329 

330 def begin(self) -> AsyncTransaction: 

331 """Begin a transaction prior to autobegin occurring.""" 

332 assert self._proxied 

333 return AsyncTransaction(self) 

334 

335 def begin_nested(self) -> AsyncTransaction: 

336 """Begin a nested transaction and return a transaction handle.""" 

337 assert self._proxied 

338 return AsyncTransaction(self, nested=True) 

339 

340 async def invalidate( 

341 self, exception: Optional[BaseException] = None 

342 ) -> None: 

343 """Invalidate the underlying DBAPI connection associated with 

344 this :class:`_engine.Connection`. 

345 

346 See the method :meth:`_engine.Connection.invalidate` for full 

347 detail on this method. 

348 

349 """ 

350 

351 return await greenlet_spawn( 

352 self._proxied.invalidate, exception=exception 

353 ) 

354 

355 async def get_isolation_level(self) -> IsolationLevel: 

356 return await greenlet_spawn(self._proxied.get_isolation_level) 

357 

358 def in_transaction(self) -> bool: 

359 """Return True if a transaction is in progress.""" 

360 

361 return self._proxied.in_transaction() 

362 

363 def in_nested_transaction(self) -> bool: 

364 """Return True if a transaction is in progress. 

365 

366 .. versionadded:: 1.4.0b2 

367 

368 """ 

369 return self._proxied.in_nested_transaction() 

370 

371 def get_transaction(self) -> Optional[AsyncTransaction]: 

372 """Return an :class:`.AsyncTransaction` representing the current 

373 transaction, if any. 

374 

375 This makes use of the underlying synchronous connection's 

376 :meth:`_engine.Connection.get_transaction` method to get the current 

377 :class:`_engine.Transaction`, which is then proxied in a new 

378 :class:`.AsyncTransaction` object. 

379 

380 .. versionadded:: 1.4.0b2 

381 

382 """ 

383 

384 trans = self._proxied.get_transaction() 

385 if trans is not None: 

386 return AsyncTransaction._retrieve_proxy_for_target(trans) 

387 else: 

388 return None 

389 

390 def get_nested_transaction(self) -> Optional[AsyncTransaction]: 

391 """Return an :class:`.AsyncTransaction` representing the current 

392 nested (savepoint) transaction, if any. 

393 

394 This makes use of the underlying synchronous connection's 

395 :meth:`_engine.Connection.get_nested_transaction` method to get the 

396 current :class:`_engine.Transaction`, which is then proxied in a new 

397 :class:`.AsyncTransaction` object. 

398 

399 .. versionadded:: 1.4.0b2 

400 

401 """ 

402 

403 trans = self._proxied.get_nested_transaction() 

404 if trans is not None: 

405 return AsyncTransaction._retrieve_proxy_for_target(trans) 

406 else: 

407 return None 

408 

409 @overload 

410 async def execution_options( 

411 self, 

412 *, 

413 compiled_cache: Optional[CompiledCacheType] = ..., 

414 logging_token: str = ..., 

415 isolation_level: IsolationLevel = ..., 

416 no_parameters: bool = False, 

417 stream_results: bool = False, 

418 max_row_buffer: int = ..., 

419 yield_per: int = ..., 

420 insertmanyvalues_page_size: int = ..., 

421 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

422 preserve_rowcount: bool = False, 

423 **opt: Any, 

424 ) -> AsyncConnection: ... 

425 

426 @overload 

427 async def execution_options(self, **opt: Any) -> AsyncConnection: ... 

428 

429 async def execution_options(self, **opt: Any) -> AsyncConnection: 

430 r"""Set non-SQL options for the connection which take effect 

431 during execution. 

432 

433 This returns this :class:`_asyncio.AsyncConnection` object with 

434 the new options added. 

435 

436 See :meth:`_engine.Connection.execution_options` for full details 

437 on this method. 

438 

439 """ 

440 

441 conn = self._proxied 

442 c2 = await greenlet_spawn(conn.execution_options, **opt) 

443 assert c2 is conn 

444 return self 

445 

446 async def commit(self) -> None: 

447 """Commit the transaction that is currently in progress. 

448 

449 This method commits the current transaction if one has been started. 

450 If no transaction was started, the method has no effect, assuming 

451 the connection is in a non-invalidated state. 

452 

453 A transaction is begun on a :class:`_engine.Connection` automatically 

454 whenever a statement is first executed, or when the 

455 :meth:`_engine.Connection.begin` method is called. 

456 

457 """ 

458 await greenlet_spawn(self._proxied.commit) 

459 

460 async def rollback(self) -> None: 

461 """Roll back the transaction that is currently in progress. 

462 

463 This method rolls back the current transaction if one has been started. 

464 If no transaction was started, the method has no effect. If a 

465 transaction was started and the connection is in an invalidated state, 

466 the transaction is cleared using this method. 

467 

468 A transaction is begun on a :class:`_engine.Connection` automatically 

469 whenever a statement is first executed, or when the 

470 :meth:`_engine.Connection.begin` method is called. 

471 

472 

473 """ 

474 await greenlet_spawn(self._proxied.rollback) 

475 

476 async def close(self) -> None: 

477 """Close this :class:`_asyncio.AsyncConnection`. 

478 

479 This has the effect of also rolling back the transaction if one 

480 is in place. 

481 

482 """ 

483 await greenlet_spawn(self._proxied.close) 

484 

485 async def aclose(self) -> None: 

486 """A synonym for :meth:`_asyncio.AsyncConnection.close`. 

487 

488 The :meth:`_asyncio.AsyncConnection.aclose` name is specifically 

489 to support the Python standard library ``@contextlib.aclosing`` 

490 context manager function. 

491 

492 .. versionadded:: 2.0.20 

493 

494 """ 

495 await self.close() 

496 

497 async def exec_driver_sql( 

498 self, 

499 statement: str, 

500 parameters: Optional[_DBAPIAnyExecuteParams] = None, 

501 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

502 ) -> CursorResult[Any]: 

503 r"""Executes a driver-level SQL string and return buffered 

504 :class:`_engine.Result`. 

505 

506 """ 

507 

508 result = await greenlet_spawn( 

509 self._proxied.exec_driver_sql, 

510 statement, 

511 parameters, 

512 execution_options, 

513 _require_await=True, 

514 ) 

515 

516 return await _ensure_sync_result(result, self.exec_driver_sql) 

517 

518 @overload 

519 def stream( 

520 self, 

521 statement: TypedReturnsRows[_T], 

522 parameters: Optional[_CoreAnyExecuteParams] = None, 

523 *, 

524 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

525 ) -> GeneratorStartableContext[AsyncResult[_T]]: ... 

526 

527 @overload 

528 def stream( 

529 self, 

530 statement: Executable, 

531 parameters: Optional[_CoreAnyExecuteParams] = None, 

532 *, 

533 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

534 ) -> GeneratorStartableContext[AsyncResult[Any]]: ... 

535 

536 @asyncstartablecontext 

537 async def stream( 

538 self, 

539 statement: Executable, 

540 parameters: Optional[_CoreAnyExecuteParams] = None, 

541 *, 

542 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

543 ) -> AsyncIterator[AsyncResult[Any]]: 

544 """Execute a statement and return an awaitable yielding a 

545 :class:`_asyncio.AsyncResult` object. 

546 

547 E.g.:: 

548 

549 result = await conn.stream(stmt) 

550 async for row in result: 

551 print(f"{row}") 

552 

553 The :meth:`.AsyncConnection.stream` 

554 method supports optional context manager use against the 

555 :class:`.AsyncResult` object, as in:: 

556 

557 async with conn.stream(stmt) as result: 

558 async for row in result: 

559 print(f"{row}") 

560 

561 In the above pattern, the :meth:`.AsyncResult.close` method is 

562 invoked unconditionally, even if the iterator is interrupted by an 

563 exception throw. Context manager use remains optional, however, 

564 and the function may be called in either an ``async with fn():`` or 

565 ``await fn()`` style. 

566 

567 .. versionadded:: 2.0.0b3 added context manager support 

568 

569 

570 :return: an awaitable object that will yield an 

571 :class:`_asyncio.AsyncResult` object. 

572 

573 .. seealso:: 

574 

575 :meth:`.AsyncConnection.stream_scalars` 

576 

577 """ 

578 if not self.dialect.supports_server_side_cursors: 

579 raise exc.InvalidRequestError( 

580 "Cant use `stream` or `stream_scalars` with the current " 

581 "dialect since it does not support server side cursors." 

582 ) 

583 

584 result = await greenlet_spawn( 

585 self._proxied.execute, 

586 statement, 

587 parameters, 

588 execution_options=util.EMPTY_DICT.merge_with( 

589 execution_options, {"stream_results": True} 

590 ), 

591 _require_await=True, 

592 ) 

593 assert result.context._is_server_side 

594 ar = AsyncResult(result) 

595 try: 

596 yield ar 

597 except GeneratorExit: 

598 pass 

599 else: 

600 task = asyncio.create_task(ar.close()) 

601 await asyncio.shield(task) 

602 

603 @overload 

604 async def execute( 

605 self, 

606 statement: TypedReturnsRows[_T], 

607 parameters: Optional[_CoreAnyExecuteParams] = None, 

608 *, 

609 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

610 ) -> CursorResult[_T]: ... 

611 

612 @overload 

613 async def execute( 

614 self, 

615 statement: Executable, 

616 parameters: Optional[_CoreAnyExecuteParams] = None, 

617 *, 

618 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

619 ) -> CursorResult[Any]: ... 

620 

621 async def execute( 

622 self, 

623 statement: Executable, 

624 parameters: Optional[_CoreAnyExecuteParams] = None, 

625 *, 

626 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

627 ) -> CursorResult[Any]: 

628 r"""Executes a SQL statement construct and return a buffered 

629 :class:`_engine.Result`. 

630 

631 :param object: The statement to be executed. This is always 

632 an object that is in both the :class:`_expression.ClauseElement` and 

633 :class:`_expression.Executable` hierarchies, including: 

634 

635 * :class:`_expression.Select` 

636 * :class:`_expression.Insert`, :class:`_expression.Update`, 

637 :class:`_expression.Delete` 

638 * :class:`_expression.TextClause` and 

639 :class:`_expression.TextualSelect` 

640 * :class:`_schema.DDL` and objects which inherit from 

641 :class:`_schema.ExecutableDDLElement` 

642 

643 :param parameters: parameters which will be bound into the statement. 

644 This may be either a dictionary of parameter names to values, 

645 or a mutable sequence (e.g. a list) of dictionaries. When a 

646 list of dictionaries is passed, the underlying statement execution 

647 will make use of the DBAPI ``cursor.executemany()`` method. 

648 When a single dictionary is passed, the DBAPI ``cursor.execute()`` 

649 method will be used. 

650 

651 :param execution_options: optional dictionary of execution options, 

652 which will be associated with the statement execution. This 

653 dictionary can provide a subset of the options that are accepted 

654 by :meth:`_engine.Connection.execution_options`. 

655 

656 :return: a :class:`_engine.Result` object. 

657 

658 """ 

659 result = await greenlet_spawn( 

660 self._proxied.execute, 

661 statement, 

662 parameters, 

663 execution_options=execution_options, 

664 _require_await=True, 

665 ) 

666 return await _ensure_sync_result(result, self.execute) 

667 

668 @overload 

669 async def scalar( 

670 self, 

671 statement: TypedReturnsRows[Tuple[_T]], 

672 parameters: Optional[_CoreSingleExecuteParams] = None, 

673 *, 

674 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

675 ) -> Optional[_T]: ... 

676 

677 @overload 

678 async def scalar( 

679 self, 

680 statement: Executable, 

681 parameters: Optional[_CoreSingleExecuteParams] = None, 

682 *, 

683 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

684 ) -> Any: ... 

685 

686 async def scalar( 

687 self, 

688 statement: Executable, 

689 parameters: Optional[_CoreSingleExecuteParams] = None, 

690 *, 

691 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

692 ) -> Any: 

693 r"""Executes a SQL statement construct and returns a scalar object. 

694 

695 This method is shorthand for invoking the 

696 :meth:`_engine.Result.scalar` method after invoking the 

697 :meth:`_engine.Connection.execute` method. Parameters are equivalent. 

698 

699 :return: a scalar Python value representing the first column of the 

700 first row returned. 

701 

702 """ 

703 result = await self.execute( 

704 statement, parameters, execution_options=execution_options 

705 ) 

706 return result.scalar() 

707 

708 @overload 

709 async def scalars( 

710 self, 

711 statement: TypedReturnsRows[Tuple[_T]], 

712 parameters: Optional[_CoreAnyExecuteParams] = None, 

713 *, 

714 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

715 ) -> ScalarResult[_T]: ... 

716 

717 @overload 

718 async def scalars( 

719 self, 

720 statement: Executable, 

721 parameters: Optional[_CoreAnyExecuteParams] = None, 

722 *, 

723 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

724 ) -> ScalarResult[Any]: ... 

725 

726 async def scalars( 

727 self, 

728 statement: Executable, 

729 parameters: Optional[_CoreAnyExecuteParams] = None, 

730 *, 

731 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

732 ) -> ScalarResult[Any]: 

733 r"""Executes a SQL statement construct and returns a scalar objects. 

734 

735 This method is shorthand for invoking the 

736 :meth:`_engine.Result.scalars` method after invoking the 

737 :meth:`_engine.Connection.execute` method. Parameters are equivalent. 

738 

739 :return: a :class:`_engine.ScalarResult` object. 

740 

741 .. versionadded:: 1.4.24 

742 

743 """ 

744 result = await self.execute( 

745 statement, parameters, execution_options=execution_options 

746 ) 

747 return result.scalars() 

748 

749 @overload 

750 def stream_scalars( 

751 self, 

752 statement: TypedReturnsRows[Tuple[_T]], 

753 parameters: Optional[_CoreSingleExecuteParams] = None, 

754 *, 

755 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

756 ) -> GeneratorStartableContext[AsyncScalarResult[_T]]: ... 

757 

758 @overload 

759 def stream_scalars( 

760 self, 

761 statement: Executable, 

762 parameters: Optional[_CoreSingleExecuteParams] = None, 

763 *, 

764 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

765 ) -> GeneratorStartableContext[AsyncScalarResult[Any]]: ... 

766 

767 @asyncstartablecontext 

768 async def stream_scalars( 

769 self, 

770 statement: Executable, 

771 parameters: Optional[_CoreSingleExecuteParams] = None, 

772 *, 

773 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

774 ) -> AsyncIterator[AsyncScalarResult[Any]]: 

775 r"""Execute a statement and return an awaitable yielding a 

776 :class:`_asyncio.AsyncScalarResult` object. 

777 

778 E.g.:: 

779 

780 result = await conn.stream_scalars(stmt) 

781 async for scalar in result: 

782 print(f"{scalar}") 

783 

784 This method is shorthand for invoking the 

785 :meth:`_engine.AsyncResult.scalars` method after invoking the 

786 :meth:`_engine.Connection.stream` method. Parameters are equivalent. 

787 

788 The :meth:`.AsyncConnection.stream_scalars` 

789 method supports optional context manager use against the 

790 :class:`.AsyncScalarResult` object, as in:: 

791 

792 async with conn.stream_scalars(stmt) as result: 

793 async for scalar in result: 

794 print(f"{scalar}") 

795 

796 In the above pattern, the :meth:`.AsyncScalarResult.close` method is 

797 invoked unconditionally, even if the iterator is interrupted by an 

798 exception throw. Context manager use remains optional, however, 

799 and the function may be called in either an ``async with fn():`` or 

800 ``await fn()`` style. 

801 

802 .. versionadded:: 2.0.0b3 added context manager support 

803 

804 :return: an awaitable object that will yield an 

805 :class:`_asyncio.AsyncScalarResult` object. 

806 

807 .. versionadded:: 1.4.24 

808 

809 .. seealso:: 

810 

811 :meth:`.AsyncConnection.stream` 

812 

813 """ 

814 

815 async with self.stream( 

816 statement, parameters, execution_options=execution_options 

817 ) as result: 

818 yield result.scalars() 

819 

820 async def run_sync( 

821 self, 

822 fn: Callable[Concatenate[Connection, _P], _T], 

823 *arg: _P.args, 

824 **kw: _P.kwargs, 

825 ) -> _T: 

826 '''Invoke the given synchronous (i.e. not async) callable, 

827 passing a synchronous-style :class:`_engine.Connection` as the first 

828 argument. 

829 

830 This method allows traditional synchronous SQLAlchemy functions to 

831 run within the context of an asyncio application. 

832 

833 E.g.:: 

834 

835 def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str: 

836 """A synchronous function that does not require awaiting 

837 

838 :param conn: a Core SQLAlchemy Connection, used synchronously 

839 

840 :return: an optional return value is supported 

841 

842 """ 

843 conn.execute(some_table.insert().values(int_col=arg1, str_col=arg2)) 

844 return "success" 

845 

846 

847 async def do_something_async(async_engine: AsyncEngine) -> None: 

848 """an async function that uses awaiting""" 

849 

850 async with async_engine.begin() as async_conn: 

851 # run do_something_with_core() with a sync-style 

852 # Connection, proxied into an awaitable 

853 return_code = await async_conn.run_sync( 

854 do_something_with_core, 5, "strval" 

855 ) 

856 print(return_code) 

857 

858 This method maintains the asyncio event loop all the way through 

859 to the database connection by running the given callable in a 

860 specially instrumented greenlet. 

861 

862 The most rudimentary use of :meth:`.AsyncConnection.run_sync` is to 

863 invoke methods such as :meth:`_schema.MetaData.create_all`, given 

864 an :class:`.AsyncConnection` that needs to be provided to 

865 :meth:`_schema.MetaData.create_all` as a :class:`_engine.Connection` 

866 object:: 

867 

868 # run metadata.create_all(conn) with a sync-style Connection, 

869 # proxied into an awaitable 

870 with async_engine.begin() as conn: 

871 await conn.run_sync(metadata.create_all) 

872 

873 .. note:: 

874 

875 The provided callable is invoked inline within the asyncio event 

876 loop, and will block on traditional IO calls. IO within this 

877 callable should only call into SQLAlchemy's asyncio database 

878 APIs which will be properly adapted to the greenlet context. 

879 

880 .. seealso:: 

881 

882 :meth:`.AsyncSession.run_sync` 

883 

884 :ref:`session_run_sync` 

885 

886 ''' # noqa: E501 

887 

888 return await greenlet_spawn( 

889 fn, self._proxied, *arg, _require_await=False, **kw 

890 ) 

891 

892 def __await__(self) -> Generator[Any, None, AsyncConnection]: 

893 return self.start().__await__() 

894 

895 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: 

896 task = asyncio.create_task(self.close()) 

897 await asyncio.shield(task) 

898 

899 # START PROXY METHODS AsyncConnection 

900 

901 # code within this block is **programmatically, 

902 # statically generated** by tools/generate_proxy_methods.py 

903 

904 @property 

905 def closed(self) -> Any: 

906 r"""Return True if this connection is closed. 

907 

908 .. container:: class_bases 

909 

910 Proxied for the :class:`_engine.Connection` class 

911 on behalf of the :class:`_asyncio.AsyncConnection` class. 

912 

913 """ # noqa: E501 

914 

915 return self._proxied.closed 

916 

917 @property 

918 def invalidated(self) -> Any: 

919 r"""Return True if this connection was invalidated. 

920 

921 .. container:: class_bases 

922 

923 Proxied for the :class:`_engine.Connection` class 

924 on behalf of the :class:`_asyncio.AsyncConnection` class. 

925 

926 This does not indicate whether or not the connection was 

927 invalidated at the pool level, however 

928 

929 

930 """ # noqa: E501 

931 

932 return self._proxied.invalidated 

933 

934 @property 

935 def dialect(self) -> Dialect: 

936 r"""Proxy for the :attr:`_engine.Connection.dialect` attribute 

937 on behalf of the :class:`_asyncio.AsyncConnection` class. 

938 

939 """ # noqa: E501 

940 

941 return self._proxied.dialect 

942 

943 @dialect.setter 

944 def dialect(self, attr: Dialect) -> None: 

945 self._proxied.dialect = attr 

946 

947 @property 

948 def default_isolation_level(self) -> Any: 

949 r"""The initial-connection time isolation level associated with the 

950 :class:`_engine.Dialect` in use. 

951 

952 .. container:: class_bases 

953 

954 Proxied for the :class:`_engine.Connection` class 

955 on behalf of the :class:`_asyncio.AsyncConnection` class. 

956 

957 This value is independent of the 

958 :paramref:`.Connection.execution_options.isolation_level` and 

959 :paramref:`.Engine.execution_options.isolation_level` execution 

960 options, and is determined by the :class:`_engine.Dialect` when the 

961 first connection is created, by performing a SQL query against the 

962 database for the current isolation level before any additional commands 

963 have been emitted. 

964 

965 Calling this accessor does not invoke any new SQL queries. 

966 

967 .. seealso:: 

968 

969 :meth:`_engine.Connection.get_isolation_level` 

970 - view current actual isolation level 

971 

972 :paramref:`_sa.create_engine.isolation_level` 

973 - set per :class:`_engine.Engine` isolation level 

974 

975 :paramref:`.Connection.execution_options.isolation_level` 

976 - set per :class:`_engine.Connection` isolation level 

977 

978 

979 """ # noqa: E501 

980 

981 return self._proxied.default_isolation_level 

982 

983 # END PROXY METHODS AsyncConnection 

984 

985 

986@util.create_proxy_methods( 

987 Engine, 

988 ":class:`_engine.Engine`", 

989 ":class:`_asyncio.AsyncEngine`", 

990 classmethods=[], 

991 methods=[ 

992 "clear_compiled_cache", 

993 "update_execution_options", 

994 "get_execution_options", 

995 ], 

996 attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"], 

997) 

998# "Class has incompatible disjoint bases" - no idea 

999class AsyncEngine(ProxyComparable[Engine], AsyncConnectable): # type: ignore[misc] # noqa:E501 

1000 """An asyncio proxy for a :class:`_engine.Engine`. 

1001 

1002 :class:`_asyncio.AsyncEngine` is acquired using the 

1003 :func:`_asyncio.create_async_engine` function:: 

1004 

1005 from sqlalchemy.ext.asyncio import create_async_engine 

1006 

1007 engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") 

1008 

1009 .. versionadded:: 1.4 

1010 

1011 """ # noqa 

1012 

1013 # AsyncEngine is a thin proxy; no state should be added here 

1014 # that is not retrievable from the "sync" engine / connection, e.g. 

1015 # current transaction, info, etc. It should be possible to 

1016 # create a new AsyncEngine that matches this one given only the 

1017 # "sync" elements. 

1018 __slots__ = "sync_engine" 

1019 

1020 _connection_cls: Type[AsyncConnection] = AsyncConnection 

1021 

1022 sync_engine: Engine 

1023 """Reference to the sync-style :class:`_engine.Engine` this 

1024 :class:`_asyncio.AsyncEngine` proxies requests towards. 

1025 

1026 This instance can be used as an event target. 

1027 

1028 .. seealso:: 

1029 

1030 :ref:`asyncio_events` 

1031 """ 

1032 

1033 def __init__(self, sync_engine: Engine): 

1034 if not sync_engine.dialect.is_async: 

1035 raise exc.InvalidRequestError( 

1036 "The asyncio extension requires an async driver to be used. " 

1037 f"The loaded {sync_engine.dialect.driver!r} is not async." 

1038 ) 

1039 self.sync_engine = self._assign_proxied(sync_engine) 

1040 

1041 @util.ro_non_memoized_property 

1042 def _proxied(self) -> Engine: 

1043 return self.sync_engine 

1044 

1045 @classmethod 

1046 def _regenerate_proxy_for_target( 

1047 cls, target: Engine, **additional_kw: Any # noqa: U100 

1048 ) -> AsyncEngine: 

1049 return AsyncEngine(target) 

1050 

1051 @contextlib.asynccontextmanager 

1052 async def begin(self) -> AsyncIterator[AsyncConnection]: 

1053 """Return a context manager which when entered will deliver an 

1054 :class:`_asyncio.AsyncConnection` with an 

1055 :class:`_asyncio.AsyncTransaction` established. 

1056 

1057 E.g.:: 

1058 

1059 async with async_engine.begin() as conn: 

1060 await conn.execute( 

1061 text("insert into table (x, y, z) values (1, 2, 3)") 

1062 ) 

1063 await conn.execute(text("my_special_procedure(5)")) 

1064 

1065 """ 

1066 conn = self.connect() 

1067 

1068 async with conn: 

1069 async with conn.begin(): 

1070 yield conn 

1071 

1072 def connect(self) -> AsyncConnection: 

1073 """Return an :class:`_asyncio.AsyncConnection` object. 

1074 

1075 The :class:`_asyncio.AsyncConnection` will procure a database 

1076 connection from the underlying connection pool when it is entered 

1077 as an async context manager:: 

1078 

1079 async with async_engine.connect() as conn: 

1080 result = await conn.execute(select(user_table)) 

1081 

1082 The :class:`_asyncio.AsyncConnection` may also be started outside of a 

1083 context manager by invoking its :meth:`_asyncio.AsyncConnection.start` 

1084 method. 

1085 

1086 """ 

1087 

1088 return self._connection_cls(self) 

1089 

1090 async def raw_connection(self) -> PoolProxiedConnection: 

1091 """Return a "raw" DBAPI connection from the connection pool. 

1092 

1093 .. seealso:: 

1094 

1095 :ref:`dbapi_connections` 

1096 

1097 """ 

1098 return await greenlet_spawn(self.sync_engine.raw_connection) 

1099 

1100 @overload 

1101 def execution_options( 

1102 self, 

1103 *, 

1104 compiled_cache: Optional[CompiledCacheType] = ..., 

1105 logging_token: str = ..., 

1106 isolation_level: IsolationLevel = ..., 

1107 insertmanyvalues_page_size: int = ..., 

1108 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

1109 **opt: Any, 

1110 ) -> AsyncEngine: ... 

1111 

1112 @overload 

1113 def execution_options(self, **opt: Any) -> AsyncEngine: ... 

1114 

1115 def execution_options(self, **opt: Any) -> AsyncEngine: 

1116 """Return a new :class:`_asyncio.AsyncEngine` that will provide 

1117 :class:`_asyncio.AsyncConnection` objects with the given execution 

1118 options. 

1119 

1120 Proxied from :meth:`_engine.Engine.execution_options`. See that 

1121 method for details. 

1122 

1123 """ 

1124 

1125 return AsyncEngine(self.sync_engine.execution_options(**opt)) 

1126 

1127 async def dispose(self, close: bool = True) -> None: 

1128 """Dispose of the connection pool used by this 

1129 :class:`_asyncio.AsyncEngine`. 

1130 

1131 :param close: if left at its default of ``True``, has the 

1132 effect of fully closing all **currently checked in** 

1133 database connections. Connections that are still checked out 

1134 will **not** be closed, however they will no longer be associated 

1135 with this :class:`_engine.Engine`, 

1136 so when they are closed individually, eventually the 

1137 :class:`_pool.Pool` which they are associated with will 

1138 be garbage collected and they will be closed out fully, if 

1139 not already closed on checkin. 

1140 

1141 If set to ``False``, the previous connection pool is de-referenced, 

1142 and otherwise not touched in any way. 

1143 

1144 .. seealso:: 

1145 

1146 :meth:`_engine.Engine.dispose` 

1147 

1148 """ 

1149 

1150 await greenlet_spawn(self.sync_engine.dispose, close=close) 

1151 

1152 # START PROXY METHODS AsyncEngine 

1153 

1154 # code within this block is **programmatically, 

1155 # statically generated** by tools/generate_proxy_methods.py 

1156 

1157 def clear_compiled_cache(self) -> None: 

1158 r"""Clear the compiled cache associated with the dialect. 

1159 

1160 .. container:: class_bases 

1161 

1162 Proxied for the :class:`_engine.Engine` class on 

1163 behalf of the :class:`_asyncio.AsyncEngine` class. 

1164 

1165 This applies **only** to the built-in cache that is established 

1166 via the :paramref:`_engine.create_engine.query_cache_size` parameter. 

1167 It will not impact any dictionary caches that were passed via the 

1168 :paramref:`.Connection.execution_options.compiled_cache` parameter. 

1169 

1170 .. versionadded:: 1.4 

1171 

1172 

1173 """ # noqa: E501 

1174 

1175 return self._proxied.clear_compiled_cache() 

1176 

1177 def update_execution_options(self, **opt: Any) -> None: 

1178 r"""Update the default execution_options dictionary 

1179 of this :class:`_engine.Engine`. 

1180 

1181 .. container:: class_bases 

1182 

1183 Proxied for the :class:`_engine.Engine` class on 

1184 behalf of the :class:`_asyncio.AsyncEngine` class. 

1185 

1186 The given keys/values in \**opt are added to the 

1187 default execution options that will be used for 

1188 all connections. The initial contents of this dictionary 

1189 can be sent via the ``execution_options`` parameter 

1190 to :func:`_sa.create_engine`. 

1191 

1192 .. seealso:: 

1193 

1194 :meth:`_engine.Connection.execution_options` 

1195 

1196 :meth:`_engine.Engine.execution_options` 

1197 

1198 

1199 """ # noqa: E501 

1200 

1201 return self._proxied.update_execution_options(**opt) 

1202 

1203 def get_execution_options(self) -> _ExecuteOptions: 

1204 r"""Get the non-SQL options which will take effect during execution. 

1205 

1206 .. container:: class_bases 

1207 

1208 Proxied for the :class:`_engine.Engine` class on 

1209 behalf of the :class:`_asyncio.AsyncEngine` class. 

1210 

1211 .. versionadded: 1.3 

1212 

1213 .. seealso:: 

1214 

1215 :meth:`_engine.Engine.execution_options` 

1216 

1217 """ # noqa: E501 

1218 

1219 return self._proxied.get_execution_options() 

1220 

1221 @property 

1222 def url(self) -> URL: 

1223 r"""Proxy for the :attr:`_engine.Engine.url` attribute 

1224 on behalf of the :class:`_asyncio.AsyncEngine` class. 

1225 

1226 """ # noqa: E501 

1227 

1228 return self._proxied.url 

1229 

1230 @url.setter 

1231 def url(self, attr: URL) -> None: 

1232 self._proxied.url = attr 

1233 

1234 @property 

1235 def pool(self) -> Pool: 

1236 r"""Proxy for the :attr:`_engine.Engine.pool` attribute 

1237 on behalf of the :class:`_asyncio.AsyncEngine` class. 

1238 

1239 """ # noqa: E501 

1240 

1241 return self._proxied.pool 

1242 

1243 @pool.setter 

1244 def pool(self, attr: Pool) -> None: 

1245 self._proxied.pool = attr 

1246 

1247 @property 

1248 def dialect(self) -> Dialect: 

1249 r"""Proxy for the :attr:`_engine.Engine.dialect` attribute 

1250 on behalf of the :class:`_asyncio.AsyncEngine` class. 

1251 

1252 """ # noqa: E501 

1253 

1254 return self._proxied.dialect 

1255 

1256 @dialect.setter 

1257 def dialect(self, attr: Dialect) -> None: 

1258 self._proxied.dialect = attr 

1259 

1260 @property 

1261 def engine(self) -> Any: 

1262 r"""Returns this :class:`.Engine`. 

1263 

1264 .. container:: class_bases 

1265 

1266 Proxied for the :class:`_engine.Engine` class 

1267 on behalf of the :class:`_asyncio.AsyncEngine` class. 

1268 

1269 Used for legacy schemes that accept :class:`.Connection` / 

1270 :class:`.Engine` objects within the same variable. 

1271 

1272 

1273 """ # noqa: E501 

1274 

1275 return self._proxied.engine 

1276 

1277 @property 

1278 def name(self) -> Any: 

1279 r"""String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

1280 in use by this :class:`Engine`. 

1281 

1282 .. container:: class_bases 

1283 

1284 Proxied for the :class:`_engine.Engine` class 

1285 on behalf of the :class:`_asyncio.AsyncEngine` class. 

1286 

1287 

1288 """ # noqa: E501 

1289 

1290 return self._proxied.name 

1291 

1292 @property 

1293 def driver(self) -> Any: 

1294 r"""Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 

1295 in use by this :class:`Engine`. 

1296 

1297 .. container:: class_bases 

1298 

1299 Proxied for the :class:`_engine.Engine` class 

1300 on behalf of the :class:`_asyncio.AsyncEngine` class. 

1301 

1302 

1303 """ # noqa: E501 

1304 

1305 return self._proxied.driver 

1306 

1307 @property 

1308 def echo(self) -> Any: 

1309 r"""When ``True``, enable log output for this element. 

1310 

1311 .. container:: class_bases 

1312 

1313 Proxied for the :class:`_engine.Engine` class 

1314 on behalf of the :class:`_asyncio.AsyncEngine` class. 

1315 

1316 This has the effect of setting the Python logging level for the namespace 

1317 of this element's class and object reference. A value of boolean ``True`` 

1318 indicates that the loglevel ``logging.INFO`` will be set for the logger, 

1319 whereas the string value ``debug`` will set the loglevel to 

1320 ``logging.DEBUG``. 

1321 

1322 """ # noqa: E501 

1323 

1324 return self._proxied.echo 

1325 

1326 @echo.setter 

1327 def echo(self, attr: Any) -> None: 

1328 self._proxied.echo = attr 

1329 

1330 # END PROXY METHODS AsyncEngine 

1331 

1332 

1333class AsyncTransaction( 

1334 ProxyComparable[Transaction], StartableContext["AsyncTransaction"] 

1335): 

1336 """An asyncio proxy for a :class:`_engine.Transaction`.""" 

1337 

1338 __slots__ = ("connection", "sync_transaction", "nested") 

1339 

1340 sync_transaction: Optional[Transaction] 

1341 connection: AsyncConnection 

1342 nested: bool 

1343 

1344 def __init__(self, connection: AsyncConnection, nested: bool = False): 

1345 self.connection = connection 

1346 self.sync_transaction = None 

1347 self.nested = nested 

1348 

1349 @classmethod 

1350 def _regenerate_proxy_for_target( 

1351 cls, target: Transaction, **additional_kw: Any # noqa: U100 

1352 ) -> AsyncTransaction: 

1353 sync_connection = target.connection 

1354 sync_transaction = target 

1355 nested = isinstance(target, NestedTransaction) 

1356 

1357 async_connection = AsyncConnection._retrieve_proxy_for_target( 

1358 sync_connection 

1359 ) 

1360 assert async_connection is not None 

1361 

1362 obj = cls.__new__(cls) 

1363 obj.connection = async_connection 

1364 obj.sync_transaction = obj._assign_proxied(sync_transaction) 

1365 obj.nested = nested 

1366 return obj 

1367 

1368 @util.ro_non_memoized_property 

1369 def _proxied(self) -> Transaction: 

1370 if not self.sync_transaction: 

1371 self._raise_for_not_started() 

1372 return self.sync_transaction 

1373 

1374 @property 

1375 def is_valid(self) -> bool: 

1376 return self._proxied.is_valid 

1377 

1378 @property 

1379 def is_active(self) -> bool: 

1380 return self._proxied.is_active 

1381 

1382 async def close(self) -> None: 

1383 """Close this :class:`.AsyncTransaction`. 

1384 

1385 If this transaction is the base transaction in a begin/commit 

1386 nesting, the transaction will rollback(). Otherwise, the 

1387 method returns. 

1388 

1389 This is used to cancel a Transaction without affecting the scope of 

1390 an enclosing transaction. 

1391 

1392 """ 

1393 await greenlet_spawn(self._proxied.close) 

1394 

1395 async def rollback(self) -> None: 

1396 """Roll back this :class:`.AsyncTransaction`.""" 

1397 await greenlet_spawn(self._proxied.rollback) 

1398 

1399 async def commit(self) -> None: 

1400 """Commit this :class:`.AsyncTransaction`.""" 

1401 

1402 await greenlet_spawn(self._proxied.commit) 

1403 

1404 async def start(self, is_ctxmanager: bool = False) -> AsyncTransaction: 

1405 """Start this :class:`_asyncio.AsyncTransaction` object's context 

1406 outside of using a Python ``with:`` block. 

1407 

1408 """ 

1409 

1410 self.sync_transaction = self._assign_proxied( 

1411 await greenlet_spawn( 

1412 self.connection._proxied.begin_nested 

1413 if self.nested 

1414 else self.connection._proxied.begin 

1415 ) 

1416 ) 

1417 if is_ctxmanager: 

1418 self.sync_transaction.__enter__() 

1419 return self 

1420 

1421 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: 

1422 await greenlet_spawn(self._proxied.__exit__, type_, value, traceback) 

1423 

1424 

1425@overload 

1426def _get_sync_engine_or_connection(async_engine: AsyncEngine) -> Engine: ... 

1427 

1428 

1429@overload 

1430def _get_sync_engine_or_connection( 

1431 async_engine: AsyncConnection, 

1432) -> Connection: ... 

1433 

1434 

1435def _get_sync_engine_or_connection( 

1436 async_engine: Union[AsyncEngine, AsyncConnection], 

1437) -> Union[Engine, Connection]: 

1438 if isinstance(async_engine, AsyncConnection): 

1439 return async_engine._proxied 

1440 

1441 try: 

1442 return async_engine.sync_engine 

1443 except AttributeError as e: 

1444 raise exc.ArgumentError( 

1445 "AsyncEngine expected, got %r" % async_engine 

1446 ) from e 

1447 

1448 

1449@inspection._inspects(AsyncConnection) 

1450def _no_insp_for_async_conn_yet( 

1451 subject: AsyncConnection, # noqa: U100 

1452) -> NoReturn: 

1453 raise exc.NoInspectionAvailable( 

1454 "Inspection on an AsyncConnection is currently not supported. " 

1455 "Please use ``run_sync`` to pass a callable where it's possible " 

1456 "to call ``inspect`` on the passed connection.", 

1457 code="xd3s", 

1458 ) 

1459 

1460 

1461@inspection._inspects(AsyncEngine) 

1462def _no_insp_for_async_engine_xyet( 

1463 subject: AsyncEngine, # noqa: U100 

1464) -> NoReturn: 

1465 raise exc.NoInspectionAvailable( 

1466 "Inspection on an AsyncEngine is currently not supported. " 

1467 "Please obtain a connection then use ``conn.run_sync`` to pass a " 

1468 "callable where it's possible to call ``inspect`` on the " 

1469 "passed connection.", 

1470 code="xd3s", 

1471 )