1# ext/asyncio/session.py
2# Copyright (C) 2020-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7from __future__ import annotations
8
9import asyncio
10from typing import Any
11from typing import Awaitable
12from typing import Callable
13from typing import cast
14from typing import Dict
15from typing import Generic
16from typing import Iterable
17from typing import Iterator
18from typing import NoReturn
19from typing import Optional
20from typing import overload
21from typing import Sequence
22from typing import Tuple
23from typing import Type
24from typing import TYPE_CHECKING
25from typing import TypeVar
26from typing import Union
27
28from . import engine
29from .base import ReversibleProxy
30from .base import StartableContext
31from .result import _ensure_sync_result
32from .result import AsyncResult
33from .result import AsyncScalarResult
34from ... import util
35from ...orm import close_all_sessions as _sync_close_all_sessions
36from ...orm import object_session
37from ...orm import Session
38from ...orm import SessionTransaction
39from ...orm import state as _instance_state
40from ...util.concurrency import greenlet_spawn
41from ...util.typing import Concatenate
42from ...util.typing import ParamSpec
43
44
45if TYPE_CHECKING:
46 from .engine import AsyncConnection
47 from .engine import AsyncEngine
48 from ...engine import Connection
49 from ...engine import Engine
50 from ...engine import Result
51 from ...engine import Row
52 from ...engine import RowMapping
53 from ...engine import ScalarResult
54 from ...engine.interfaces import _CoreAnyExecuteParams
55 from ...engine.interfaces import CoreExecuteOptionsParameter
56 from ...event import dispatcher
57 from ...orm._typing import _IdentityKeyType
58 from ...orm._typing import _O
59 from ...orm._typing import OrmExecuteOptionsParameter
60 from ...orm.identity import IdentityMap
61 from ...orm.interfaces import ORMOption
62 from ...orm.session import _BindArguments
63 from ...orm.session import _EntityBindKey
64 from ...orm.session import _PKIdentityArgument
65 from ...orm.session import _SessionBind
66 from ...orm.session import _SessionBindKey
67 from ...sql._typing import _InfoType
68 from ...sql.base import Executable
69 from ...sql.elements import ClauseElement
70 from ...sql.selectable import ForUpdateParameter
71 from ...sql.selectable import TypedReturnsRows
72
73_AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"]
74
75_P = ParamSpec("_P")
76_T = TypeVar("_T", bound=Any)
77
78
79_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
80_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
81
82
83class AsyncAttrs:
84 """Mixin class which provides an awaitable accessor for all attributes.
85
86 E.g.::
87
88 from __future__ import annotations
89
90 from typing import List
91
92 from sqlalchemy import ForeignKey
93 from sqlalchemy import func
94 from sqlalchemy.ext.asyncio import AsyncAttrs
95 from sqlalchemy.orm import DeclarativeBase
96 from sqlalchemy.orm import Mapped
97 from sqlalchemy.orm import mapped_column
98 from sqlalchemy.orm import relationship
99
100
101 class Base(AsyncAttrs, DeclarativeBase):
102 pass
103
104
105 class A(Base):
106 __tablename__ = "a"
107
108 id: Mapped[int] = mapped_column(primary_key=True)
109 data: Mapped[str]
110 bs: Mapped[List[B]] = relationship()
111
112
113 class B(Base):
114 __tablename__ = "b"
115 id: Mapped[int] = mapped_column(primary_key=True)
116 a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
117 data: Mapped[str]
118
119 In the above example, the :class:`_asyncio.AsyncAttrs` mixin is applied to
120 the declarative ``Base`` class where it takes effect for all subclasses.
121 This mixin adds a single new attribute
122 :attr:`_asyncio.AsyncAttrs.awaitable_attrs` to all classes, which will
123 yield the value of any attribute as an awaitable. This allows attributes
124 which may be subject to lazy loading or deferred / unexpiry loading to be
125 accessed such that IO can still be emitted::
126
127 a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
128
129 # use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs``
130 # interface, so that it may be awaited
131 for b1 in await a1.awaitable_attrs.bs:
132 print(b1)
133
134 The :attr:`_asyncio.AsyncAttrs.awaitable_attrs` performs a call against the
135 attribute that is approximately equivalent to using the
136 :meth:`_asyncio.AsyncSession.run_sync` method, e.g.::
137
138 for b1 in await async_session.run_sync(lambda sess: a1.bs):
139 print(b1)
140
141 .. versionadded:: 2.0.13
142
143 .. seealso::
144
145 :ref:`asyncio_orm_avoid_lazyloads`
146
147 """
148
149 class _AsyncAttrGetitem:
150 __slots__ = "_instance"
151
152 def __init__(self, _instance: Any):
153 self._instance = _instance
154
155 def __getattr__(self, name: str) -> Awaitable[Any]:
156 return greenlet_spawn(getattr, self._instance, name)
157
158 @property
159 def awaitable_attrs(self) -> AsyncAttrs._AsyncAttrGetitem:
160 """provide a namespace of all attributes on this object wrapped
161 as awaitables.
162
163 e.g.::
164
165
166 a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
167
168 some_attribute = await a1.awaitable_attrs.some_deferred_attribute
169 some_collection = await a1.awaitable_attrs.some_collection
170
171 """ # noqa: E501
172
173 return AsyncAttrs._AsyncAttrGetitem(self)
174
175
176@util.create_proxy_methods(
177 Session,
178 ":class:`_orm.Session`",
179 ":class:`_asyncio.AsyncSession`",
180 classmethods=["object_session", "identity_key"],
181 methods=[
182 "__contains__",
183 "__iter__",
184 "add",
185 "add_all",
186 "expire",
187 "expire_all",
188 "expunge",
189 "expunge_all",
190 "is_modified",
191 "in_transaction",
192 "in_nested_transaction",
193 ],
194 attributes=[
195 "dirty",
196 "deleted",
197 "new",
198 "identity_map",
199 "is_active",
200 "autoflush",
201 "no_autoflush",
202 "info",
203 ],
204)
205class AsyncSession(ReversibleProxy[Session]):
206 """Asyncio version of :class:`_orm.Session`.
207
208 The :class:`_asyncio.AsyncSession` is a proxy for a traditional
209 :class:`_orm.Session` instance.
210
211 The :class:`_asyncio.AsyncSession` is **not safe for use in concurrent
212 tasks.**. See :ref:`session_faq_threadsafe` for background.
213
214 .. versionadded:: 1.4
215
216 To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session`
217 implementations, see the
218 :paramref:`_asyncio.AsyncSession.sync_session_class` parameter.
219
220
221 """
222
223 _is_asyncio = True
224
225 dispatch: dispatcher[Session]
226
227 def __init__(
228 self,
229 bind: Optional[_AsyncSessionBind] = None,
230 *,
231 binds: Optional[Dict[_SessionBindKey, _AsyncSessionBind]] = None,
232 sync_session_class: Optional[Type[Session]] = None,
233 **kw: Any,
234 ):
235 r"""Construct a new :class:`_asyncio.AsyncSession`.
236
237 All parameters other than ``sync_session_class`` are passed to the
238 ``sync_session_class`` callable directly to instantiate a new
239 :class:`_orm.Session`. Refer to :meth:`_orm.Session.__init__` for
240 parameter documentation.
241
242 :param sync_session_class:
243 A :class:`_orm.Session` subclass or other callable which will be used
244 to construct the :class:`_orm.Session` which will be proxied. This
245 parameter may be used to provide custom :class:`_orm.Session`
246 subclasses. Defaults to the
247 :attr:`_asyncio.AsyncSession.sync_session_class` class-level
248 attribute.
249
250 .. versionadded:: 1.4.24
251
252 """
253 sync_bind = sync_binds = None
254
255 if bind:
256 self.bind = bind
257 sync_bind = engine._get_sync_engine_or_connection(bind)
258
259 if binds:
260 self.binds = binds
261 sync_binds = {
262 key: engine._get_sync_engine_or_connection(b)
263 for key, b in binds.items()
264 }
265
266 if sync_session_class:
267 self.sync_session_class = sync_session_class
268
269 self.sync_session = self._proxied = self._assign_proxied(
270 self.sync_session_class(bind=sync_bind, binds=sync_binds, **kw)
271 )
272
273 sync_session_class: Type[Session] = Session
274 """The class or callable that provides the
275 underlying :class:`_orm.Session` instance for a particular
276 :class:`_asyncio.AsyncSession`.
277
278 At the class level, this attribute is the default value for the
279 :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. Custom
280 subclasses of :class:`_asyncio.AsyncSession` can override this.
281
282 At the instance level, this attribute indicates the current class or
283 callable that was used to provide the :class:`_orm.Session` instance for
284 this :class:`_asyncio.AsyncSession` instance.
285
286 .. versionadded:: 1.4.24
287
288 """
289
290 sync_session: Session
291 """Reference to the underlying :class:`_orm.Session` this
292 :class:`_asyncio.AsyncSession` proxies requests towards.
293
294 This instance can be used as an event target.
295
296 .. seealso::
297
298 :ref:`asyncio_events`
299
300 """
301
302 @classmethod
303 def _no_async_engine_events(cls) -> NoReturn:
304 raise NotImplementedError(
305 "asynchronous events are not implemented at this time. Apply "
306 "synchronous listeners to the AsyncSession.sync_session."
307 )
308
309 async def refresh(
310 self,
311 instance: object,
312 attribute_names: Optional[Iterable[str]] = None,
313 with_for_update: ForUpdateParameter = None,
314 ) -> None:
315 """Expire and refresh the attributes on the given instance.
316
317 A query will be issued to the database and all attributes will be
318 refreshed with their current database value.
319
320 This is the async version of the :meth:`_orm.Session.refresh` method.
321 See that method for a complete description of all options.
322
323 .. seealso::
324
325 :meth:`_orm.Session.refresh` - main documentation for refresh
326
327 """
328
329 await greenlet_spawn(
330 self.sync_session.refresh,
331 instance,
332 attribute_names=attribute_names,
333 with_for_update=with_for_update,
334 )
335
336 async def run_sync(
337 self,
338 fn: Callable[Concatenate[Session, _P], _T],
339 *arg: _P.args,
340 **kw: _P.kwargs,
341 ) -> _T:
342 '''Invoke the given synchronous (i.e. not async) callable,
343 passing a synchronous-style :class:`_orm.Session` as the first
344 argument.
345
346 This method allows traditional synchronous SQLAlchemy functions to
347 run within the context of an asyncio application.
348
349 E.g.::
350
351 def some_business_method(session: Session, param: str) -> str:
352 """A synchronous function that does not require awaiting
353
354 :param session: a SQLAlchemy Session, used synchronously
355
356 :return: an optional return value is supported
357
358 """
359 session.add(MyObject(param=param))
360 session.flush()
361 return "success"
362
363
364 async def do_something_async(async_engine: AsyncEngine) -> None:
365 """an async function that uses awaiting"""
366
367 with AsyncSession(async_engine) as async_session:
368 # run some_business_method() with a sync-style
369 # Session, proxied into an awaitable
370 return_code = await async_session.run_sync(
371 some_business_method, param="param1"
372 )
373 print(return_code)
374
375 This method maintains the asyncio event loop all the way through
376 to the database connection by running the given callable in a
377 specially instrumented greenlet.
378
379 .. tip::
380
381 The provided callable is invoked inline within the asyncio event
382 loop, and will block on traditional IO calls. IO within this
383 callable should only call into SQLAlchemy's asyncio database
384 APIs which will be properly adapted to the greenlet context.
385
386 .. seealso::
387
388 :class:`.AsyncAttrs` - a mixin for ORM mapped classes that provides
389 a similar feature more succinctly on a per-attribute basis
390
391 :meth:`.AsyncConnection.run_sync`
392
393 :ref:`session_run_sync`
394 ''' # noqa: E501
395
396 return await greenlet_spawn(
397 fn, self.sync_session, *arg, _require_await=False, **kw
398 )
399
400 @overload
401 async def execute(
402 self,
403 statement: TypedReturnsRows[_T],
404 params: Optional[_CoreAnyExecuteParams] = None,
405 *,
406 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
407 bind_arguments: Optional[_BindArguments] = None,
408 _parent_execute_state: Optional[Any] = None,
409 _add_event: Optional[Any] = None,
410 ) -> Result[_T]: ...
411
412 @overload
413 async def execute(
414 self,
415 statement: Executable,
416 params: Optional[_CoreAnyExecuteParams] = None,
417 *,
418 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
419 bind_arguments: Optional[_BindArguments] = None,
420 _parent_execute_state: Optional[Any] = None,
421 _add_event: Optional[Any] = None,
422 ) -> Result[Any]: ...
423
424 async def execute(
425 self,
426 statement: Executable,
427 params: Optional[_CoreAnyExecuteParams] = None,
428 *,
429 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
430 bind_arguments: Optional[_BindArguments] = None,
431 **kw: Any,
432 ) -> Result[Any]:
433 """Execute a statement and return a buffered
434 :class:`_engine.Result` object.
435
436 .. seealso::
437
438 :meth:`_orm.Session.execute` - main documentation for execute
439
440 """
441
442 if execution_options:
443 execution_options = util.immutabledict(execution_options).union(
444 _EXECUTE_OPTIONS
445 )
446 else:
447 execution_options = _EXECUTE_OPTIONS
448
449 result = await greenlet_spawn(
450 self.sync_session.execute,
451 statement,
452 params=params,
453 execution_options=execution_options,
454 bind_arguments=bind_arguments,
455 **kw,
456 )
457 return await _ensure_sync_result(result, self.execute)
458
459 @overload
460 async def scalar(
461 self,
462 statement: TypedReturnsRows[Tuple[_T]],
463 params: Optional[_CoreAnyExecuteParams] = None,
464 *,
465 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
466 bind_arguments: Optional[_BindArguments] = None,
467 **kw: Any,
468 ) -> Optional[_T]: ...
469
470 @overload
471 async def scalar(
472 self,
473 statement: Executable,
474 params: Optional[_CoreAnyExecuteParams] = None,
475 *,
476 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
477 bind_arguments: Optional[_BindArguments] = None,
478 **kw: Any,
479 ) -> Any: ...
480
481 async def scalar(
482 self,
483 statement: Executable,
484 params: Optional[_CoreAnyExecuteParams] = None,
485 *,
486 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
487 bind_arguments: Optional[_BindArguments] = None,
488 **kw: Any,
489 ) -> Any:
490 """Execute a statement and return a scalar result.
491
492 .. seealso::
493
494 :meth:`_orm.Session.scalar` - main documentation for scalar
495
496 """
497
498 if execution_options:
499 execution_options = util.immutabledict(execution_options).union(
500 _EXECUTE_OPTIONS
501 )
502 else:
503 execution_options = _EXECUTE_OPTIONS
504
505 return await greenlet_spawn(
506 self.sync_session.scalar,
507 statement,
508 params=params,
509 execution_options=execution_options,
510 bind_arguments=bind_arguments,
511 **kw,
512 )
513
514 @overload
515 async def scalars(
516 self,
517 statement: TypedReturnsRows[Tuple[_T]],
518 params: Optional[_CoreAnyExecuteParams] = None,
519 *,
520 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
521 bind_arguments: Optional[_BindArguments] = None,
522 **kw: Any,
523 ) -> ScalarResult[_T]: ...
524
525 @overload
526 async def scalars(
527 self,
528 statement: Executable,
529 params: Optional[_CoreAnyExecuteParams] = None,
530 *,
531 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
532 bind_arguments: Optional[_BindArguments] = None,
533 **kw: Any,
534 ) -> ScalarResult[Any]: ...
535
536 async def scalars(
537 self,
538 statement: Executable,
539 params: Optional[_CoreAnyExecuteParams] = None,
540 *,
541 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
542 bind_arguments: Optional[_BindArguments] = None,
543 **kw: Any,
544 ) -> ScalarResult[Any]:
545 """Execute a statement and return scalar results.
546
547 :return: a :class:`_result.ScalarResult` object
548
549 .. versionadded:: 1.4.24 Added :meth:`_asyncio.AsyncSession.scalars`
550
551 .. versionadded:: 1.4.26 Added
552 :meth:`_asyncio.async_scoped_session.scalars`
553
554 .. seealso::
555
556 :meth:`_orm.Session.scalars` - main documentation for scalars
557
558 :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version
559
560 """
561
562 result = await self.execute(
563 statement,
564 params=params,
565 execution_options=execution_options,
566 bind_arguments=bind_arguments,
567 **kw,
568 )
569 return result.scalars()
570
571 async def get(
572 self,
573 entity: _EntityBindKey[_O],
574 ident: _PKIdentityArgument,
575 *,
576 options: Optional[Sequence[ORMOption]] = None,
577 populate_existing: bool = False,
578 with_for_update: ForUpdateParameter = None,
579 identity_token: Optional[Any] = None,
580 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
581 ) -> Union[_O, None]:
582 """Return an instance based on the given primary key identifier,
583 or ``None`` if not found.
584
585 .. seealso::
586
587 :meth:`_orm.Session.get` - main documentation for get
588
589
590 """
591
592 return await greenlet_spawn(
593 cast("Callable[..., _O]", self.sync_session.get),
594 entity,
595 ident,
596 options=options,
597 populate_existing=populate_existing,
598 with_for_update=with_for_update,
599 identity_token=identity_token,
600 execution_options=execution_options,
601 )
602
603 async def get_one(
604 self,
605 entity: _EntityBindKey[_O],
606 ident: _PKIdentityArgument,
607 *,
608 options: Optional[Sequence[ORMOption]] = None,
609 populate_existing: bool = False,
610 with_for_update: ForUpdateParameter = None,
611 identity_token: Optional[Any] = None,
612 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
613 ) -> _O:
614 """Return an instance based on the given primary key identifier,
615 or raise an exception if not found.
616
617 Raises :class:`_exc.NoResultFound` if the query selects no rows.
618
619 ..versionadded: 2.0.22
620
621 .. seealso::
622
623 :meth:`_orm.Session.get_one` - main documentation for get_one
624
625 """
626
627 return await greenlet_spawn(
628 cast("Callable[..., _O]", self.sync_session.get_one),
629 entity,
630 ident,
631 options=options,
632 populate_existing=populate_existing,
633 with_for_update=with_for_update,
634 identity_token=identity_token,
635 execution_options=execution_options,
636 )
637
638 @overload
639 async def stream(
640 self,
641 statement: TypedReturnsRows[_T],
642 params: Optional[_CoreAnyExecuteParams] = None,
643 *,
644 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
645 bind_arguments: Optional[_BindArguments] = None,
646 **kw: Any,
647 ) -> AsyncResult[_T]: ...
648
649 @overload
650 async def stream(
651 self,
652 statement: Executable,
653 params: Optional[_CoreAnyExecuteParams] = None,
654 *,
655 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
656 bind_arguments: Optional[_BindArguments] = None,
657 **kw: Any,
658 ) -> AsyncResult[Any]: ...
659
660 async def stream(
661 self,
662 statement: Executable,
663 params: Optional[_CoreAnyExecuteParams] = None,
664 *,
665 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
666 bind_arguments: Optional[_BindArguments] = None,
667 **kw: Any,
668 ) -> AsyncResult[Any]:
669 """Execute a statement and return a streaming
670 :class:`_asyncio.AsyncResult` object.
671
672 """
673
674 if execution_options:
675 execution_options = util.immutabledict(execution_options).union(
676 _STREAM_OPTIONS
677 )
678 else:
679 execution_options = _STREAM_OPTIONS
680
681 result = await greenlet_spawn(
682 self.sync_session.execute,
683 statement,
684 params=params,
685 execution_options=execution_options,
686 bind_arguments=bind_arguments,
687 **kw,
688 )
689 return AsyncResult(result)
690
691 @overload
692 async def stream_scalars(
693 self,
694 statement: TypedReturnsRows[Tuple[_T]],
695 params: Optional[_CoreAnyExecuteParams] = None,
696 *,
697 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
698 bind_arguments: Optional[_BindArguments] = None,
699 **kw: Any,
700 ) -> AsyncScalarResult[_T]: ...
701
702 @overload
703 async def stream_scalars(
704 self,
705 statement: Executable,
706 params: Optional[_CoreAnyExecuteParams] = None,
707 *,
708 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
709 bind_arguments: Optional[_BindArguments] = None,
710 **kw: Any,
711 ) -> AsyncScalarResult[Any]: ...
712
713 async def stream_scalars(
714 self,
715 statement: Executable,
716 params: Optional[_CoreAnyExecuteParams] = None,
717 *,
718 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
719 bind_arguments: Optional[_BindArguments] = None,
720 **kw: Any,
721 ) -> AsyncScalarResult[Any]:
722 """Execute a statement and return a stream of scalar results.
723
724 :return: an :class:`_asyncio.AsyncScalarResult` object
725
726 .. versionadded:: 1.4.24
727
728 .. seealso::
729
730 :meth:`_orm.Session.scalars` - main documentation for scalars
731
732 :meth:`_asyncio.AsyncSession.scalars` - non streaming version
733
734 """
735
736 result = await self.stream(
737 statement,
738 params=params,
739 execution_options=execution_options,
740 bind_arguments=bind_arguments,
741 **kw,
742 )
743 return result.scalars()
744
745 async def delete(self, instance: object) -> None:
746 """Mark an instance as deleted.
747
748 The database delete operation occurs upon ``flush()``.
749
750 As this operation may need to cascade along unloaded relationships,
751 it is awaitable to allow for those queries to take place.
752
753 .. seealso::
754
755 :meth:`_orm.Session.delete` - main documentation for delete
756
757 """
758 await greenlet_spawn(self.sync_session.delete, instance)
759
760 async def merge(
761 self,
762 instance: _O,
763 *,
764 load: bool = True,
765 options: Optional[Sequence[ORMOption]] = None,
766 ) -> _O:
767 """Copy the state of a given instance into a corresponding instance
768 within this :class:`_asyncio.AsyncSession`.
769
770 .. seealso::
771
772 :meth:`_orm.Session.merge` - main documentation for merge
773
774 """
775 return await greenlet_spawn(
776 self.sync_session.merge, instance, load=load, options=options
777 )
778
779 async def flush(self, objects: Optional[Sequence[Any]] = None) -> None:
780 """Flush all the object changes to the database.
781
782 .. seealso::
783
784 :meth:`_orm.Session.flush` - main documentation for flush
785
786 """
787 await greenlet_spawn(self.sync_session.flush, objects=objects)
788
789 def get_transaction(self) -> Optional[AsyncSessionTransaction]:
790 """Return the current root transaction in progress, if any.
791
792 :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
793 ``None``.
794
795 .. versionadded:: 1.4.18
796
797 """
798 trans = self.sync_session.get_transaction()
799 if trans is not None:
800 return AsyncSessionTransaction._retrieve_proxy_for_target(
801 trans, async_session=self
802 )
803 else:
804 return None
805
806 def get_nested_transaction(self) -> Optional[AsyncSessionTransaction]:
807 """Return the current nested transaction in progress, if any.
808
809 :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
810 ``None``.
811
812 .. versionadded:: 1.4.18
813
814 """
815
816 trans = self.sync_session.get_nested_transaction()
817 if trans is not None:
818 return AsyncSessionTransaction._retrieve_proxy_for_target(
819 trans, async_session=self
820 )
821 else:
822 return None
823
824 def get_bind(
825 self,
826 mapper: Optional[_EntityBindKey[_O]] = None,
827 clause: Optional[ClauseElement] = None,
828 bind: Optional[_SessionBind] = None,
829 **kw: Any,
830 ) -> Union[Engine, Connection]:
831 """Return a "bind" to which the synchronous proxied :class:`_orm.Session`
832 is bound.
833
834 Unlike the :meth:`_orm.Session.get_bind` method, this method is
835 currently **not** used by this :class:`.AsyncSession` in any way
836 in order to resolve engines for requests.
837
838 .. note::
839
840 This method proxies directly to the :meth:`_orm.Session.get_bind`
841 method, however is currently **not** useful as an override target,
842 in contrast to that of the :meth:`_orm.Session.get_bind` method.
843 The example below illustrates how to implement custom
844 :meth:`_orm.Session.get_bind` schemes that work with
845 :class:`.AsyncSession` and :class:`.AsyncEngine`.
846
847 The pattern introduced at :ref:`session_custom_partitioning`
848 illustrates how to apply a custom bind-lookup scheme to a
849 :class:`_orm.Session` given a set of :class:`_engine.Engine` objects.
850 To apply a corresponding :meth:`_orm.Session.get_bind` implementation
851 for use with a :class:`.AsyncSession` and :class:`.AsyncEngine`
852 objects, continue to subclass :class:`_orm.Session` and apply it to
853 :class:`.AsyncSession` using
854 :paramref:`.AsyncSession.sync_session_class`. The inner method must
855 continue to return :class:`_engine.Engine` instances, which can be
856 acquired from a :class:`_asyncio.AsyncEngine` using the
857 :attr:`_asyncio.AsyncEngine.sync_engine` attribute::
858
859 # using example from "Custom Vertical Partitioning"
860
861
862 import random
863
864 from sqlalchemy.ext.asyncio import AsyncSession
865 from sqlalchemy.ext.asyncio import create_async_engine
866 from sqlalchemy.ext.asyncio import async_sessionmaker
867 from sqlalchemy.orm import Session
868
869 # construct async engines w/ async drivers
870 engines = {
871 "leader": create_async_engine("sqlite+aiosqlite:///leader.db"),
872 "other": create_async_engine("sqlite+aiosqlite:///other.db"),
873 "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"),
874 "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"),
875 }
876
877
878 class RoutingSession(Session):
879 def get_bind(self, mapper=None, clause=None, **kw):
880 # within get_bind(), return sync engines
881 if mapper and issubclass(mapper.class_, MyOtherClass):
882 return engines["other"].sync_engine
883 elif self._flushing or isinstance(clause, (Update, Delete)):
884 return engines["leader"].sync_engine
885 else:
886 return engines[
887 random.choice(["follower1", "follower2"])
888 ].sync_engine
889
890
891 # apply to AsyncSession using sync_session_class
892 AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession)
893
894 The :meth:`_orm.Session.get_bind` method is called in a non-asyncio,
895 implicitly non-blocking context in the same manner as ORM event hooks
896 and functions that are invoked via :meth:`.AsyncSession.run_sync`, so
897 routines that wish to run SQL commands inside of
898 :meth:`_orm.Session.get_bind` can continue to do so using
899 blocking-style code, which will be translated to implicitly async calls
900 at the point of invoking IO on the database drivers.
901
902 """ # noqa: E501
903
904 return self.sync_session.get_bind(
905 mapper=mapper, clause=clause, bind=bind, **kw
906 )
907
908 async def connection(
909 self,
910 bind_arguments: Optional[_BindArguments] = None,
911 execution_options: Optional[CoreExecuteOptionsParameter] = None,
912 **kw: Any,
913 ) -> AsyncConnection:
914 r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
915 this :class:`.Session` object's transactional state.
916
917 This method may also be used to establish execution options for the
918 database connection used by the current transaction.
919
920 .. versionadded:: 1.4.24 Added \**kw arguments which are passed
921 through to the underlying :meth:`_orm.Session.connection` method.
922
923 .. seealso::
924
925 :meth:`_orm.Session.connection` - main documentation for
926 "connection"
927
928 """
929
930 sync_connection = await greenlet_spawn(
931 self.sync_session.connection,
932 bind_arguments=bind_arguments,
933 execution_options=execution_options,
934 **kw,
935 )
936 return engine.AsyncConnection._retrieve_proxy_for_target(
937 sync_connection
938 )
939
940 def begin(self) -> AsyncSessionTransaction:
941 """Return an :class:`_asyncio.AsyncSessionTransaction` object.
942
943 The underlying :class:`_orm.Session` will perform the
944 "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
945 object is entered::
946
947 async with async_session.begin():
948 ... # ORM transaction is begun
949
950 Note that database IO will not normally occur when the session-level
951 transaction is begun, as database transactions begin on an
952 on-demand basis. However, the begin block is async to accommodate
953 for a :meth:`_orm.SessionEvents.after_transaction_create`
954 event hook that may perform IO.
955
956 For a general description of ORM begin, see
957 :meth:`_orm.Session.begin`.
958
959 """
960
961 return AsyncSessionTransaction(self)
962
963 def begin_nested(self) -> AsyncSessionTransaction:
964 """Return an :class:`_asyncio.AsyncSessionTransaction` object
965 which will begin a "nested" transaction, e.g. SAVEPOINT.
966
967 Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.
968
969 For a general description of ORM begin nested, see
970 :meth:`_orm.Session.begin_nested`.
971
972 .. seealso::
973
974 :ref:`aiosqlite_serializable` - special workarounds required
975 with the SQLite asyncio driver in order for SAVEPOINT to work
976 correctly.
977
978 """
979
980 return AsyncSessionTransaction(self, nested=True)
981
982 async def rollback(self) -> None:
983 """Rollback the current transaction in progress.
984
985 .. seealso::
986
987 :meth:`_orm.Session.rollback` - main documentation for
988 "rollback"
989 """
990 await greenlet_spawn(self.sync_session.rollback)
991
992 async def commit(self) -> None:
993 """Commit the current transaction in progress.
994
995 .. seealso::
996
997 :meth:`_orm.Session.commit` - main documentation for
998 "commit"
999 """
1000 await greenlet_spawn(self.sync_session.commit)
1001
1002 async def close(self) -> None:
1003 """Close out the transactional resources and ORM objects used by this
1004 :class:`_asyncio.AsyncSession`.
1005
1006 .. seealso::
1007
1008 :meth:`_orm.Session.close` - main documentation for
1009 "close"
1010
1011 :ref:`session_closing` - detail on the semantics of
1012 :meth:`_asyncio.AsyncSession.close` and
1013 :meth:`_asyncio.AsyncSession.reset`.
1014
1015 """
1016 await greenlet_spawn(self.sync_session.close)
1017
1018 async def reset(self) -> None:
1019 """Close out the transactional resources and ORM objects used by this
1020 :class:`_orm.Session`, resetting the session to its initial state.
1021
1022 .. versionadded:: 2.0.22
1023
1024 .. seealso::
1025
1026 :meth:`_orm.Session.reset` - main documentation for
1027 "reset"
1028
1029 :ref:`session_closing` - detail on the semantics of
1030 :meth:`_asyncio.AsyncSession.close` and
1031 :meth:`_asyncio.AsyncSession.reset`.
1032
1033 """
1034 await greenlet_spawn(self.sync_session.reset)
1035
1036 async def aclose(self) -> None:
1037 """A synonym for :meth:`_asyncio.AsyncSession.close`.
1038
1039 The :meth:`_asyncio.AsyncSession.aclose` name is specifically
1040 to support the Python standard library ``@contextlib.aclosing``
1041 context manager function.
1042
1043 .. versionadded:: 2.0.20
1044
1045 """
1046 await self.close()
1047
1048 async def invalidate(self) -> None:
1049 """Close this Session, using connection invalidation.
1050
1051 For a complete description, see :meth:`_orm.Session.invalidate`.
1052 """
1053 await greenlet_spawn(self.sync_session.invalidate)
1054
1055 @classmethod
1056 @util.deprecated(
1057 "2.0",
1058 "The :meth:`.AsyncSession.close_all` method is deprecated and will be "
1059 "removed in a future release. Please refer to "
1060 ":func:`_asyncio.close_all_sessions`.",
1061 )
1062 async def close_all(cls) -> None:
1063 """Close all :class:`_asyncio.AsyncSession` sessions."""
1064 await close_all_sessions()
1065
1066 async def __aenter__(self: _AS) -> _AS:
1067 return self
1068
1069 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
1070 task = asyncio.create_task(self.close())
1071 await asyncio.shield(task)
1072
1073 def _maker_context_manager(self: _AS) -> _AsyncSessionContextManager[_AS]:
1074 return _AsyncSessionContextManager(self)
1075
1076 # START PROXY METHODS AsyncSession
1077
1078 # code within this block is **programmatically,
1079 # statically generated** by tools/generate_proxy_methods.py
1080
1081 def __contains__(self, instance: object) -> bool:
1082 r"""Return True if the instance is associated with this session.
1083
1084 .. container:: class_bases
1085
1086 Proxied for the :class:`_orm.Session` class on
1087 behalf of the :class:`_asyncio.AsyncSession` class.
1088
1089 The instance may be pending or persistent within the Session for a
1090 result of True.
1091
1092
1093 """ # noqa: E501
1094
1095 return self._proxied.__contains__(instance)
1096
1097 def __iter__(self) -> Iterator[object]:
1098 r"""Iterate over all pending or persistent instances within this
1099 Session.
1100
1101 .. container:: class_bases
1102
1103 Proxied for the :class:`_orm.Session` class on
1104 behalf of the :class:`_asyncio.AsyncSession` class.
1105
1106
1107 """ # noqa: E501
1108
1109 return self._proxied.__iter__()
1110
1111 def add(self, instance: object, _warn: bool = True) -> None:
1112 r"""Place an object into this :class:`_orm.Session`.
1113
1114 .. container:: class_bases
1115
1116 Proxied for the :class:`_orm.Session` class on
1117 behalf of the :class:`_asyncio.AsyncSession` class.
1118
1119 Objects that are in the :term:`transient` state when passed to the
1120 :meth:`_orm.Session.add` method will move to the
1121 :term:`pending` state, until the next flush, at which point they
1122 will move to the :term:`persistent` state.
1123
1124 Objects that are in the :term:`detached` state when passed to the
1125 :meth:`_orm.Session.add` method will move to the :term:`persistent`
1126 state directly.
1127
1128 If the transaction used by the :class:`_orm.Session` is rolled back,
1129 objects which were transient when they were passed to
1130 :meth:`_orm.Session.add` will be moved back to the
1131 :term:`transient` state, and will no longer be present within this
1132 :class:`_orm.Session`.
1133
1134 .. seealso::
1135
1136 :meth:`_orm.Session.add_all`
1137
1138 :ref:`session_adding` - at :ref:`session_basics`
1139
1140
1141 """ # noqa: E501
1142
1143 return self._proxied.add(instance, _warn=_warn)
1144
1145 def add_all(self, instances: Iterable[object]) -> None:
1146 r"""Add the given collection of instances to this :class:`_orm.Session`.
1147
1148 .. container:: class_bases
1149
1150 Proxied for the :class:`_orm.Session` class on
1151 behalf of the :class:`_asyncio.AsyncSession` class.
1152
1153 See the documentation for :meth:`_orm.Session.add` for a general
1154 behavioral description.
1155
1156 .. seealso::
1157
1158 :meth:`_orm.Session.add`
1159
1160 :ref:`session_adding` - at :ref:`session_basics`
1161
1162
1163 """ # noqa: E501
1164
1165 return self._proxied.add_all(instances)
1166
1167 def expire(
1168 self, instance: object, attribute_names: Optional[Iterable[str]] = None
1169 ) -> None:
1170 r"""Expire the attributes on an instance.
1171
1172 .. container:: class_bases
1173
1174 Proxied for the :class:`_orm.Session` class on
1175 behalf of the :class:`_asyncio.AsyncSession` class.
1176
1177 Marks the attributes of an instance as out of date. When an expired
1178 attribute is next accessed, a query will be issued to the
1179 :class:`.Session` object's current transactional context in order to
1180 load all expired attributes for the given instance. Note that
1181 a highly isolated transaction will return the same values as were
1182 previously read in that same transaction, regardless of changes
1183 in database state outside of that transaction.
1184
1185 To expire all objects in the :class:`.Session` simultaneously,
1186 use :meth:`Session.expire_all`.
1187
1188 The :class:`.Session` object's default behavior is to
1189 expire all state whenever the :meth:`Session.rollback`
1190 or :meth:`Session.commit` methods are called, so that new
1191 state can be loaded for the new transaction. For this reason,
1192 calling :meth:`Session.expire` only makes sense for the specific
1193 case that a non-ORM SQL statement was emitted in the current
1194 transaction.
1195
1196 :param instance: The instance to be refreshed.
1197 :param attribute_names: optional list of string attribute names
1198 indicating a subset of attributes to be expired.
1199
1200 .. seealso::
1201
1202 :ref:`session_expire` - introductory material
1203
1204 :meth:`.Session.expire`
1205
1206 :meth:`.Session.refresh`
1207
1208 :meth:`_orm.Query.populate_existing`
1209
1210
1211 """ # noqa: E501
1212
1213 return self._proxied.expire(instance, attribute_names=attribute_names)
1214
1215 def expire_all(self) -> None:
1216 r"""Expires all persistent instances within this Session.
1217
1218 .. container:: class_bases
1219
1220 Proxied for the :class:`_orm.Session` class on
1221 behalf of the :class:`_asyncio.AsyncSession` class.
1222
1223 When any attributes on a persistent instance is next accessed,
1224 a query will be issued using the
1225 :class:`.Session` object's current transactional context in order to
1226 load all expired attributes for the given instance. Note that
1227 a highly isolated transaction will return the same values as were
1228 previously read in that same transaction, regardless of changes
1229 in database state outside of that transaction.
1230
1231 To expire individual objects and individual attributes
1232 on those objects, use :meth:`Session.expire`.
1233
1234 The :class:`.Session` object's default behavior is to
1235 expire all state whenever the :meth:`Session.rollback`
1236 or :meth:`Session.commit` methods are called, so that new
1237 state can be loaded for the new transaction. For this reason,
1238 calling :meth:`Session.expire_all` is not usually needed,
1239 assuming the transaction is isolated.
1240
1241 .. seealso::
1242
1243 :ref:`session_expire` - introductory material
1244
1245 :meth:`.Session.expire`
1246
1247 :meth:`.Session.refresh`
1248
1249 :meth:`_orm.Query.populate_existing`
1250
1251
1252 """ # noqa: E501
1253
1254 return self._proxied.expire_all()
1255
1256 def expunge(self, instance: object) -> None:
1257 r"""Remove the `instance` from this ``Session``.
1258
1259 .. container:: class_bases
1260
1261 Proxied for the :class:`_orm.Session` class on
1262 behalf of the :class:`_asyncio.AsyncSession` class.
1263
1264 This will free all internal references to the instance. Cascading
1265 will be applied according to the *expunge* cascade rule.
1266
1267
1268 """ # noqa: E501
1269
1270 return self._proxied.expunge(instance)
1271
1272 def expunge_all(self) -> None:
1273 r"""Remove all object instances from this ``Session``.
1274
1275 .. container:: class_bases
1276
1277 Proxied for the :class:`_orm.Session` class on
1278 behalf of the :class:`_asyncio.AsyncSession` class.
1279
1280 This is equivalent to calling ``expunge(obj)`` on all objects in this
1281 ``Session``.
1282
1283
1284 """ # noqa: E501
1285
1286 return self._proxied.expunge_all()
1287
1288 def is_modified(
1289 self, instance: object, include_collections: bool = True
1290 ) -> bool:
1291 r"""Return ``True`` if the given instance has locally
1292 modified attributes.
1293
1294 .. container:: class_bases
1295
1296 Proxied for the :class:`_orm.Session` class on
1297 behalf of the :class:`_asyncio.AsyncSession` class.
1298
1299 This method retrieves the history for each instrumented
1300 attribute on the instance and performs a comparison of the current
1301 value to its previously flushed or committed value, if any.
1302
1303 It is in effect a more expensive and accurate
1304 version of checking for the given instance in the
1305 :attr:`.Session.dirty` collection; a full test for
1306 each attribute's net "dirty" status is performed.
1307
1308 E.g.::
1309
1310 return session.is_modified(someobject)
1311
1312 A few caveats to this method apply:
1313
1314 * Instances present in the :attr:`.Session.dirty` collection may
1315 report ``False`` when tested with this method. This is because
1316 the object may have received change events via attribute mutation,
1317 thus placing it in :attr:`.Session.dirty`, but ultimately the state
1318 is the same as that loaded from the database, resulting in no net
1319 change here.
1320 * Scalar attributes may not have recorded the previously set
1321 value when a new value was applied, if the attribute was not loaded,
1322 or was expired, at the time the new value was received - in these
1323 cases, the attribute is assumed to have a change, even if there is
1324 ultimately no net change against its database value. SQLAlchemy in
1325 most cases does not need the "old" value when a set event occurs, so
1326 it skips the expense of a SQL call if the old value isn't present,
1327 based on the assumption that an UPDATE of the scalar value is
1328 usually needed, and in those few cases where it isn't, is less
1329 expensive on average than issuing a defensive SELECT.
1330
1331 The "old" value is fetched unconditionally upon set only if the
1332 attribute container has the ``active_history`` flag set to ``True``.
1333 This flag is set typically for primary key attributes and scalar
1334 object references that are not a simple many-to-one. To set this
1335 flag for any arbitrary mapped column, use the ``active_history``
1336 argument with :func:`.column_property`.
1337
1338 :param instance: mapped instance to be tested for pending changes.
1339 :param include_collections: Indicates if multivalued collections
1340 should be included in the operation. Setting this to ``False`` is a
1341 way to detect only local-column based properties (i.e. scalar columns
1342 or many-to-one foreign keys) that would result in an UPDATE for this
1343 instance upon flush.
1344
1345
1346 """ # noqa: E501
1347
1348 return self._proxied.is_modified(
1349 instance, include_collections=include_collections
1350 )
1351
1352 def in_transaction(self) -> bool:
1353 r"""Return True if this :class:`_orm.Session` has begun a transaction.
1354
1355 .. container:: class_bases
1356
1357 Proxied for the :class:`_orm.Session` class on
1358 behalf of the :class:`_asyncio.AsyncSession` class.
1359
1360 .. versionadded:: 1.4
1361
1362 .. seealso::
1363
1364 :attr:`_orm.Session.is_active`
1365
1366
1367
1368 """ # noqa: E501
1369
1370 return self._proxied.in_transaction()
1371
1372 def in_nested_transaction(self) -> bool:
1373 r"""Return True if this :class:`_orm.Session` has begun a nested
1374 transaction, e.g. SAVEPOINT.
1375
1376 .. container:: class_bases
1377
1378 Proxied for the :class:`_orm.Session` class on
1379 behalf of the :class:`_asyncio.AsyncSession` class.
1380
1381 .. versionadded:: 1.4
1382
1383
1384 """ # noqa: E501
1385
1386 return self._proxied.in_nested_transaction()
1387
1388 @property
1389 def dirty(self) -> Any:
1390 r"""The set of all persistent instances considered dirty.
1391
1392 .. container:: class_bases
1393
1394 Proxied for the :class:`_orm.Session` class
1395 on behalf of the :class:`_asyncio.AsyncSession` class.
1396
1397 E.g.::
1398
1399 some_mapped_object in session.dirty
1400
1401 Instances are considered dirty when they were modified but not
1402 deleted.
1403
1404 Note that this 'dirty' calculation is 'optimistic'; most
1405 attribute-setting or collection modification operations will
1406 mark an instance as 'dirty' and place it in this set, even if
1407 there is no net change to the attribute's value. At flush
1408 time, the value of each attribute is compared to its
1409 previously saved value, and if there's no net change, no SQL
1410 operation will occur (this is a more expensive operation so
1411 it's only done at flush time).
1412
1413 To check if an instance has actionable net changes to its
1414 attributes, use the :meth:`.Session.is_modified` method.
1415
1416
1417 """ # noqa: E501
1418
1419 return self._proxied.dirty
1420
1421 @property
1422 def deleted(self) -> Any:
1423 r"""The set of all instances marked as 'deleted' within this ``Session``
1424
1425 .. container:: class_bases
1426
1427 Proxied for the :class:`_orm.Session` class
1428 on behalf of the :class:`_asyncio.AsyncSession` class.
1429
1430 """ # noqa: E501
1431
1432 return self._proxied.deleted
1433
1434 @property
1435 def new(self) -> Any:
1436 r"""The set of all instances marked as 'new' within this ``Session``.
1437
1438 .. container:: class_bases
1439
1440 Proxied for the :class:`_orm.Session` class
1441 on behalf of the :class:`_asyncio.AsyncSession` class.
1442
1443 """ # noqa: E501
1444
1445 return self._proxied.new
1446
1447 @property
1448 def identity_map(self) -> IdentityMap:
1449 r"""Proxy for the :attr:`_orm.Session.identity_map` attribute
1450 on behalf of the :class:`_asyncio.AsyncSession` class.
1451
1452 """ # noqa: E501
1453
1454 return self._proxied.identity_map
1455
1456 @identity_map.setter
1457 def identity_map(self, attr: IdentityMap) -> None:
1458 self._proxied.identity_map = attr
1459
1460 @property
1461 def is_active(self) -> Any:
1462 r"""True if this :class:`.Session` not in "partial rollback" state.
1463
1464 .. container:: class_bases
1465
1466 Proxied for the :class:`_orm.Session` class
1467 on behalf of the :class:`_asyncio.AsyncSession` class.
1468
1469 .. versionchanged:: 1.4 The :class:`_orm.Session` no longer begins
1470 a new transaction immediately, so this attribute will be False
1471 when the :class:`_orm.Session` is first instantiated.
1472
1473 "partial rollback" state typically indicates that the flush process
1474 of the :class:`_orm.Session` has failed, and that the
1475 :meth:`_orm.Session.rollback` method must be emitted in order to
1476 fully roll back the transaction.
1477
1478 If this :class:`_orm.Session` is not in a transaction at all, the
1479 :class:`_orm.Session` will autobegin when it is first used, so in this
1480 case :attr:`_orm.Session.is_active` will return True.
1481
1482 Otherwise, if this :class:`_orm.Session` is within a transaction,
1483 and that transaction has not been rolled back internally, the
1484 :attr:`_orm.Session.is_active` will also return True.
1485
1486 .. seealso::
1487
1488 :ref:`faq_session_rollback`
1489
1490 :meth:`_orm.Session.in_transaction`
1491
1492
1493 """ # noqa: E501
1494
1495 return self._proxied.is_active
1496
1497 @property
1498 def autoflush(self) -> bool:
1499 r"""Proxy for the :attr:`_orm.Session.autoflush` attribute
1500 on behalf of the :class:`_asyncio.AsyncSession` class.
1501
1502 """ # noqa: E501
1503
1504 return self._proxied.autoflush
1505
1506 @autoflush.setter
1507 def autoflush(self, attr: bool) -> None:
1508 self._proxied.autoflush = attr
1509
1510 @property
1511 def no_autoflush(self) -> Any:
1512 r"""Return a context manager that disables autoflush.
1513
1514 .. container:: class_bases
1515
1516 Proxied for the :class:`_orm.Session` class
1517 on behalf of the :class:`_asyncio.AsyncSession` class.
1518
1519 e.g.::
1520
1521 with session.no_autoflush:
1522
1523 some_object = SomeClass()
1524 session.add(some_object)
1525 # won't autoflush
1526 some_object.related_thing = session.query(SomeRelated).first()
1527
1528 Operations that proceed within the ``with:`` block
1529 will not be subject to flushes occurring upon query
1530 access. This is useful when initializing a series
1531 of objects which involve existing database queries,
1532 where the uncompleted object should not yet be flushed.
1533
1534
1535 """ # noqa: E501
1536
1537 return self._proxied.no_autoflush
1538
1539 @property
1540 def info(self) -> Any:
1541 r"""A user-modifiable dictionary.
1542
1543 .. container:: class_bases
1544
1545 Proxied for the :class:`_orm.Session` class
1546 on behalf of the :class:`_asyncio.AsyncSession` class.
1547
1548 The initial value of this dictionary can be populated using the
1549 ``info`` argument to the :class:`.Session` constructor or
1550 :class:`.sessionmaker` constructor or factory methods. The dictionary
1551 here is always local to this :class:`.Session` and can be modified
1552 independently of all other :class:`.Session` objects.
1553
1554
1555 """ # noqa: E501
1556
1557 return self._proxied.info
1558
1559 @classmethod
1560 def object_session(cls, instance: object) -> Optional[Session]:
1561 r"""Return the :class:`.Session` to which an object belongs.
1562
1563 .. container:: class_bases
1564
1565 Proxied for the :class:`_orm.Session` class on
1566 behalf of the :class:`_asyncio.AsyncSession` class.
1567
1568 This is an alias of :func:`.object_session`.
1569
1570
1571 """ # noqa: E501
1572
1573 return Session.object_session(instance)
1574
1575 @classmethod
1576 def identity_key(
1577 cls,
1578 class_: Optional[Type[Any]] = None,
1579 ident: Union[Any, Tuple[Any, ...]] = None,
1580 *,
1581 instance: Optional[Any] = None,
1582 row: Optional[Union[Row[Any], RowMapping]] = None,
1583 identity_token: Optional[Any] = None,
1584 ) -> _IdentityKeyType[Any]:
1585 r"""Return an identity key.
1586
1587 .. container:: class_bases
1588
1589 Proxied for the :class:`_orm.Session` class on
1590 behalf of the :class:`_asyncio.AsyncSession` class.
1591
1592 This is an alias of :func:`.util.identity_key`.
1593
1594
1595 """ # noqa: E501
1596
1597 return Session.identity_key(
1598 class_=class_,
1599 ident=ident,
1600 instance=instance,
1601 row=row,
1602 identity_token=identity_token,
1603 )
1604
1605 # END PROXY METHODS AsyncSession
1606
1607
1608_AS = TypeVar("_AS", bound="AsyncSession")
1609
1610
1611class async_sessionmaker(Generic[_AS]):
1612 """A configurable :class:`.AsyncSession` factory.
1613
1614 The :class:`.async_sessionmaker` factory works in the same way as the
1615 :class:`.sessionmaker` factory, to generate new :class:`.AsyncSession`
1616 objects when called, creating them given
1617 the configurational arguments established here.
1618
1619 e.g.::
1620
1621 from sqlalchemy.ext.asyncio import create_async_engine
1622 from sqlalchemy.ext.asyncio import AsyncSession
1623 from sqlalchemy.ext.asyncio import async_sessionmaker
1624
1625
1626 async def run_some_sql(
1627 async_session: async_sessionmaker[AsyncSession],
1628 ) -> None:
1629 async with async_session() as session:
1630 session.add(SomeObject(data="object"))
1631 session.add(SomeOtherObject(name="other object"))
1632 await session.commit()
1633
1634
1635 async def main() -> None:
1636 # an AsyncEngine, which the AsyncSession will use for connection
1637 # resources
1638 engine = create_async_engine(
1639 "postgresql+asyncpg://scott:tiger@localhost/"
1640 )
1641
1642 # create a reusable factory for new AsyncSession instances
1643 async_session = async_sessionmaker(engine)
1644
1645 await run_some_sql(async_session)
1646
1647 await engine.dispose()
1648
1649 The :class:`.async_sessionmaker` is useful so that different parts
1650 of a program can create new :class:`.AsyncSession` objects with a
1651 fixed configuration established up front. Note that :class:`.AsyncSession`
1652 objects may also be instantiated directly when not using
1653 :class:`.async_sessionmaker`.
1654
1655 .. versionadded:: 2.0 :class:`.async_sessionmaker` provides a
1656 :class:`.sessionmaker` class that's dedicated to the
1657 :class:`.AsyncSession` object, including pep-484 typing support.
1658
1659 .. seealso::
1660
1661 :ref:`asyncio_orm` - shows example use
1662
1663 :class:`.sessionmaker` - general overview of the
1664 :class:`.sessionmaker` architecture
1665
1666
1667 :ref:`session_getting` - introductory text on creating
1668 sessions using :class:`.sessionmaker`.
1669
1670 """ # noqa E501
1671
1672 class_: Type[_AS]
1673
1674 @overload
1675 def __init__(
1676 self,
1677 bind: Optional[_AsyncSessionBind] = ...,
1678 *,
1679 class_: Type[_AS],
1680 autoflush: bool = ...,
1681 expire_on_commit: bool = ...,
1682 info: Optional[_InfoType] = ...,
1683 **kw: Any,
1684 ): ...
1685
1686 @overload
1687 def __init__(
1688 self: "async_sessionmaker[AsyncSession]",
1689 bind: Optional[_AsyncSessionBind] = ...,
1690 *,
1691 autoflush: bool = ...,
1692 expire_on_commit: bool = ...,
1693 info: Optional[_InfoType] = ...,
1694 **kw: Any,
1695 ): ...
1696
1697 def __init__(
1698 self,
1699 bind: Optional[_AsyncSessionBind] = None,
1700 *,
1701 class_: Type[_AS] = AsyncSession, # type: ignore
1702 autoflush: bool = True,
1703 expire_on_commit: bool = True,
1704 info: Optional[_InfoType] = None,
1705 **kw: Any,
1706 ):
1707 r"""Construct a new :class:`.async_sessionmaker`.
1708
1709 All arguments here except for ``class_`` correspond to arguments
1710 accepted by :class:`.Session` directly. See the
1711 :meth:`.AsyncSession.__init__` docstring for more details on
1712 parameters.
1713
1714
1715 """
1716 kw["bind"] = bind
1717 kw["autoflush"] = autoflush
1718 kw["expire_on_commit"] = expire_on_commit
1719 if info is not None:
1720 kw["info"] = info
1721 self.kw = kw
1722 self.class_ = class_
1723
1724 def begin(self) -> _AsyncSessionContextManager[_AS]:
1725 """Produce a context manager that both provides a new
1726 :class:`_orm.AsyncSession` as well as a transaction that commits.
1727
1728
1729 e.g.::
1730
1731 async def main():
1732 Session = async_sessionmaker(some_engine)
1733
1734 async with Session.begin() as session:
1735 session.add(some_object)
1736
1737 # commits transaction, closes session
1738
1739 """
1740
1741 session = self()
1742 return session._maker_context_manager()
1743
1744 def __call__(self, **local_kw: Any) -> _AS:
1745 """Produce a new :class:`.AsyncSession` object using the configuration
1746 established in this :class:`.async_sessionmaker`.
1747
1748 In Python, the ``__call__`` method is invoked on an object when
1749 it is "called" in the same way as a function::
1750
1751 AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False)
1752 session = AsyncSession() # invokes sessionmaker.__call__()
1753
1754 """ # noqa E501
1755 for k, v in self.kw.items():
1756 if k == "info" and "info" in local_kw:
1757 d = v.copy()
1758 d.update(local_kw["info"])
1759 local_kw["info"] = d
1760 else:
1761 local_kw.setdefault(k, v)
1762 return self.class_(**local_kw)
1763
1764 def configure(self, **new_kw: Any) -> None:
1765 """(Re)configure the arguments for this async_sessionmaker.
1766
1767 e.g.::
1768
1769 AsyncSession = async_sessionmaker(some_engine)
1770
1771 AsyncSession.configure(bind=create_async_engine("sqlite+aiosqlite://"))
1772 """ # noqa E501
1773
1774 self.kw.update(new_kw)
1775
1776 def __repr__(self) -> str:
1777 return "%s(class_=%r, %s)" % (
1778 self.__class__.__name__,
1779 self.class_.__name__,
1780 ", ".join("%s=%r" % (k, v) for k, v in self.kw.items()),
1781 )
1782
1783
1784class _AsyncSessionContextManager(Generic[_AS]):
1785 __slots__ = ("async_session", "trans")
1786
1787 async_session: _AS
1788 trans: AsyncSessionTransaction
1789
1790 def __init__(self, async_session: _AS):
1791 self.async_session = async_session
1792
1793 async def __aenter__(self) -> _AS:
1794 self.trans = self.async_session.begin()
1795 await self.trans.__aenter__()
1796 return self.async_session
1797
1798 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
1799 async def go() -> None:
1800 await self.trans.__aexit__(type_, value, traceback)
1801 await self.async_session.__aexit__(type_, value, traceback)
1802
1803 task = asyncio.create_task(go())
1804 await asyncio.shield(task)
1805
1806
1807class AsyncSessionTransaction(
1808 ReversibleProxy[SessionTransaction],
1809 StartableContext["AsyncSessionTransaction"],
1810):
1811 """A wrapper for the ORM :class:`_orm.SessionTransaction` object.
1812
1813 This object is provided so that a transaction-holding object
1814 for the :meth:`_asyncio.AsyncSession.begin` may be returned.
1815
1816 The object supports both explicit calls to
1817 :meth:`_asyncio.AsyncSessionTransaction.commit` and
1818 :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an
1819 async context manager.
1820
1821
1822 .. versionadded:: 1.4
1823
1824 """
1825
1826 __slots__ = ("session", "sync_transaction", "nested")
1827
1828 session: AsyncSession
1829 sync_transaction: Optional[SessionTransaction]
1830
1831 def __init__(self, session: AsyncSession, nested: bool = False):
1832 self.session = session
1833 self.nested = nested
1834 self.sync_transaction = None
1835
1836 @property
1837 def is_active(self) -> bool:
1838 return (
1839 self._sync_transaction() is not None
1840 and self._sync_transaction().is_active
1841 )
1842
1843 def _sync_transaction(self) -> SessionTransaction:
1844 if not self.sync_transaction:
1845 self._raise_for_not_started()
1846 return self.sync_transaction
1847
1848 async def rollback(self) -> None:
1849 """Roll back this :class:`_asyncio.AsyncTransaction`."""
1850 await greenlet_spawn(self._sync_transaction().rollback)
1851
1852 async def commit(self) -> None:
1853 """Commit this :class:`_asyncio.AsyncTransaction`."""
1854
1855 await greenlet_spawn(self._sync_transaction().commit)
1856
1857 @classmethod
1858 def _regenerate_proxy_for_target( # type: ignore[override]
1859 cls,
1860 target: SessionTransaction,
1861 async_session: AsyncSession,
1862 **additional_kw: Any, # noqa: U100
1863 ) -> AsyncSessionTransaction:
1864 sync_transaction = target
1865 nested = target.nested
1866 obj = cls.__new__(cls)
1867 obj.session = async_session
1868 obj.sync_transaction = obj._assign_proxied(sync_transaction)
1869 obj.nested = nested
1870 return obj
1871
1872 async def start(
1873 self, is_ctxmanager: bool = False
1874 ) -> AsyncSessionTransaction:
1875 self.sync_transaction = self._assign_proxied(
1876 await greenlet_spawn(
1877 self.session.sync_session.begin_nested
1878 if self.nested
1879 else self.session.sync_session.begin
1880 )
1881 )
1882 if is_ctxmanager:
1883 self.sync_transaction.__enter__()
1884 return self
1885
1886 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
1887 await greenlet_spawn(
1888 self._sync_transaction().__exit__, type_, value, traceback
1889 )
1890
1891
1892def async_object_session(instance: object) -> Optional[AsyncSession]:
1893 """Return the :class:`_asyncio.AsyncSession` to which the given instance
1894 belongs.
1895
1896 This function makes use of the sync-API function
1897 :class:`_orm.object_session` to retrieve the :class:`_orm.Session` which
1898 refers to the given instance, and from there links it to the original
1899 :class:`_asyncio.AsyncSession`.
1900
1901 If the :class:`_asyncio.AsyncSession` has been garbage collected, the
1902 return value is ``None``.
1903
1904 This functionality is also available from the
1905 :attr:`_orm.InstanceState.async_session` accessor.
1906
1907 :param instance: an ORM mapped instance
1908 :return: an :class:`_asyncio.AsyncSession` object, or ``None``.
1909
1910 .. versionadded:: 1.4.18
1911
1912 """
1913
1914 session = object_session(instance)
1915 if session is not None:
1916 return async_session(session)
1917 else:
1918 return None
1919
1920
1921def async_session(session: Session) -> Optional[AsyncSession]:
1922 """Return the :class:`_asyncio.AsyncSession` which is proxying the given
1923 :class:`_orm.Session` object, if any.
1924
1925 :param session: a :class:`_orm.Session` instance.
1926 :return: a :class:`_asyncio.AsyncSession` instance, or ``None``.
1927
1928 .. versionadded:: 1.4.18
1929
1930 """
1931 return AsyncSession._retrieve_proxy_for_target(session, regenerate=False)
1932
1933
1934async def close_all_sessions() -> None:
1935 """Close all :class:`_asyncio.AsyncSession` sessions.
1936
1937 .. versionadded:: 2.0.23
1938
1939 .. seealso::
1940
1941 :func:`.session.close_all_sessions`
1942
1943 """
1944 await greenlet_spawn(_sync_close_all_sessions)
1945
1946
1947_instance_state._async_provider = async_session # type: ignore