1# ext/asyncio/engine.py
2# Copyright (C) 2020-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7from __future__ import annotations
8
9import asyncio
10import contextlib
11from typing import Any
12from typing import AsyncIterator
13from typing import Callable
14from typing import Dict
15from typing import Generator
16from typing import NoReturn
17from typing import Optional
18from typing import overload
19from typing import Tuple
20from typing import Type
21from typing import TYPE_CHECKING
22from typing import TypeVar
23from typing import Union
24
25from . import exc as async_exc
26from .base import asyncstartablecontext
27from .base import GeneratorStartableContext
28from .base import ProxyComparable
29from .base import StartableContext
30from .result import _ensure_sync_result
31from .result import AsyncResult
32from .result import AsyncScalarResult
33from ... import exc
34from ... import inspection
35from ... import util
36from ...engine import Connection
37from ...engine import create_engine as _create_engine
38from ...engine import create_pool_from_url as _create_pool_from_url
39from ...engine import Engine
40from ...engine.base import NestedTransaction
41from ...engine.base import Transaction
42from ...exc import ArgumentError
43from ...util.concurrency import greenlet_spawn
44from ...util.typing import Concatenate
45from ...util.typing import ParamSpec
46
47if TYPE_CHECKING:
48 from ...engine.cursor import CursorResult
49 from ...engine.interfaces import _CoreAnyExecuteParams
50 from ...engine.interfaces import _CoreSingleExecuteParams
51 from ...engine.interfaces import _DBAPIAnyExecuteParams
52 from ...engine.interfaces import _ExecuteOptions
53 from ...engine.interfaces import CompiledCacheType
54 from ...engine.interfaces import CoreExecuteOptionsParameter
55 from ...engine.interfaces import Dialect
56 from ...engine.interfaces import IsolationLevel
57 from ...engine.interfaces import SchemaTranslateMapType
58 from ...engine.result import ScalarResult
59 from ...engine.url import URL
60 from ...pool import Pool
61 from ...pool import PoolProxiedConnection
62 from ...sql._typing import _InfoType
63 from ...sql.base import Executable
64 from ...sql.selectable import TypedReturnsRows
65
# ParamSpec describing the extra positional / keyword arguments that
# AsyncConnection.run_sync() forwards to the synchronous callable.
_P = ParamSpec("_P")
# Generic type variable (bound to Any) used for typed statement results
# and for the return value of run_sync().
_T = TypeVar("_T", bound=Any)
68
69
def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine:
    """Build an :class:`_asyncio.AsyncEngine` for the given database URL.

    Keyword arguments are mostly the same as those accepted by
    :func:`_sa.create_engine`; after the asyncio-specific ones are handled
    here, the remainder is forwarded to that function. The dialect named
    by the URL must be asyncio-compatible, for example
    :ref:`dialect-postgresql-asyncpg`.

    .. versionadded:: 1.4

    :param async_creator: an async callable, taking no arguments, which
     returns a new driver-level asyncio connection from the underlying
     asyncio database driver. The connection is wrapped in the structures
     needed for use with the :class:`.AsyncEngine`. Parameters present in
     the URL are not applied to this callable; it should supply its own
     connection parameters.

     This is the asyncio counterpart of the
     :paramref:`_sa.create_engine.creator` parameter of
     :func:`_sa.create_engine` and may not be combined with it.

     .. versionadded:: 2.0.16

    :raises AsyncMethodRequired: if ``server_side_cursors`` is passed with
     a true value.
    :raises ArgumentError: if both ``async_creator`` and ``creator`` are
     given.

    """
    wants_global_ss_cursors = kw.get("server_side_cursors", False)
    if wants_global_ss_cursors:
        raise async_exc.AsyncMethodRequired(
            "Can't set server_side_cursors for async engine globally; "
            "use the connection.stream() method for an async "
            "streaming result set"
        )

    # flag consumed by the sync create_engine() to select async behavior
    kw["_is_async"] = True

    user_async_creator = kw.pop("async_creator", None)
    if user_async_creator:
        conflicting_creator = kw.get("creator", None)
        if conflicting_creator:
            raise ArgumentError(
                "Can only specify one of 'async_creator' or 'creator', "
                "not both."
            )

        def creator() -> Any:
            # ``sync_engine`` is bound further below, before this closure
            # can ever be invoked.  To pass adapted arguments such as
            # prepared_statement_cache_size, a user would supply their own
            # "creator" that emulates this form.
            dbapi = sync_engine.dialect.dbapi
            return dbapi.connect(  # type: ignore
                async_creator_fn=user_async_creator
            )

        kw["creator"] = creator

    sync_engine = _create_engine(url, **kw)
    return AsyncEngine(sync_engine)
122
123
def async_engine_from_config(
    configuration: Dict[str, Any], prefix: str = "sqlalchemy.", **kwargs: Any
) -> AsyncEngine:
    """Build an :class:`_asyncio.AsyncEngine` from a configuration mapping.

    This is the asyncio analogue of :func:`_sa.engine_from_config` and
    takes the same arguments; the dialect requested must be
    asyncio-compatible, such as :ref:`dialect-postgresql-asyncpg`.

    Only keys of ``configuration`` that begin with ``prefix`` are used,
    with the prefix stripped. Explicit ``kwargs`` override values taken
    from the configuration. The resulting options must contain a ``url``
    entry, which is removed and passed as the URL to
    :func:`_asyncio.create_async_engine`.

    .. versionadded:: 1.4.29

    """
    prefix_length = len(prefix)
    options: Dict[str, Any] = {}
    for full_key, value in configuration.items():
        if full_key.startswith(prefix):
            options[full_key[prefix_length:]] = value

    # ask create_engine() to coerce string config values to their types
    options["_coerce_config"] = True
    options.update(kwargs)

    url = options.pop("url")
    return create_async_engine(url, **options)
147
148
def create_async_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool:
    """Create a new connection pool for use with asyncio.

    Arguments are mostly identical to those of
    :func:`_sa.create_pool_from_url`, to which this function delegates
    after marking the request as async. The dialect named by the URL must
    be asyncio-compatible, such as :ref:`dialect-postgresql-asyncpg`.

    .. versionadded:: 2.0.10

    """
    kwargs.update(_is_async=True)
    return _create_pool_from_url(url, **kwargs)
162
163
class AsyncConnectable:
    """Common base for the asyncio engine and connection proxies."""

    __slots__ = ("_slots_dispatch", "__weakref__")

    @classmethod
    def _no_async_engine_events(cls) -> NoReturn:
        """Always raise: event listeners belong on the sync objects.

        :raises NotImplementedError: unconditionally.
        """
        message = (
            "asynchronous events are not implemented at this time. Apply "
            "synchronous listeners to the AsyncEngine.sync_engine or "
            "AsyncConnection.sync_connection attributes."
        )
        raise NotImplementedError(message)
174
175
176@util.create_proxy_methods(
177 Connection,
178 ":class:`_engine.Connection`",
179 ":class:`_asyncio.AsyncConnection`",
180 classmethods=[],
181 methods=[],
182 attributes=[
183 "closed",
184 "invalidated",
185 "dialect",
186 "default_isolation_level",
187 ],
188)
189# "Class has incompatible disjoint bases" - no idea
190class AsyncConnection( # type:ignore[misc]
191 ProxyComparable[Connection],
192 StartableContext["AsyncConnection"],
193 AsyncConnectable,
194):
195 """An asyncio proxy for a :class:`_engine.Connection`.
196
197 :class:`_asyncio.AsyncConnection` is acquired using the
198 :meth:`_asyncio.AsyncEngine.connect`
199 method of :class:`_asyncio.AsyncEngine`::
200
201 from sqlalchemy.ext.asyncio import create_async_engine
202
203 engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
204
205 async with engine.connect() as conn:
206 result = await conn.execute(select(table))
207
208 .. versionadded:: 1.4
209
210 """ # noqa
211
212 # AsyncConnection is a thin proxy; no state should be added here
213 # that is not retrievable from the "sync" engine / connection, e.g.
214 # current transaction, info, etc. It should be possible to
215 # create a new AsyncConnection that matches this one given only the
216 # "sync" elements.
217 __slots__ = (
218 "engine",
219 "sync_engine",
220 "sync_connection",
221 )
222
def __init__(
    self,
    async_engine: AsyncEngine,
    sync_connection: Optional[Connection] = None,
):
    """Construct a connection proxy bound to ``async_engine``.

    :param async_engine: the owning :class:`_asyncio.AsyncEngine`; its
     ``sync_engine`` is stored on this object as well.
    :param sync_connection: optional sync :class:`_engine.Connection` to
     proxy. When not given, :meth:`start` is the method that acquires
     one from the sync engine.
    """
    parent_sync_engine = async_engine.sync_engine
    self.engine = async_engine
    self.sync_engine = parent_sync_engine
    # register this proxy for the target so it can be looked up again
    # later from the sync object alone
    self.sync_connection = self._assign_proxied(sync_connection)
231
232 sync_connection: Optional[Connection]
233 """Reference to the sync-style :class:`_engine.Connection` this
234 :class:`_asyncio.AsyncConnection` proxies requests towards.
235
236 This instance can be used as an event target.
237
238 .. seealso::
239
240 :ref:`asyncio_events`
241
242 """
243
244 sync_engine: Engine
245 """Reference to the sync-style :class:`_engine.Engine` this
246 :class:`_asyncio.AsyncConnection` is associated with via its underlying
247 :class:`_engine.Connection`.
248
249 This instance can be used as an event target.
250
251 .. seealso::
252
253 :ref:`asyncio_events`
254
255 """
256
@classmethod
def _regenerate_proxy_for_target(
    cls, target: Connection, **additional_kw: Any  # noqa: U100
) -> AsyncConnection:
    """Build a new :class:`_asyncio.AsyncConnection` for a sync connection.

    The owning :class:`_asyncio.AsyncEngine` is looked up from the sync
    connection's ``engine``; ``additional_kw`` is accepted but unused.
    """
    owning_engine = AsyncEngine._retrieve_proxy_for_target(target.engine)
    return AsyncConnection(owning_engine, target)
264
async def start(
    self, is_ctxmanager: bool = False  # noqa: U100
) -> AsyncConnection:
    """Establish the underlying connection without using an
    ``async with:`` block.

    A sync :class:`_engine.Connection` is obtained from the sync engine
    inside a greenlet and installed as :attr:`sync_connection`.

    :param is_ctxmanager: accepted for the startable-context protocol;
     not used here.
    :return: this :class:`_asyncio.AsyncConnection`.
    :raises InvalidRequestError: if a connection is already present.

    """
    if self.sync_connection:
        raise exc.InvalidRequestError("connection is already started")

    new_sync_connection = await greenlet_spawn(self.sync_engine.connect)
    self.sync_connection = self._assign_proxied(new_sync_connection)
    return self
278
@property
def connection(self) -> NoReturn:
    """Unsupported on the async proxy; use
    :meth:`_asyncio.AsyncConnection.get_raw_connection` instead.

    :raises InvalidRequestError: on every access.
    """
    message = (
        "AsyncConnection.connection accessor is not implemented as the "
        "attribute may need to reconnect on an invalidated connection. "
        "Use the get_raw_connection() method."
    )
    raise exc.InvalidRequestError(message)
289
async def get_raw_connection(self) -> PoolProxiedConnection:
    """Return the pooled DBAPI-level connection used by this
    :class:`_asyncio.AsyncConnection`.

    The returned object is a SQLAlchemy connection-pool proxied
    connection. Its :attr:`_pool._ConnectionFairy.driver_connection`
    attribute refers to the actual driver connection, while
    :attr:`_pool._ConnectionFairy.dbapi_connection` refers to an
    :class:`_engine.AdaptedConnection` that adapts the driver connection
    to the DBAPI protocol.

    """
    sync_conn = self._proxied
    # read the sync ``.connection`` attribute inside a greenlet
    return await greenlet_spawn(getattr, sync_conn, "connection")
305
@util.ro_non_memoized_property
def info(self) -> _InfoType:
    """The :attr:`_engine.Connection.info` dictionary of the proxied
    :class:`_engine.Connection`.

    The dictionary may be written to freely in order to associate
    user-defined state with the database connection.

    It is available only while the :class:`.AsyncConnection` is
    connected; when :attr:`.AsyncConnection.closed` is ``True``,
    accessing this attribute raises :class:`.ResourceClosedError`.

    .. versionadded:: 1.4.0b2

    """
    sync_conn = self._proxied
    return sync_conn.info
323
@util.ro_non_memoized_property
def _proxied(self) -> Connection:
    """Return the sync :class:`_engine.Connection`, failing via
    ``_raise_for_not_started()`` when none is present."""
    sync_conn = self.sync_connection
    if not sync_conn:
        self._raise_for_not_started()
    return sync_conn
329
def begin(self) -> AsyncTransaction:
    """Begin a transaction prior to autobegin occurring.

    :return: an :class:`.AsyncTransaction` bound to this connection.

    The not-started error from ``_raise_for_not_started()`` is raised
    when this connection has no sync connection yet.
    """
    # Perform the "is started" check explicitly rather than inside an
    # ``assert`` so that it still runs when Python is invoked with -O.
    if not self.sync_connection:
        self._raise_for_not_started()
    return AsyncTransaction(self)
334
def begin_nested(self) -> AsyncTransaction:
    """Begin a nested transaction and return a transaction handle.

    :return: an :class:`.AsyncTransaction` created with ``nested=True``.

    The not-started error from ``_raise_for_not_started()`` is raised
    when this connection has no sync connection yet.
    """
    # Perform the "is started" check explicitly rather than inside an
    # ``assert`` so that it still runs when Python is invoked with -O.
    if not self.sync_connection:
        self._raise_for_not_started()
    return AsyncTransaction(self, nested=True)
339
async def invalidate(
    self, exception: Optional[BaseException] = None
) -> None:
    """Invalidate the DBAPI connection underlying the proxied
    :class:`_engine.Connection`.

    Refer to :meth:`_engine.Connection.invalidate` for the complete
    description of this operation.

    :param exception: optional exception passed through to the sync
     method.

    """
    sync_invalidate = self._proxied.invalidate
    return await greenlet_spawn(sync_invalidate, exception=exception)
354
async def get_isolation_level(self) -> IsolationLevel:
    """Return the result of the proxied connection's
    :meth:`_engine.Connection.get_isolation_level`, invoked in a
    greenlet."""
    sync_conn = self._proxied
    return await greenlet_spawn(sync_conn.get_isolation_level)
357
def in_transaction(self) -> bool:
    """Return True if a transaction is in progress."""
    sync_conn = self._proxied
    return sync_conn.in_transaction()
362
def in_nested_transaction(self) -> bool:
    """Return True if a nested transaction is in progress.

    .. versionadded:: 1.4.0b2

    """
    sync_conn = self._proxied
    return sync_conn.in_nested_transaction()
370
def get_transaction(self) -> Optional[AsyncTransaction]:
    """Return an :class:`.AsyncTransaction` for the current transaction,
    or ``None`` if there is none.

    The current :class:`_engine.Transaction` is obtained from the sync
    connection's :meth:`_engine.Connection.get_transaction` method and
    is then proxied by an :class:`.AsyncTransaction` object.

    .. versionadded:: 1.4.0b2

    """
    sync_trans = self._proxied.get_transaction()
    if sync_trans is None:
        return None
    return AsyncTransaction._retrieve_proxy_for_target(sync_trans)
389
def get_nested_transaction(self) -> Optional[AsyncTransaction]:
    """Return an :class:`.AsyncTransaction` for the current nested
    (savepoint) transaction, or ``None`` if there is none.

    The current :class:`_engine.Transaction` is obtained from the sync
    connection's :meth:`_engine.Connection.get_nested_transaction`
    method and is then proxied by an :class:`.AsyncTransaction` object.

    .. versionadded:: 1.4.0b2

    """
    sync_nested = self._proxied.get_nested_transaction()
    if sync_nested is None:
        return None
    return AsyncTransaction._retrieve_proxy_for_target(sync_nested)
408
@overload
async def execution_options(
    self,
    *,
    compiled_cache: Optional[CompiledCacheType] = ...,
    logging_token: str = ...,
    isolation_level: IsolationLevel = ...,
    no_parameters: bool = False,
    stream_results: bool = False,
    max_row_buffer: int = ...,
    yield_per: int = ...,
    insertmanyvalues_page_size: int = ...,
    schema_translate_map: Optional[SchemaTranslateMapType] = ...,
    preserve_rowcount: bool = False,
    **opt: Any,
) -> AsyncConnection: ...

@overload
async def execution_options(self, **opt: Any) -> AsyncConnection: ...

async def execution_options(self, **opt: Any) -> AsyncConnection:
    r"""Apply non-SQL options to the connection that take effect during
    execution.

    The options are set on the proxied sync connection in place, and
    this same :class:`_asyncio.AsyncConnection` is returned.

    See :meth:`_engine.Connection.execution_options` for the full
    description of the available options.

    """
    sync_conn = self._proxied
    returned_conn = await greenlet_spawn(sync_conn.execution_options, **opt)
    # the sync method is expected to hand back the very same Connection
    assert returned_conn is sync_conn
    return self
445
async def commit(self) -> None:
    """Commit the transaction currently in progress.

    If a transaction has been started it is committed. If none was
    started, nothing happens, provided the connection is not in an
    invalidated state.

    A transaction begins on a :class:`_engine.Connection` automatically
    the first time a statement is executed, or when
    :meth:`_engine.Connection.begin` is called.

    """
    sync_conn = self._proxied
    await greenlet_spawn(sync_conn.commit)
459
async def rollback(self) -> None:
    """Roll back the transaction currently in progress.

    If a transaction has been started it is rolled back. If none was
    started, nothing happens. If a transaction was started and the
    connection is in an invalidated state, this method clears the
    transaction.

    A transaction begins on a :class:`_engine.Connection` automatically
    the first time a statement is executed, or when
    :meth:`_engine.Connection.begin` is called.

    """
    sync_conn = self._proxied
    await greenlet_spawn(sync_conn.rollback)
475
async def close(self) -> None:
    """Close this :class:`_asyncio.AsyncConnection`.

    Closing also rolls back the transaction, if one is in place.

    """
    sync_conn = self._proxied
    await greenlet_spawn(sync_conn.close)
484
async def aclose(self) -> None:
    """A synonym for :meth:`_asyncio.AsyncConnection.close`.

    The :meth:`_asyncio.AsyncConnection.aclose` name exists specifically
    to support the Python standard library ``@contextlib.aclosing``
    context manager function, which calls ``aclose()`` on exit.

    .. versionadded:: 2.0.20

    """
    # plain delegation; all closing logic lives in close()
    await self.close()
496
async def exec_driver_sql(
    self,
    statement: str,
    parameters: Optional[_DBAPIAnyExecuteParams] = None,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]:
    r"""Execute a driver-level SQL string and return a buffered
    :class:`_engine.Result`.

    :param statement: the SQL string, passed to the sync
     :meth:`_engine.Connection.exec_driver_sql` unchanged.
    :param parameters: optional DBAPI-level parameters.
    :param execution_options: optional execution options.

    """
    sync_conn = self._proxied
    cursor_result = await greenlet_spawn(
        sync_conn.exec_driver_sql,
        statement,
        parameters,
        execution_options,
        _require_await=True,
    )
    return await _ensure_sync_result(cursor_result, self.exec_driver_sql)
517
@overload
def stream(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> GeneratorStartableContext[AsyncResult[_T]]: ...

@overload
def stream(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> GeneratorStartableContext[AsyncResult[Any]]: ...

@asyncstartablecontext
async def stream(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> AsyncIterator[AsyncResult[Any]]:
    """Execute a statement and return an awaitable yielding a
    :class:`_asyncio.AsyncResult` object.

    E.g.::

        result = await conn.stream(stmt)
        async for row in result:
            print(f"{row}")

    The :meth:`.AsyncConnection.stream`
    method supports optional context manager use against the
    :class:`.AsyncResult` object, as in::

        async with conn.stream(stmt) as result:
            async for row in result:
                print(f"{row}")

    In the above pattern, the :meth:`.AsyncResult.close` method is
    invoked unconditionally, even if the iterator is interrupted by an
    exception throw.  Context manager use remains optional, however,
    and the function may be called in either an ``async with fn():`` or
    ``await fn()`` style.

    .. versionadded:: 2.0.0b3 added context manager support

    :param statement: the executable construct to run.
    :param parameters: optional bound parameters for the statement.
    :param execution_options: optional execution options; the
     ``stream_results`` option is merged in as ``True`` for this call.

    :return: an awaitable object that will yield an
     :class:`_asyncio.AsyncResult` object.

    :raises InvalidRequestError: if the dialect reports that it does not
     support server side cursors.

    .. seealso::

        :meth:`.AsyncConnection.stream_scalars`

    """
    # Refuse up front when the dialect cannot provide a server side
    # cursor; this check also covers stream_scalars(), which delegates
    # to this method.
    if not self.dialect.supports_server_side_cursors:
        raise exc.InvalidRequestError(
            "Cant use `stream` or `stream_scalars` with the current "
            "dialect since it does not support server side cursors."
        )

    # Run the sync execute() in a greenlet, merging stream_results=True
    # together with whatever options the caller passed.
    result = await greenlet_spawn(
        self._proxied.execute,
        statement,
        parameters,
        execution_options=util.EMPTY_DICT.merge_with(
            execution_options, {"stream_results": True}
        ),
        _require_await=True,
    )
    # internal invariant: the execution must have used a server side cursor
    assert result.context._is_server_side
    ar = AsyncResult(result)
    try:
        yield ar
    except GeneratorExit:
        # The generator is being closed instead of resumed; nothing is
        # awaited here and the result is not closed by this branch.
        pass
    else:
        # Resumed normally after the yield: close the result in its own
        # task and shield it, so that cancellation of the awaiting
        # coroutine does not cancel the close itself.
        # NOTE(review): an exception other than GeneratorExit thrown in
        # at the yield skips this branch, so the close is not issued
        # from here in that case -- confirm against the
        # asyncstartablecontext wrapper whether the "unconditionally"
        # wording in the docstring holds.
        task = asyncio.create_task(ar.close())
        await asyncio.shield(task)
602
@overload
async def execute(
    self,
    statement: TypedReturnsRows[_T],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[_T]: ...

@overload
async def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]: ...

async def execute(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> CursorResult[Any]:
    r"""Execute a SQL statement construct and return a buffered
    :class:`_engine.Result`.

    :param statement: The statement to be executed. This is always
     an object that is in both the :class:`_expression.ClauseElement` and
     :class:`_expression.Executable` hierarchies, including:

     * :class:`_expression.Select`
     * :class:`_expression.Insert`, :class:`_expression.Update`,
       :class:`_expression.Delete`
     * :class:`_expression.TextClause` and
       :class:`_expression.TextualSelect`
     * :class:`_schema.DDL` and objects which inherit from
       :class:`_schema.ExecutableDDLElement`

    :param parameters: values to bind into the statement, given either
     as one dictionary of parameter names to values or as a mutable
     sequence (e.g. a list) of such dictionaries. A list of
     dictionaries causes the DBAPI ``cursor.executemany()`` method to
     be used; a single dictionary causes ``cursor.execute()`` to be
     used.

    :param execution_options: optional dictionary of execution options
     to associate with this statement execution. It may contain a
     subset of the options accepted by
     :meth:`_engine.Connection.execution_options`.

    :return: a :class:`_engine.Result` object.

    """
    sync_conn = self._proxied
    cursor_result = await greenlet_spawn(
        sync_conn.execute,
        statement,
        parameters,
        execution_options=execution_options,
        _require_await=True,
    )
    return await _ensure_sync_result(cursor_result, self.execute)
667
@overload
async def scalar(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Optional[_T]: ...

@overload
async def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any: ...

async def scalar(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> Any:
    r"""Execute a SQL statement construct and return a scalar object.

    Shorthand for calling :meth:`_engine.Result.scalar` on the result of
    :meth:`_asyncio.AsyncConnection.execute`; the parameters are the
    same as for that method.

    :return: a scalar Python value representing the first column of the
     first row returned.

    """
    cursor_result = await self.execute(
        statement, parameters, execution_options=execution_options
    )
    first_value = cursor_result.scalar()
    return first_value
707
@overload
async def scalars(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[_T]: ...

@overload
async def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]: ...

async def scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreAnyExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> ScalarResult[Any]:
    r"""Execute a SQL statement construct and return scalar objects.

    Shorthand for calling :meth:`_engine.Result.scalars` on the result
    of :meth:`_asyncio.AsyncConnection.execute`; the parameters are the
    same as for that method.

    :return: a :class:`_engine.ScalarResult` object.

    .. versionadded:: 1.4.24

    """
    cursor_result = await self.execute(
        statement, parameters, execution_options=execution_options
    )
    scalar_result = cursor_result.scalars()
    return scalar_result
748
@overload
def stream_scalars(
    self,
    statement: TypedReturnsRows[Tuple[_T]],
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> GeneratorStartableContext[AsyncScalarResult[_T]]: ...

@overload
def stream_scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> GeneratorStartableContext[AsyncScalarResult[Any]]: ...

@asyncstartablecontext
async def stream_scalars(
    self,
    statement: Executable,
    parameters: Optional[_CoreSingleExecuteParams] = None,
    *,
    execution_options: Optional[CoreExecuteOptionsParameter] = None,
) -> AsyncIterator[AsyncScalarResult[Any]]:
    r"""Execute a statement and return an awaitable yielding a
    :class:`_asyncio.AsyncScalarResult` object.

    E.g.::

        result = await conn.stream_scalars(stmt)
        async for scalar in result:
            print(f"{scalar}")

    This method is shorthand for invoking the
    :meth:`_asyncio.AsyncResult.scalars` method after invoking the
    :meth:`_asyncio.AsyncConnection.stream` method.  Parameters are
    equivalent.

    The :meth:`.AsyncConnection.stream_scalars`
    method supports optional context manager use against the
    :class:`.AsyncScalarResult` object, as in::

        async with conn.stream_scalars(stmt) as result:
            async for scalar in result:
                print(f"{scalar}")

    In the above pattern, the :meth:`.AsyncScalarResult.close` method is
    invoked unconditionally, even if the iterator is interrupted by an
    exception throw.  Context manager use remains optional, however,
    and the function may be called in either an ``async with fn():`` or
    ``await fn()`` style.

    .. versionadded:: 2.0.0b3 added context manager support

    :return: an awaitable object that will yield an
     :class:`_asyncio.AsyncScalarResult` object.

    .. versionadded:: 1.4.24

    .. seealso::

        :meth:`.AsyncConnection.stream`

    """

    # Delegate to stream() used as a context manager, so that leaving
    # this generator also exits the stream() context; the yielded object
    # is the scalars() view of that streamed result.
    async with self.stream(
        statement, parameters, execution_options=execution_options
    ) as result:
        yield result.scalars()
819
async def run_sync(
    self,
    fn: Callable[Concatenate[Connection, _P], _T],
    *arg: _P.args,
    **kw: _P.kwargs,
) -> _T:
    '''Call the given synchronous (i.e. not async) callable, passing a
    synchronous-style :class:`_engine.Connection` as its first argument.

    This allows traditional synchronous SQLAlchemy functions to be used
    from within an asyncio application.

    E.g.::

        def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str:
            """A synchronous function that does not require awaiting

            :param conn: a Core SQLAlchemy Connection, used synchronously

            :return: an optional return value is supported

            """
            conn.execute(some_table.insert().values(int_col=arg1, str_col=arg2))
            return "success"


        async def do_something_async(async_engine: AsyncEngine) -> None:
            """an async function that uses awaiting"""

            async with async_engine.begin() as async_conn:
                # run do_something_with_core() with a sync-style
                # Connection, proxied into an awaitable
                return_code = await async_conn.run_sync(
                    do_something_with_core, 5, "strval"
                )
                print(return_code)

    The asyncio event loop is maintained all the way through to the
    database connection, because the callable runs inside a specially
    instrumented greenlet.

    The most basic use of :meth:`.AsyncConnection.run_sync` is to call
    methods such as :meth:`_schema.MetaData.create_all`, which expect a
    :class:`_engine.Connection`, when only an :class:`.AsyncConnection`
    is at hand::

        # run metadata.create_all(conn) with a sync-style Connection,
        # proxied into an awaitable
        async with async_engine.begin() as conn:
            await conn.run_sync(metadata.create_all)

    .. note::

        The provided callable is invoked inline within the asyncio event
        loop, and will block on traditional IO calls. IO within this
        callable should only call into SQLAlchemy's asyncio database
        APIs which will be properly adapted to the greenlet context.

    :param fn: the synchronous callable; it receives the sync
     :class:`_engine.Connection` followed by ``*arg`` and ``**kw``.
    :return: whatever ``fn`` returns.

    .. seealso::

        :meth:`.AsyncSession.run_sync`

        :ref:`session_run_sync`

    '''  # noqa: E501

    sync_conn = self._proxied
    return await greenlet_spawn(
        fn, sync_conn, *arg, _require_await=False, **kw
    )
891
def __await__(self) -> Generator[Any, None, AsyncConnection]:
    """Make ``await conn`` equivalent to ``await conn.start()``."""
    start_coroutine = self.start()
    return start_coroutine.__await__()
894
async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Close the connection when the ``async with`` block is left.

    The exception arguments are accepted but not inspected.
    """
    # Run close() as its own task and shield it, so that cancellation of
    # the coroutine awaiting here does not cancel the close itself.
    close_task = asyncio.create_task(self.close())
    await asyncio.shield(close_task)
898
899 # START PROXY METHODS AsyncConnection
900
901 # code within this block is **programmatically,
902 # statically generated** by tools/generate_proxy_methods.py
903
@property
def closed(self) -> Any:
    r"""Return True if this connection is closed.

    .. container:: class_bases

        Proxied for the :class:`_engine.Connection` class
        on behalf of the :class:`_asyncio.AsyncConnection` class.

    """  # noqa: E501

    # read through to the sync Connection; the _proxied accessor raises
    # when this connection has not been started
    return self._proxied.closed
916
@property
def invalidated(self) -> Any:
    r"""Return True if this connection was invalidated.

    .. container:: class_bases

        Proxied for the :class:`_engine.Connection` class
        on behalf of the :class:`_asyncio.AsyncConnection` class.

    This does not, however, indicate whether or not the connection was
    invalidated at the pool level.


    """  # noqa: E501

    # read through to the sync Connection; the _proxied accessor raises
    # when this connection has not been started
    return self._proxied.invalidated
933
@property
def dialect(self) -> Dialect:
    r"""Proxy for the :attr:`_engine.Connection.dialect` attribute
    on behalf of the :class:`_asyncio.AsyncConnection` class.

    """  # noqa: E501

    # read through to the sync Connection; the _proxied accessor raises
    # when this connection has not been started
    return self._proxied.dialect

@dialect.setter
def dialect(self, attr: Dialect) -> None:
    # assignment is written through to the sync Connection as well
    self._proxied.dialect = attr
946
@property
def default_isolation_level(self) -> Any:
    r"""The initial-connection time isolation level associated with the
    :class:`_engine.Dialect` in use.

    .. container:: class_bases

        Proxied for the :class:`_engine.Connection` class
        on behalf of the :class:`_asyncio.AsyncConnection` class.

    This value is independent of the
    :paramref:`.Connection.execution_options.isolation_level` and
    :paramref:`.Engine.execution_options.isolation_level` execution
    options, and is determined by the :class:`_engine.Dialect` when the
    first connection is created, by performing a SQL query against the
    database for the current isolation level before any additional commands
    have been emitted.

    Calling this accessor does not invoke any new SQL queries.

    .. seealso::

        :meth:`_engine.Connection.get_isolation_level`
        - view current actual isolation level

        :paramref:`_sa.create_engine.isolation_level`
        - set per :class:`_engine.Engine` isolation level

        :paramref:`.Connection.execution_options.isolation_level`
        - set per :class:`_engine.Connection` isolation level


    """  # noqa: E501

    # read through to the sync Connection; the _proxied accessor raises
    # when this connection has not been started
    return self._proxied.default_isolation_level
982
983 # END PROXY METHODS AsyncConnection
984
985
@util.create_proxy_methods(
    Engine,
    ":class:`_engine.Engine`",
    ":class:`_asyncio.AsyncEngine`",
    classmethods=[],
    methods=[
        "clear_compiled_cache",
        "update_execution_options",
        "get_execution_options",
    ],
    attributes=[
        "url",
        "pool",
        "dialect",
        "engine",
        "name",
        "driver",
        "echo",
    ],
)
# "Class has incompatible disjoint bases" - no idea
class AsyncEngine(ProxyComparable[Engine], AsyncConnectable):  # type: ignore[misc] # noqa:E501
    """Asyncio-facing proxy around a sync :class:`_engine.Engine`.

    An :class:`_asyncio.AsyncEngine` is obtained by calling the
    :func:`_asyncio.create_async_engine` function::

        from sqlalchemy.ext.asyncio import create_async_engine

        engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

    .. versionadded:: 1.4

    """  # noqa

    # Keep AsyncEngine a thin proxy.  Do not add state here unless it can
    # be recovered from the "sync" engine / connection (current
    # transaction, info, etc.); an equivalent AsyncEngine must be
    # constructible from the "sync" elements alone.
    __slots__ = "sync_engine"

    _connection_cls: Type[AsyncConnection] = AsyncConnection

    sync_engine: Engine
    """The sync-style :class:`_engine.Engine` to which this
    :class:`_asyncio.AsyncEngine` forwards its requests.

    This instance can be used as an event target.

    .. seealso::

        :ref:`asyncio_events`
    """

    def __init__(self, sync_engine: Engine):
        """Wrap ``sync_engine``, rejecting engines whose dialect is not
        async.

        :param sync_engine: the :class:`_engine.Engine` to proxy.
        :raises sqlalchemy.exc.InvalidRequestError: if
         ``sync_engine.dialect.is_async`` is false.
        """
        dialect = sync_engine.dialect
        if not dialect.is_async:
            message = (
                "The asyncio extension requires an async driver to be used. "
                f"The loaded {dialect.driver!r} is not async."
            )
            raise exc.InvalidRequestError(message)
        self.sync_engine = self._assign_proxied(sync_engine)

    @util.ro_non_memoized_property
    def _proxied(self) -> Engine:
        """The sync :class:`_engine.Engine` that proxied calls go to."""
        sync_engine = self.sync_engine
        return sync_engine

    @classmethod
    def _regenerate_proxy_for_target(
        cls, target: Engine, **additional_kw: Any  # noqa: U100
    ) -> AsyncEngine:
        """Build a new :class:`_asyncio.AsyncEngine` around ``target``."""
        regenerated = AsyncEngine(target)
        return regenerated

    @contextlib.asynccontextmanager
    async def begin(self) -> AsyncIterator[AsyncConnection]:
        """Return a context manager that, once entered, yields an
        :class:`_asyncio.AsyncConnection` on which an
        :class:`_asyncio.AsyncTransaction` has been begun.

        E.g.::

            async with async_engine.begin() as conn:
                await conn.execute(
                    text("insert into table (x, y, z) values (1, 2, 3)")
                )
                await conn.execute(text("my_special_procedure(5)"))

        """
        conn = self.connect()

        # The connection context is entered first and the transaction
        # context second; they exit in the reverse order.
        async with conn, conn.begin():
            yield conn

    def connect(self) -> AsyncConnection:
        """Create an :class:`_asyncio.AsyncConnection` for this engine.

        The returned :class:`_asyncio.AsyncConnection` procures a database
        connection from the underlying connection pool once it is entered
        as an async context manager::

            async with async_engine.connect() as conn:
                result = await conn.execute(select(user_table))

        It may alternatively be started without a context manager by
        calling its :meth:`_asyncio.AsyncConnection.start` method.

        """

        connection_cls = self._connection_cls
        return connection_cls(self)

    async def raw_connection(self) -> PoolProxiedConnection:
        """Return a "raw" DBAPI connection from the connection pool.

        .. seealso::

            :ref:`dbapi_connections`

        """
        sync_raw_connection = self.sync_engine.raw_connection
        return await greenlet_spawn(sync_raw_connection)

    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        **opt: Any,
    ) -> AsyncEngine: ...

    @overload
    def execution_options(self, **opt: Any) -> AsyncEngine: ...

    def execution_options(self, **opt: Any) -> AsyncEngine:
        """Produce a new :class:`_asyncio.AsyncEngine` whose
        :class:`_asyncio.AsyncConnection` objects carry the given execution
        options.

        Proxied from :meth:`_engine.Engine.execution_options`. See that
        method for details.

        """

        sync_with_options = self.sync_engine.execution_options(**opt)
        return AsyncEngine(sync_with_options)

    async def dispose(self, close: bool = True) -> None:
        """Dispose of the connection pool used by this
        :class:`_asyncio.AsyncEngine`.

        :param close: if left at its default of ``True``, has the
         effect of fully closing all **currently checked in**
         database connections.  Connections that are still checked out
         will **not** be closed, however they will no longer be associated
         with this :class:`_engine.Engine`,
         so when they are closed individually, eventually the
         :class:`_pool.Pool` which they are associated with will
         be garbage collected and they will be closed out fully, if
         not already closed on checkin.

         If set to ``False``, the previous connection pool is de-referenced,
         and otherwise not touched in any way.

        .. seealso::

            :meth:`_engine.Engine.dispose`

        """

        sync_dispose = self.sync_engine.dispose
        await greenlet_spawn(sync_dispose, close=close)

    # START PROXY METHODS AsyncEngine

    # code within this block is **programmatically,
    # statically generated** by tools/generate_proxy_methods.py

    def clear_compiled_cache(self) -> None:
        r"""Empty the compiled cache associated with the dialect.

        .. container:: class_bases

            Proxied for the :class:`_engine.Engine` class on
            behalf of the :class:`_asyncio.AsyncEngine` class.

        Only the built-in cache configured through the
        :paramref:`_engine.create_engine.query_cache_size` parameter is
        affected.  Dictionary caches that were passed via the
        :paramref:`.Connection.execution_options.compiled_cache` parameter
        are not impacted.

        .. versionadded:: 1.4


        """  # noqa: E501

        proxied = self._proxied
        return proxied.clear_compiled_cache()

    def update_execution_options(self, **opt: Any) -> None:
        r"""Update the default execution_options dictionary
        of this :class:`_engine.Engine`.

        .. container:: class_bases

            Proxied for the :class:`_engine.Engine` class on
            behalf of the :class:`_asyncio.AsyncEngine` class.

        The keys/values given in \**opt are merged into the default
        execution options used for all connections.  The initial
        contents of that dictionary can be supplied through the
        ``execution_options`` parameter of :func:`_sa.create_engine`.

        .. seealso::

            :meth:`_engine.Connection.execution_options`

            :meth:`_engine.Engine.execution_options`


        """  # noqa: E501

        proxied = self._proxied
        return proxied.update_execution_options(**opt)

    def get_execution_options(self) -> _ExecuteOptions:
        r"""Return the non-SQL options which will take effect during
        execution.

        .. container:: class_bases

            Proxied for the :class:`_engine.Engine` class on
            behalf of the :class:`_asyncio.AsyncEngine` class.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`_engine.Engine.execution_options`

        """  # noqa: E501

        proxied = self._proxied
        return proxied.get_execution_options()

    @property
    def url(self) -> URL:
        r"""Proxy for the :attr:`_engine.Engine.url` attribute
        on behalf of the :class:`_asyncio.AsyncEngine` class.

        """  # noqa: E501

        proxied = self._proxied
        return proxied.url

    @url.setter
    def url(self, attr: URL) -> None:
        proxied = self._proxied
        proxied.url = attr

    @property
    def pool(self) -> Pool:
        r"""Proxy for the :attr:`_engine.Engine.pool` attribute
        on behalf of the :class:`_asyncio.AsyncEngine` class.

        """  # noqa: E501

        proxied = self._proxied
        return proxied.pool

    @pool.setter
    def pool(self, attr: Pool) -> None:
        proxied = self._proxied
        proxied.pool = attr

    @property
    def dialect(self) -> Dialect:
        r"""Proxy for the :attr:`_engine.Engine.dialect` attribute
        on behalf of the :class:`_asyncio.AsyncEngine` class.

        """  # noqa: E501

        proxied = self._proxied
        return proxied.dialect

    @dialect.setter
    def dialect(self, attr: Dialect) -> None:
        proxied = self._proxied
        proxied.dialect = attr

    @property
    def engine(self) -> Any:
        r"""Returns this :class:`.Engine`.

        .. container:: class_bases

            Proxied for the :class:`_engine.Engine` class
            on behalf of the :class:`_asyncio.AsyncEngine` class.

        Exists for legacy schemes in which one variable may hold either
        a :class:`.Connection` or an :class:`.Engine` object.


        """  # noqa: E501

        proxied = self._proxied
        return proxied.engine

    @property
    def name(self) -> Any:
        r"""String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        .. container:: class_bases

            Proxied for the :class:`_engine.Engine` class
            on behalf of the :class:`_asyncio.AsyncEngine` class.


        """  # noqa: E501

        proxied = self._proxied
        return proxied.name

    @property
    def driver(self) -> Any:
        r"""Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
        in use by this :class:`Engine`.

        .. container:: class_bases

            Proxied for the :class:`_engine.Engine` class
            on behalf of the :class:`_asyncio.AsyncEngine` class.


        """  # noqa: E501

        proxied = self._proxied
        return proxied.driver

    @property
    def echo(self) -> Any:
        r"""When ``True``, enable log output for this element.

        .. container:: class_bases

            Proxied for the :class:`_engine.Engine` class
            on behalf of the :class:`_asyncio.AsyncEngine` class.

        Setting this adjusts the Python logging level for the namespace of
        this element's class and object reference.  Boolean ``True`` sets
        the logger's level to ``logging.INFO``, while the string value
        ``debug`` sets it to ``logging.DEBUG``.

        """  # noqa: E501

        proxied = self._proxied
        return proxied.echo

    @echo.setter
    def echo(self, attr: Any) -> None:
        proxied = self._proxied
        proxied.echo = attr

    # END PROXY METHODS AsyncEngine
1331
1332
class AsyncTransaction(
    ProxyComparable[Transaction], StartableContext["AsyncTransaction"]
):
    """Asyncio-facing proxy around a sync :class:`_engine.Transaction`.

    The sync transaction does not exist until :meth:`.start` has run;
    until then ``sync_transaction`` is ``None``.
    """

    __slots__ = ("connection", "sync_transaction", "nested")

    sync_transaction: Optional[Transaction]
    connection: AsyncConnection
    nested: bool

    def __init__(self, connection: AsyncConnection, nested: bool = False):
        """Record the owning connection and the nesting flag.

        :param connection: the :class:`_asyncio.AsyncConnection` this
         transaction will be begun on.
        :param nested: when true, :meth:`.start` uses ``begin_nested``
         instead of ``begin`` on the proxied connection.
        """
        self.nested = nested
        self.connection = connection
        # Stays unset until start() begins the sync transaction.
        self.sync_transaction = None

    @classmethod
    def _regenerate_proxy_for_target(
        cls, target: Transaction, **additional_kw: Any  # noqa: U100
    ) -> AsyncTransaction:
        """Build a proxy for an existing sync ``target`` transaction.

        ``__init__`` is bypassed; the slots are populated directly from
        ``target`` and from the async proxy of ``target.connection``.
        """
        owning_connection = AsyncConnection._retrieve_proxy_for_target(
            target.connection
        )
        assert owning_connection is not None

        regenerated = cls.__new__(cls)
        regenerated.connection = owning_connection
        regenerated.sync_transaction = regenerated._assign_proxied(target)
        regenerated.nested = isinstance(target, NestedTransaction)
        return regenerated

    @util.ro_non_memoized_property
    def _proxied(self) -> Transaction:
        """The sync transaction; while none is set, this defers to
        ``_raise_for_not_started()``.
        """
        sync_trans = self.sync_transaction
        if not sync_trans:
            self._raise_for_not_started()
        return sync_trans

    @property
    def is_valid(self) -> bool:
        """The ``is_valid`` attribute of the proxied sync transaction."""
        proxied = self._proxied
        return proxied.is_valid

    @property
    def is_active(self) -> bool:
        """The ``is_active`` attribute of the proxied sync transaction."""
        proxied = self._proxied
        return proxied.is_active

    async def close(self) -> None:
        """Close this :class:`.AsyncTransaction`.

        When this transaction is the base transaction of a begin/commit
        nesting, it is rolled back via rollback(); in any other case the
        method simply returns.

        Use this to cancel a Transaction while leaving the scope of an
        enclosing transaction unaffected.

        """
        sync_close = self._proxied.close
        await greenlet_spawn(sync_close)

    async def rollback(self) -> None:
        """Roll back this :class:`.AsyncTransaction`."""
        sync_rollback = self._proxied.rollback
        await greenlet_spawn(sync_rollback)

    async def commit(self) -> None:
        """Commit this :class:`.AsyncTransaction`."""

        sync_commit = self._proxied.commit
        await greenlet_spawn(sync_commit)

    async def start(self, is_ctxmanager: bool = False) -> AsyncTransaction:
        """Start this :class:`_asyncio.AsyncTransaction` object's context
        without the use of a Python ``with:`` block.

        """

        sync_connection = self.connection._proxied
        begin_fn = (
            sync_connection.begin_nested
            if self.nested
            else sync_connection.begin
        )
        started = await greenlet_spawn(begin_fn)
        self.sync_transaction = self._assign_proxied(started)
        if is_ctxmanager:
            self.sync_transaction.__enter__()
        return self

    async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
        """Forward the exit arguments to the sync transaction's
        ``__exit__``.
        """
        sync_exit = self._proxied.__exit__
        await greenlet_spawn(sync_exit, type_, value, traceback)
1423
1424
@overload
def _get_sync_engine_or_connection(async_engine: AsyncEngine) -> Engine: ...


@overload
def _get_sync_engine_or_connection(
    async_engine: AsyncConnection,
) -> Connection: ...


def _get_sync_engine_or_connection(
    async_engine: Union[AsyncEngine, AsyncConnection],
) -> Union[Engine, Connection]:
    """Return the sync object behind an async engine or connection.

    :param async_engine: an :class:`_asyncio.AsyncConnection`, or an
     object exposing a ``sync_engine`` attribute such as
     :class:`_asyncio.AsyncEngine`.
    :return: ``async_engine._proxied`` for an
     :class:`_asyncio.AsyncConnection`, otherwise
     ``async_engine.sync_engine``.
    :raises sqlalchemy.exc.ArgumentError: if the argument is not an
     :class:`_asyncio.AsyncConnection` and has no ``sync_engine``
     attribute.
    """
    if isinstance(async_engine, AsyncConnection):
        return async_engine._proxied

    try:
        return async_engine.sync_engine
    except AttributeError as e:
        # Format with ``!r`` rather than ``"%r" % async_engine``: the
        # ``%`` operator unpacks a tuple operand, so a tuple passed here
        # by mistake would raise TypeError (or be shown without its
        # tuple wrapper) instead of producing this ArgumentError.
        raise exc.ArgumentError(
            f"AsyncEngine expected, got {async_engine!r}"
        ) from e
1447
1448
@inspection._inspects(AsyncConnection)
def _no_insp_for_async_conn_yet(
    subject: AsyncConnection,  # noqa: U100
) -> NoReturn:
    """Inspection hook registered for :class:`_asyncio.AsyncConnection`
    that always raises :class:`~sqlalchemy.exc.NoInspectionAvailable`.

    ``subject`` is not used.
    """
    message = (
        "Inspection on an AsyncConnection is currently not supported. "
        "Please use ``run_sync`` to pass a callable where it's possible "
        "to call ``inspect`` on the passed connection."
    )
    raise exc.NoInspectionAvailable(message, code="xd3s")
1459
1460
@inspection._inspects(AsyncEngine)
def _no_insp_for_async_engine_xyet(
    subject: AsyncEngine,  # noqa: U100
) -> NoReturn:
    """Inspection hook registered for :class:`_asyncio.AsyncEngine`
    that always raises :class:`~sqlalchemy.exc.NoInspectionAvailable`.

    ``subject`` is not used.
    """
    message = (
        "Inspection on an AsyncEngine is currently not supported. "
        "Please obtain a connection then use ``conn.run_sync`` to pass a "
        "callable where it's possible to call ``inspect`` on the "
        "passed connection."
    )
    raise exc.NoInspectionAvailable(message, code="xd3s")