Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

347 statements  

1# ext/asyncio/session.py 

2# Copyright (C) 2020-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7from __future__ import annotations 

8 

9import asyncio 

10from typing import Any 

11from typing import Awaitable 

12from typing import Callable 

13from typing import cast 

14from typing import Dict 

15from typing import Generic 

16from typing import Iterable 

17from typing import Iterator 

18from typing import NoReturn 

19from typing import Optional 

20from typing import overload 

21from typing import Sequence 

22from typing import Tuple 

23from typing import Type 

24from typing import TYPE_CHECKING 

25from typing import TypeVar 

26from typing import Union 

27 

28from . import engine 

29from .base import ReversibleProxy 

30from .base import StartableContext 

31from .result import _ensure_sync_result 

32from .result import AsyncResult 

33from .result import AsyncScalarResult 

34from ... import util 

35from ...orm import close_all_sessions as _sync_close_all_sessions 

36from ...orm import object_session 

37from ...orm import Session 

38from ...orm import SessionTransaction 

39from ...orm import state as _instance_state 

40from ...util.concurrency import greenlet_spawn 

41from ...util.typing import Concatenate 

42from ...util.typing import ParamSpec 

43 

44 

45if TYPE_CHECKING: 

46 from .engine import AsyncConnection 

47 from .engine import AsyncEngine 

48 from ...engine import Connection 

49 from ...engine import Engine 

50 from ...engine import Result 

51 from ...engine import Row 

52 from ...engine import RowMapping 

53 from ...engine import ScalarResult 

54 from ...engine.interfaces import _CoreAnyExecuteParams 

55 from ...engine.interfaces import CoreExecuteOptionsParameter 

56 from ...event import dispatcher 

57 from ...orm._typing import _IdentityKeyType 

58 from ...orm._typing import _O 

59 from ...orm._typing import OrmExecuteOptionsParameter 

60 from ...orm.identity import IdentityMap 

61 from ...orm.interfaces import ORMOption 

62 from ...orm.session import _BindArguments 

63 from ...orm.session import _EntityBindKey 

64 from ...orm.session import _PKIdentityArgument 

65 from ...orm.session import _SessionBind 

66 from ...orm.session import _SessionBindKey 

67 from ...sql._typing import _InfoType 

68 from ...sql.base import Executable 

69 from ...sql.elements import ClauseElement 

70 from ...sql.selectable import ForUpdateParameter 

71 from ...sql.selectable import TypedReturnsRows 

72 

73_AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"] 

74 

75_P = ParamSpec("_P") 

76_T = TypeVar("_T", bound=Any) 

77 

78 

79_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True}) 

80_STREAM_OPTIONS = util.immutabledict({"stream_results": True}) 

81 

82 

83class AsyncAttrs: 

84 """Mixin class which provides an awaitable accessor for all attributes. 

85 

86 E.g.:: 

87 

88 from __future__ import annotations 

89 

90 from typing import List 

91 

92 from sqlalchemy import ForeignKey 

93 from sqlalchemy import func 

94 from sqlalchemy.ext.asyncio import AsyncAttrs 

95 from sqlalchemy.orm import DeclarativeBase 

96 from sqlalchemy.orm import Mapped 

97 from sqlalchemy.orm import mapped_column 

98 from sqlalchemy.orm import relationship 

99 

100 

101 class Base(AsyncAttrs, DeclarativeBase): 

102 pass 

103 

104 

105 class A(Base): 

106 __tablename__ = "a" 

107 

108 id: Mapped[int] = mapped_column(primary_key=True) 

109 data: Mapped[str] 

110 bs: Mapped[List[B]] = relationship() 

111 

112 

113 class B(Base): 

114 __tablename__ = "b" 

115 id: Mapped[int] = mapped_column(primary_key=True) 

116 a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) 

117 data: Mapped[str] 

118 

119 In the above example, the :class:`_asyncio.AsyncAttrs` mixin is applied to 

120 the declarative ``Base`` class where it takes effect for all subclasses. 

121 This mixin adds a single new attribute 

122 :attr:`_asyncio.AsyncAttrs.awaitable_attrs` to all classes, which will 

123 yield the value of any attribute as an awaitable. This allows attributes 

124 which may be subject to lazy loading or deferred / unexpiry loading to be 

125 accessed such that IO can still be emitted:: 

126 

127 a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() 

128 

129 # use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs`` 

130 # interface, so that it may be awaited 

131 for b1 in await a1.awaitable_attrs.bs: 

132 print(b1) 

133 

134 The :attr:`_asyncio.AsyncAttrs.awaitable_attrs` performs a call against the 

135 attribute that is approximately equivalent to using the 

136 :meth:`_asyncio.AsyncSession.run_sync` method, e.g.:: 

137 

138 for b1 in await async_session.run_sync(lambda sess: a1.bs): 

139 print(b1) 

140 

141 .. versionadded:: 2.0.13 

142 

143 .. seealso:: 

144 

145 :ref:`asyncio_orm_avoid_lazyloads` 

146 

147 """ 

148 

149 class _AsyncAttrGetitem: 

150 __slots__ = "_instance" 

151 

152 def __init__(self, _instance: Any): 

153 self._instance = _instance 

154 

155 def __getattr__(self, name: str) -> Awaitable[Any]: 

156 return greenlet_spawn(getattr, self._instance, name) 

157 

158 @property 

159 def awaitable_attrs(self) -> AsyncAttrs._AsyncAttrGetitem: 

160 """provide a namespace of all attributes on this object wrapped 

161 as awaitables. 

162 

163 e.g.:: 

164 

165 

166 a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() 

167 

168 some_attribute = await a1.awaitable_attrs.some_deferred_attribute 

169 some_collection = await a1.awaitable_attrs.some_collection 

170 

171 """ # noqa: E501 

172 

173 return AsyncAttrs._AsyncAttrGetitem(self) 

174 

175 

176@util.create_proxy_methods( 

177 Session, 

178 ":class:`_orm.Session`", 

179 ":class:`_asyncio.AsyncSession`", 

180 classmethods=["object_session", "identity_key"], 

181 methods=[ 

182 "__contains__", 

183 "__iter__", 

184 "add", 

185 "add_all", 

186 "expire", 

187 "expire_all", 

188 "expunge", 

189 "expunge_all", 

190 "is_modified", 

191 "in_transaction", 

192 "in_nested_transaction", 

193 ], 

194 attributes=[ 

195 "dirty", 

196 "deleted", 

197 "new", 

198 "identity_map", 

199 "is_active", 

200 "autoflush", 

201 "no_autoflush", 

202 "info", 

203 ], 

204) 

205class AsyncSession(ReversibleProxy[Session]): 

206 """Asyncio version of :class:`_orm.Session`. 

207 

208 The :class:`_asyncio.AsyncSession` is a proxy for a traditional 

209 :class:`_orm.Session` instance. 

210 

211 The :class:`_asyncio.AsyncSession` is **not safe for use in concurrent 

212 tasks.**. See :ref:`session_faq_threadsafe` for background. 

213 

214 .. versionadded:: 1.4 

215 

216 To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session` 

217 implementations, see the 

218 :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. 

219 

220 

221 """ 

222 

223 _is_asyncio = True 

224 

225 dispatch: dispatcher[Session] 

226 

227 def __init__( 

228 self, 

229 bind: Optional[_AsyncSessionBind] = None, 

230 *, 

231 binds: Optional[Dict[_SessionBindKey, _AsyncSessionBind]] = None, 

232 sync_session_class: Optional[Type[Session]] = None, 

233 **kw: Any, 

234 ): 

235 r"""Construct a new :class:`_asyncio.AsyncSession`. 

236 

237 All parameters other than ``sync_session_class`` are passed to the 

238 ``sync_session_class`` callable directly to instantiate a new 

239 :class:`_orm.Session`. Refer to :meth:`_orm.Session.__init__` for 

240 parameter documentation. 

241 

242 :param sync_session_class: 

243 A :class:`_orm.Session` subclass or other callable which will be used 

244 to construct the :class:`_orm.Session` which will be proxied. This 

245 parameter may be used to provide custom :class:`_orm.Session` 

246 subclasses. Defaults to the 

247 :attr:`_asyncio.AsyncSession.sync_session_class` class-level 

248 attribute. 

249 

250 .. versionadded:: 1.4.24 

251 

252 """ 

253 sync_bind = sync_binds = None 

254 

255 if bind: 

256 self.bind = bind 

257 sync_bind = engine._get_sync_engine_or_connection(bind) 

258 

259 if binds: 

260 self.binds = binds 

261 sync_binds = { 

262 key: engine._get_sync_engine_or_connection(b) 

263 for key, b in binds.items() 

264 } 

265 

266 if sync_session_class: 

267 self.sync_session_class = sync_session_class 

268 

269 self.sync_session = self._proxied = self._assign_proxied( 

270 self.sync_session_class(bind=sync_bind, binds=sync_binds, **kw) 

271 ) 

272 

273 sync_session_class: Type[Session] = Session 

274 """The class or callable that provides the 

275 underlying :class:`_orm.Session` instance for a particular 

276 :class:`_asyncio.AsyncSession`. 

277 

278 At the class level, this attribute is the default value for the 

279 :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. Custom 

280 subclasses of :class:`_asyncio.AsyncSession` can override this. 

281 

282 At the instance level, this attribute indicates the current class or 

283 callable that was used to provide the :class:`_orm.Session` instance for 

284 this :class:`_asyncio.AsyncSession` instance. 

285 

286 .. versionadded:: 1.4.24 

287 

288 """ 

289 

290 sync_session: Session 

291 """Reference to the underlying :class:`_orm.Session` this 

292 :class:`_asyncio.AsyncSession` proxies requests towards. 

293 

294 This instance can be used as an event target. 

295 

296 .. seealso:: 

297 

298 :ref:`asyncio_events` 

299 

300 """ 

301 

302 @classmethod 

303 def _no_async_engine_events(cls) -> NoReturn: 

304 raise NotImplementedError( 

305 "asynchronous events are not implemented at this time. Apply " 

306 "synchronous listeners to the AsyncSession.sync_session." 

307 ) 

308 

309 async def refresh( 

310 self, 

311 instance: object, 

312 attribute_names: Optional[Iterable[str]] = None, 

313 with_for_update: ForUpdateParameter = None, 

314 ) -> None: 

315 """Expire and refresh the attributes on the given instance. 

316 

317 A query will be issued to the database and all attributes will be 

318 refreshed with their current database value. 

319 

320 This is the async version of the :meth:`_orm.Session.refresh` method. 

321 See that method for a complete description of all options. 

322 

323 .. seealso:: 

324 

325 :meth:`_orm.Session.refresh` - main documentation for refresh 

326 

327 """ 

328 

329 await greenlet_spawn( 

330 self.sync_session.refresh, 

331 instance, 

332 attribute_names=attribute_names, 

333 with_for_update=with_for_update, 

334 ) 

335 

336 async def run_sync( 

337 self, 

338 fn: Callable[Concatenate[Session, _P], _T], 

339 *arg: _P.args, 

340 **kw: _P.kwargs, 

341 ) -> _T: 

342 '''Invoke the given synchronous (i.e. not async) callable, 

343 passing a synchronous-style :class:`_orm.Session` as the first 

344 argument. 

345 

346 This method allows traditional synchronous SQLAlchemy functions to 

347 run within the context of an asyncio application. 

348 

349 E.g.:: 

350 

351 def some_business_method(session: Session, param: str) -> str: 

352 """A synchronous function that does not require awaiting 

353 

354 :param session: a SQLAlchemy Session, used synchronously 

355 

356 :return: an optional return value is supported 

357 

358 """ 

359 session.add(MyObject(param=param)) 

360 session.flush() 

361 return "success" 

362 

363 

364 async def do_something_async(async_engine: AsyncEngine) -> None: 

365 """an async function that uses awaiting""" 

366 

367 with AsyncSession(async_engine) as async_session: 

368 # run some_business_method() with a sync-style 

369 # Session, proxied into an awaitable 

370 return_code = await async_session.run_sync( 

371 some_business_method, param="param1" 

372 ) 

373 print(return_code) 

374 

375 This method maintains the asyncio event loop all the way through 

376 to the database connection by running the given callable in a 

377 specially instrumented greenlet. 

378 

379 .. tip:: 

380 

381 The provided callable is invoked inline within the asyncio event 

382 loop, and will block on traditional IO calls. IO within this 

383 callable should only call into SQLAlchemy's asyncio database 

384 APIs which will be properly adapted to the greenlet context. 

385 

386 .. seealso:: 

387 

388 :class:`.AsyncAttrs` - a mixin for ORM mapped classes that provides 

389 a similar feature more succinctly on a per-attribute basis 

390 

391 :meth:`.AsyncConnection.run_sync` 

392 

393 :ref:`session_run_sync` 

394 ''' # noqa: E501 

395 

396 return await greenlet_spawn( 

397 fn, self.sync_session, *arg, _require_await=False, **kw 

398 ) 

399 

400 @overload 

401 async def execute( 

402 self, 

403 statement: TypedReturnsRows[_T], 

404 params: Optional[_CoreAnyExecuteParams] = None, 

405 *, 

406 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

407 bind_arguments: Optional[_BindArguments] = None, 

408 _parent_execute_state: Optional[Any] = None, 

409 _add_event: Optional[Any] = None, 

410 ) -> Result[_T]: ... 

411 

412 @overload 

413 async def execute( 

414 self, 

415 statement: Executable, 

416 params: Optional[_CoreAnyExecuteParams] = None, 

417 *, 

418 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

419 bind_arguments: Optional[_BindArguments] = None, 

420 _parent_execute_state: Optional[Any] = None, 

421 _add_event: Optional[Any] = None, 

422 ) -> Result[Any]: ... 

423 

424 async def execute( 

425 self, 

426 statement: Executable, 

427 params: Optional[_CoreAnyExecuteParams] = None, 

428 *, 

429 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

430 bind_arguments: Optional[_BindArguments] = None, 

431 **kw: Any, 

432 ) -> Result[Any]: 

433 """Execute a statement and return a buffered 

434 :class:`_engine.Result` object. 

435 

436 .. seealso:: 

437 

438 :meth:`_orm.Session.execute` - main documentation for execute 

439 

440 """ 

441 

442 if execution_options: 

443 execution_options = util.immutabledict(execution_options).union( 

444 _EXECUTE_OPTIONS 

445 ) 

446 else: 

447 execution_options = _EXECUTE_OPTIONS 

448 

449 result = await greenlet_spawn( 

450 self.sync_session.execute, 

451 statement, 

452 params=params, 

453 execution_options=execution_options, 

454 bind_arguments=bind_arguments, 

455 **kw, 

456 ) 

457 return await _ensure_sync_result(result, self.execute) 

458 

459 @overload 

460 async def scalar( 

461 self, 

462 statement: TypedReturnsRows[Tuple[_T]], 

463 params: Optional[_CoreAnyExecuteParams] = None, 

464 *, 

465 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

466 bind_arguments: Optional[_BindArguments] = None, 

467 **kw: Any, 

468 ) -> Optional[_T]: ... 

469 

470 @overload 

471 async def scalar( 

472 self, 

473 statement: Executable, 

474 params: Optional[_CoreAnyExecuteParams] = None, 

475 *, 

476 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

477 bind_arguments: Optional[_BindArguments] = None, 

478 **kw: Any, 

479 ) -> Any: ... 

480 

481 async def scalar( 

482 self, 

483 statement: Executable, 

484 params: Optional[_CoreAnyExecuteParams] = None, 

485 *, 

486 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

487 bind_arguments: Optional[_BindArguments] = None, 

488 **kw: Any, 

489 ) -> Any: 

490 """Execute a statement and return a scalar result. 

491 

492 .. seealso:: 

493 

494 :meth:`_orm.Session.scalar` - main documentation for scalar 

495 

496 """ 

497 

498 if execution_options: 

499 execution_options = util.immutabledict(execution_options).union( 

500 _EXECUTE_OPTIONS 

501 ) 

502 else: 

503 execution_options = _EXECUTE_OPTIONS 

504 

505 return await greenlet_spawn( 

506 self.sync_session.scalar, 

507 statement, 

508 params=params, 

509 execution_options=execution_options, 

510 bind_arguments=bind_arguments, 

511 **kw, 

512 ) 

513 

514 @overload 

515 async def scalars( 

516 self, 

517 statement: TypedReturnsRows[Tuple[_T]], 

518 params: Optional[_CoreAnyExecuteParams] = None, 

519 *, 

520 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

521 bind_arguments: Optional[_BindArguments] = None, 

522 **kw: Any, 

523 ) -> ScalarResult[_T]: ... 

524 

525 @overload 

526 async def scalars( 

527 self, 

528 statement: Executable, 

529 params: Optional[_CoreAnyExecuteParams] = None, 

530 *, 

531 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

532 bind_arguments: Optional[_BindArguments] = None, 

533 **kw: Any, 

534 ) -> ScalarResult[Any]: ... 

535 

536 async def scalars( 

537 self, 

538 statement: Executable, 

539 params: Optional[_CoreAnyExecuteParams] = None, 

540 *, 

541 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

542 bind_arguments: Optional[_BindArguments] = None, 

543 **kw: Any, 

544 ) -> ScalarResult[Any]: 

545 """Execute a statement and return scalar results. 

546 

547 :return: a :class:`_result.ScalarResult` object 

548 

549 .. versionadded:: 1.4.24 Added :meth:`_asyncio.AsyncSession.scalars` 

550 

551 .. versionadded:: 1.4.26 Added 

552 :meth:`_asyncio.async_scoped_session.scalars` 

553 

554 .. seealso:: 

555 

556 :meth:`_orm.Session.scalars` - main documentation for scalars 

557 

558 :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version 

559 

560 """ 

561 

562 result = await self.execute( 

563 statement, 

564 params=params, 

565 execution_options=execution_options, 

566 bind_arguments=bind_arguments, 

567 **kw, 

568 ) 

569 return result.scalars() 

570 

571 async def get( 

572 self, 

573 entity: _EntityBindKey[_O], 

574 ident: _PKIdentityArgument, 

575 *, 

576 options: Optional[Sequence[ORMOption]] = None, 

577 populate_existing: bool = False, 

578 with_for_update: ForUpdateParameter = None, 

579 identity_token: Optional[Any] = None, 

580 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

581 ) -> Union[_O, None]: 

582 """Return an instance based on the given primary key identifier, 

583 or ``None`` if not found. 

584 

585 .. seealso:: 

586 

587 :meth:`_orm.Session.get` - main documentation for get 

588 

589 

590 """ 

591 

592 return await greenlet_spawn( 

593 cast("Callable[..., _O]", self.sync_session.get), 

594 entity, 

595 ident, 

596 options=options, 

597 populate_existing=populate_existing, 

598 with_for_update=with_for_update, 

599 identity_token=identity_token, 

600 execution_options=execution_options, 

601 ) 

602 

603 async def get_one( 

604 self, 

605 entity: _EntityBindKey[_O], 

606 ident: _PKIdentityArgument, 

607 *, 

608 options: Optional[Sequence[ORMOption]] = None, 

609 populate_existing: bool = False, 

610 with_for_update: ForUpdateParameter = None, 

611 identity_token: Optional[Any] = None, 

612 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

613 ) -> _O: 

614 """Return an instance based on the given primary key identifier, 

615 or raise an exception if not found. 

616 

617 Raises :class:`_exc.NoResultFound` if the query selects no rows. 

618 

619 ..versionadded: 2.0.22 

620 

621 .. seealso:: 

622 

623 :meth:`_orm.Session.get_one` - main documentation for get_one 

624 

625 """ 

626 

627 return await greenlet_spawn( 

628 cast("Callable[..., _O]", self.sync_session.get_one), 

629 entity, 

630 ident, 

631 options=options, 

632 populate_existing=populate_existing, 

633 with_for_update=with_for_update, 

634 identity_token=identity_token, 

635 execution_options=execution_options, 

636 ) 

637 

638 @overload 

639 async def stream( 

640 self, 

641 statement: TypedReturnsRows[_T], 

642 params: Optional[_CoreAnyExecuteParams] = None, 

643 *, 

644 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

645 bind_arguments: Optional[_BindArguments] = None, 

646 **kw: Any, 

647 ) -> AsyncResult[_T]: ... 

648 

649 @overload 

650 async def stream( 

651 self, 

652 statement: Executable, 

653 params: Optional[_CoreAnyExecuteParams] = None, 

654 *, 

655 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

656 bind_arguments: Optional[_BindArguments] = None, 

657 **kw: Any, 

658 ) -> AsyncResult[Any]: ... 

659 

660 async def stream( 

661 self, 

662 statement: Executable, 

663 params: Optional[_CoreAnyExecuteParams] = None, 

664 *, 

665 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

666 bind_arguments: Optional[_BindArguments] = None, 

667 **kw: Any, 

668 ) -> AsyncResult[Any]: 

669 """Execute a statement and return a streaming 

670 :class:`_asyncio.AsyncResult` object. 

671 

672 """ 

673 

674 if execution_options: 

675 execution_options = util.immutabledict(execution_options).union( 

676 _STREAM_OPTIONS 

677 ) 

678 else: 

679 execution_options = _STREAM_OPTIONS 

680 

681 result = await greenlet_spawn( 

682 self.sync_session.execute, 

683 statement, 

684 params=params, 

685 execution_options=execution_options, 

686 bind_arguments=bind_arguments, 

687 **kw, 

688 ) 

689 return AsyncResult(result) 

690 

691 @overload 

692 async def stream_scalars( 

693 self, 

694 statement: TypedReturnsRows[Tuple[_T]], 

695 params: Optional[_CoreAnyExecuteParams] = None, 

696 *, 

697 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

698 bind_arguments: Optional[_BindArguments] = None, 

699 **kw: Any, 

700 ) -> AsyncScalarResult[_T]: ... 

701 

702 @overload 

703 async def stream_scalars( 

704 self, 

705 statement: Executable, 

706 params: Optional[_CoreAnyExecuteParams] = None, 

707 *, 

708 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

709 bind_arguments: Optional[_BindArguments] = None, 

710 **kw: Any, 

711 ) -> AsyncScalarResult[Any]: ... 

712 

713 async def stream_scalars( 

714 self, 

715 statement: Executable, 

716 params: Optional[_CoreAnyExecuteParams] = None, 

717 *, 

718 execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, 

719 bind_arguments: Optional[_BindArguments] = None, 

720 **kw: Any, 

721 ) -> AsyncScalarResult[Any]: 

722 """Execute a statement and return a stream of scalar results. 

723 

724 :return: an :class:`_asyncio.AsyncScalarResult` object 

725 

726 .. versionadded:: 1.4.24 

727 

728 .. seealso:: 

729 

730 :meth:`_orm.Session.scalars` - main documentation for scalars 

731 

732 :meth:`_asyncio.AsyncSession.scalars` - non streaming version 

733 

734 """ 

735 

736 result = await self.stream( 

737 statement, 

738 params=params, 

739 execution_options=execution_options, 

740 bind_arguments=bind_arguments, 

741 **kw, 

742 ) 

743 return result.scalars() 

744 

745 async def delete(self, instance: object) -> None: 

746 """Mark an instance as deleted. 

747 

748 The database delete operation occurs upon ``flush()``. 

749 

750 As this operation may need to cascade along unloaded relationships, 

751 it is awaitable to allow for those queries to take place. 

752 

753 .. seealso:: 

754 

755 :meth:`_orm.Session.delete` - main documentation for delete 

756 

757 """ 

758 await greenlet_spawn(self.sync_session.delete, instance) 

759 

760 async def merge( 

761 self, 

762 instance: _O, 

763 *, 

764 load: bool = True, 

765 options: Optional[Sequence[ORMOption]] = None, 

766 ) -> _O: 

767 """Copy the state of a given instance into a corresponding instance 

768 within this :class:`_asyncio.AsyncSession`. 

769 

770 .. seealso:: 

771 

772 :meth:`_orm.Session.merge` - main documentation for merge 

773 

774 """ 

775 return await greenlet_spawn( 

776 self.sync_session.merge, instance, load=load, options=options 

777 ) 

778 

779 async def flush(self, objects: Optional[Sequence[Any]] = None) -> None: 

780 """Flush all the object changes to the database. 

781 

782 .. seealso:: 

783 

784 :meth:`_orm.Session.flush` - main documentation for flush 

785 

786 """ 

787 await greenlet_spawn(self.sync_session.flush, objects=objects) 

788 

789 def get_transaction(self) -> Optional[AsyncSessionTransaction]: 

790 """Return the current root transaction in progress, if any. 

791 

792 :return: an :class:`_asyncio.AsyncSessionTransaction` object, or 

793 ``None``. 

794 

795 .. versionadded:: 1.4.18 

796 

797 """ 

798 trans = self.sync_session.get_transaction() 

799 if trans is not None: 

800 return AsyncSessionTransaction._retrieve_proxy_for_target( 

801 trans, async_session=self 

802 ) 

803 else: 

804 return None 

805 

806 def get_nested_transaction(self) -> Optional[AsyncSessionTransaction]: 

807 """Return the current nested transaction in progress, if any. 

808 

809 :return: an :class:`_asyncio.AsyncSessionTransaction` object, or 

810 ``None``. 

811 

812 .. versionadded:: 1.4.18 

813 

814 """ 

815 

816 trans = self.sync_session.get_nested_transaction() 

817 if trans is not None: 

818 return AsyncSessionTransaction._retrieve_proxy_for_target( 

819 trans, async_session=self 

820 ) 

821 else: 

822 return None 

823 

824 def get_bind( 

825 self, 

826 mapper: Optional[_EntityBindKey[_O]] = None, 

827 clause: Optional[ClauseElement] = None, 

828 bind: Optional[_SessionBind] = None, 

829 **kw: Any, 

830 ) -> Union[Engine, Connection]: 

831 """Return a "bind" to which the synchronous proxied :class:`_orm.Session` 

832 is bound. 

833 

834 Unlike the :meth:`_orm.Session.get_bind` method, this method is 

835 currently **not** used by this :class:`.AsyncSession` in any way 

836 in order to resolve engines for requests. 

837 

838 .. note:: 

839 

840 This method proxies directly to the :meth:`_orm.Session.get_bind` 

841 method, however is currently **not** useful as an override target, 

842 in contrast to that of the :meth:`_orm.Session.get_bind` method. 

843 The example below illustrates how to implement custom 

844 :meth:`_orm.Session.get_bind` schemes that work with 

845 :class:`.AsyncSession` and :class:`.AsyncEngine`. 

846 

847 The pattern introduced at :ref:`session_custom_partitioning` 

848 illustrates how to apply a custom bind-lookup scheme to a 

849 :class:`_orm.Session` given a set of :class:`_engine.Engine` objects. 

850 To apply a corresponding :meth:`_orm.Session.get_bind` implementation 

851 for use with a :class:`.AsyncSession` and :class:`.AsyncEngine` 

852 objects, continue to subclass :class:`_orm.Session` and apply it to 

853 :class:`.AsyncSession` using 

854 :paramref:`.AsyncSession.sync_session_class`. The inner method must 

855 continue to return :class:`_engine.Engine` instances, which can be 

856 acquired from a :class:`_asyncio.AsyncEngine` using the 

857 :attr:`_asyncio.AsyncEngine.sync_engine` attribute:: 

858 

859 # using example from "Custom Vertical Partitioning" 

860 

861 

862 import random 

863 

864 from sqlalchemy.ext.asyncio import AsyncSession 

865 from sqlalchemy.ext.asyncio import create_async_engine 

866 from sqlalchemy.ext.asyncio import async_sessionmaker 

867 from sqlalchemy.orm import Session 

868 

869 # construct async engines w/ async drivers 

870 engines = { 

871 "leader": create_async_engine("sqlite+aiosqlite:///leader.db"), 

872 "other": create_async_engine("sqlite+aiosqlite:///other.db"), 

873 "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"), 

874 "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"), 

875 } 

876 

877 

878 class RoutingSession(Session): 

879 def get_bind(self, mapper=None, clause=None, **kw): 

880 # within get_bind(), return sync engines 

881 if mapper and issubclass(mapper.class_, MyOtherClass): 

882 return engines["other"].sync_engine 

883 elif self._flushing or isinstance(clause, (Update, Delete)): 

884 return engines["leader"].sync_engine 

885 else: 

886 return engines[ 

887 random.choice(["follower1", "follower2"]) 

888 ].sync_engine 

889 

890 

891 # apply to AsyncSession using sync_session_class 

892 AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession) 

893 

894 The :meth:`_orm.Session.get_bind` method is called in a non-asyncio, 

895 implicitly non-blocking context in the same manner as ORM event hooks 

896 and functions that are invoked via :meth:`.AsyncSession.run_sync`, so 

897 routines that wish to run SQL commands inside of 

898 :meth:`_orm.Session.get_bind` can continue to do so using 

899 blocking-style code, which will be translated to implicitly async calls 

900 at the point of invoking IO on the database drivers. 

901 

902 """ # noqa: E501 

903 

904 return self.sync_session.get_bind( 

905 mapper=mapper, clause=clause, bind=bind, **kw 

906 ) 

907 

908 async def connection( 

909 self, 

910 bind_arguments: Optional[_BindArguments] = None, 

911 execution_options: Optional[CoreExecuteOptionsParameter] = None, 

912 **kw: Any, 

913 ) -> AsyncConnection: 

914 r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to 

915 this :class:`.Session` object's transactional state. 

916 

917 This method may also be used to establish execution options for the 

918 database connection used by the current transaction. 

919 

920 .. versionadded:: 1.4.24 Added \**kw arguments which are passed 

921 through to the underlying :meth:`_orm.Session.connection` method. 

922 

923 .. seealso:: 

924 

925 :meth:`_orm.Session.connection` - main documentation for 

926 "connection" 

927 

928 """ 

929 

930 sync_connection = await greenlet_spawn( 

931 self.sync_session.connection, 

932 bind_arguments=bind_arguments, 

933 execution_options=execution_options, 

934 **kw, 

935 ) 

936 return engine.AsyncConnection._retrieve_proxy_for_target( 

937 sync_connection 

938 ) 

939 

940 def begin(self) -> AsyncSessionTransaction: 

941 """Return an :class:`_asyncio.AsyncSessionTransaction` object. 

942 

943 The underlying :class:`_orm.Session` will perform the 

944 "begin" action when the :class:`_asyncio.AsyncSessionTransaction` 

945 object is entered:: 

946 

947 async with async_session.begin(): 

948 ... # ORM transaction is begun 

949 

950 Note that database IO will not normally occur when the session-level 

951 transaction is begun, as database transactions begin on an 

952 on-demand basis. However, the begin block is async to accommodate 

953 for a :meth:`_orm.SessionEvents.after_transaction_create` 

954 event hook that may perform IO. 

955 

956 For a general description of ORM begin, see 

957 :meth:`_orm.Session.begin`. 

958 

959 """ 

960 

961 return AsyncSessionTransaction(self) 

962 

963 def begin_nested(self) -> AsyncSessionTransaction: 

964 """Return an :class:`_asyncio.AsyncSessionTransaction` object 

965 which will begin a "nested" transaction, e.g. SAVEPOINT. 

966 

967 Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`. 

968 

969 For a general description of ORM begin nested, see 

970 :meth:`_orm.Session.begin_nested`. 

971 

972 .. seealso:: 

973 

974 :ref:`aiosqlite_serializable` - special workarounds required 

975 with the SQLite asyncio driver in order for SAVEPOINT to work 

976 correctly. 

977 

978 """ 

979 

980 return AsyncSessionTransaction(self, nested=True) 

981 

982 async def rollback(self) -> None: 

983 """Rollback the current transaction in progress. 

984 

985 .. seealso:: 

986 

987 :meth:`_orm.Session.rollback` - main documentation for 

988 "rollback" 

989 """ 

990 await greenlet_spawn(self.sync_session.rollback) 

991 

992 async def commit(self) -> None: 

993 """Commit the current transaction in progress. 

994 

995 .. seealso:: 

996 

997 :meth:`_orm.Session.commit` - main documentation for 

998 "commit" 

999 """ 

1000 await greenlet_spawn(self.sync_session.commit) 

1001 

1002 async def close(self) -> None: 

1003 """Close out the transactional resources and ORM objects used by this 

1004 :class:`_asyncio.AsyncSession`. 

1005 

1006 .. seealso:: 

1007 

1008 :meth:`_orm.Session.close` - main documentation for 

1009 "close" 

1010 

1011 :ref:`session_closing` - detail on the semantics of 

1012 :meth:`_asyncio.AsyncSession.close` and 

1013 :meth:`_asyncio.AsyncSession.reset`. 

1014 

1015 """ 

1016 await greenlet_spawn(self.sync_session.close) 

1017 

1018 async def reset(self) -> None: 

1019 """Close out the transactional resources and ORM objects used by this 

1020 :class:`_orm.Session`, resetting the session to its initial state. 

1021 

1022 .. versionadded:: 2.0.22 

1023 

1024 .. seealso:: 

1025 

1026 :meth:`_orm.Session.reset` - main documentation for 

1027 "reset" 

1028 

1029 :ref:`session_closing` - detail on the semantics of 

1030 :meth:`_asyncio.AsyncSession.close` and 

1031 :meth:`_asyncio.AsyncSession.reset`. 

1032 

1033 """ 

1034 await greenlet_spawn(self.sync_session.reset) 

1035 

1036 async def aclose(self) -> None: 

1037 """A synonym for :meth:`_asyncio.AsyncSession.close`. 

1038 

1039 The :meth:`_asyncio.AsyncSession.aclose` name is specifically 

1040 to support the Python standard library ``@contextlib.aclosing`` 

1041 context manager function. 

1042 

1043 .. versionadded:: 2.0.20 

1044 

1045 """ 

1046 await self.close() 

1047 

1048 async def invalidate(self) -> None: 

1049 """Close this Session, using connection invalidation. 

1050 

1051 For a complete description, see :meth:`_orm.Session.invalidate`. 

1052 """ 

1053 await greenlet_spawn(self.sync_session.invalidate) 

1054 

1055 @classmethod 

1056 @util.deprecated( 

1057 "2.0", 

1058 "The :meth:`.AsyncSession.close_all` method is deprecated and will be " 

1059 "removed in a future release. Please refer to " 

1060 ":func:`_asyncio.close_all_sessions`.", 

1061 ) 

1062 async def close_all(cls) -> None: 

1063 """Close all :class:`_asyncio.AsyncSession` sessions.""" 

1064 await close_all_sessions() 

1065 

1066 async def __aenter__(self: _AS) -> _AS: 

1067 return self 

1068 

1069 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: 

1070 task = asyncio.create_task(self.close()) 

1071 await asyncio.shield(task) 

1072 

1073 def _maker_context_manager(self: _AS) -> _AsyncSessionContextManager[_AS]: 

1074 return _AsyncSessionContextManager(self) 

1075 

1076 # START PROXY METHODS AsyncSession 

1077 

1078 # code within this block is **programmatically, 

1079 # statically generated** by tools/generate_proxy_methods.py 

1080 

1081 def __contains__(self, instance: object) -> bool: 

1082 r"""Return True if the instance is associated with this session. 

1083 

1084 .. container:: class_bases 

1085 

1086 Proxied for the :class:`_orm.Session` class on 

1087 behalf of the :class:`_asyncio.AsyncSession` class. 

1088 

1089 The instance may be pending or persistent within the Session for a 

1090 result of True. 

1091 

1092 

1093 """ # noqa: E501 

1094 

1095 return self._proxied.__contains__(instance) 

1096 

1097 def __iter__(self) -> Iterator[object]: 

1098 r"""Iterate over all pending or persistent instances within this 

1099 Session. 

1100 

1101 .. container:: class_bases 

1102 

1103 Proxied for the :class:`_orm.Session` class on 

1104 behalf of the :class:`_asyncio.AsyncSession` class. 

1105 

1106 

1107 """ # noqa: E501 

1108 

1109 return self._proxied.__iter__() 

1110 

1111 def add(self, instance: object, _warn: bool = True) -> None: 

1112 r"""Place an object into this :class:`_orm.Session`. 

1113 

1114 .. container:: class_bases 

1115 

1116 Proxied for the :class:`_orm.Session` class on 

1117 behalf of the :class:`_asyncio.AsyncSession` class. 

1118 

1119 Objects that are in the :term:`transient` state when passed to the 

1120 :meth:`_orm.Session.add` method will move to the 

1121 :term:`pending` state, until the next flush, at which point they 

1122 will move to the :term:`persistent` state. 

1123 

1124 Objects that are in the :term:`detached` state when passed to the 

1125 :meth:`_orm.Session.add` method will move to the :term:`persistent` 

1126 state directly. 

1127 

1128 If the transaction used by the :class:`_orm.Session` is rolled back, 

1129 objects which were transient when they were passed to 

1130 :meth:`_orm.Session.add` will be moved back to the 

1131 :term:`transient` state, and will no longer be present within this 

1132 :class:`_orm.Session`. 

1133 

1134 .. seealso:: 

1135 

1136 :meth:`_orm.Session.add_all` 

1137 

1138 :ref:`session_adding` - at :ref:`session_basics` 

1139 

1140 

1141 """ # noqa: E501 

1142 

1143 return self._proxied.add(instance, _warn=_warn) 

1144 

1145 def add_all(self, instances: Iterable[object]) -> None: 

1146 r"""Add the given collection of instances to this :class:`_orm.Session`. 

1147 

1148 .. container:: class_bases 

1149 

1150 Proxied for the :class:`_orm.Session` class on 

1151 behalf of the :class:`_asyncio.AsyncSession` class. 

1152 

1153 See the documentation for :meth:`_orm.Session.add` for a general 

1154 behavioral description. 

1155 

1156 .. seealso:: 

1157 

1158 :meth:`_orm.Session.add` 

1159 

1160 :ref:`session_adding` - at :ref:`session_basics` 

1161 

1162 

1163 """ # noqa: E501 

1164 

1165 return self._proxied.add_all(instances) 

1166 

1167 def expire( 

1168 self, instance: object, attribute_names: Optional[Iterable[str]] = None 

1169 ) -> None: 

1170 r"""Expire the attributes on an instance. 

1171 

1172 .. container:: class_bases 

1173 

1174 Proxied for the :class:`_orm.Session` class on 

1175 behalf of the :class:`_asyncio.AsyncSession` class. 

1176 

1177 Marks the attributes of an instance as out of date. When an expired 

1178 attribute is next accessed, a query will be issued to the 

1179 :class:`.Session` object's current transactional context in order to 

1180 load all expired attributes for the given instance. Note that 

1181 a highly isolated transaction will return the same values as were 

1182 previously read in that same transaction, regardless of changes 

1183 in database state outside of that transaction. 

1184 

1185 To expire all objects in the :class:`.Session` simultaneously, 

1186 use :meth:`Session.expire_all`. 

1187 

1188 The :class:`.Session` object's default behavior is to 

1189 expire all state whenever the :meth:`Session.rollback` 

1190 or :meth:`Session.commit` methods are called, so that new 

1191 state can be loaded for the new transaction. For this reason, 

1192 calling :meth:`Session.expire` only makes sense for the specific 

1193 case that a non-ORM SQL statement was emitted in the current 

1194 transaction. 

1195 

1196 :param instance: The instance to be refreshed. 

1197 :param attribute_names: optional list of string attribute names 

1198 indicating a subset of attributes to be expired. 

1199 

1200 .. seealso:: 

1201 

1202 :ref:`session_expire` - introductory material 

1203 

1204 :meth:`.Session.expire` 

1205 

1206 :meth:`.Session.refresh` 

1207 

1208 :meth:`_orm.Query.populate_existing` 

1209 

1210 

1211 """ # noqa: E501 

1212 

1213 return self._proxied.expire(instance, attribute_names=attribute_names) 

1214 

1215 def expire_all(self) -> None: 

1216 r"""Expires all persistent instances within this Session. 

1217 

1218 .. container:: class_bases 

1219 

1220 Proxied for the :class:`_orm.Session` class on 

1221 behalf of the :class:`_asyncio.AsyncSession` class. 

1222 

1223 When any attributes on a persistent instance is next accessed, 

1224 a query will be issued using the 

1225 :class:`.Session` object's current transactional context in order to 

1226 load all expired attributes for the given instance. Note that 

1227 a highly isolated transaction will return the same values as were 

1228 previously read in that same transaction, regardless of changes 

1229 in database state outside of that transaction. 

1230 

1231 To expire individual objects and individual attributes 

1232 on those objects, use :meth:`Session.expire`. 

1233 

1234 The :class:`.Session` object's default behavior is to 

1235 expire all state whenever the :meth:`Session.rollback` 

1236 or :meth:`Session.commit` methods are called, so that new 

1237 state can be loaded for the new transaction. For this reason, 

1238 calling :meth:`Session.expire_all` is not usually needed, 

1239 assuming the transaction is isolated. 

1240 

1241 .. seealso:: 

1242 

1243 :ref:`session_expire` - introductory material 

1244 

1245 :meth:`.Session.expire` 

1246 

1247 :meth:`.Session.refresh` 

1248 

1249 :meth:`_orm.Query.populate_existing` 

1250 

1251 

1252 """ # noqa: E501 

1253 

1254 return self._proxied.expire_all() 

1255 

1256 def expunge(self, instance: object) -> None: 

1257 r"""Remove the `instance` from this ``Session``. 

1258 

1259 .. container:: class_bases 

1260 

1261 Proxied for the :class:`_orm.Session` class on 

1262 behalf of the :class:`_asyncio.AsyncSession` class. 

1263 

1264 This will free all internal references to the instance. Cascading 

1265 will be applied according to the *expunge* cascade rule. 

1266 

1267 

1268 """ # noqa: E501 

1269 

1270 return self._proxied.expunge(instance) 

1271 

1272 def expunge_all(self) -> None: 

1273 r"""Remove all object instances from this ``Session``. 

1274 

1275 .. container:: class_bases 

1276 

1277 Proxied for the :class:`_orm.Session` class on 

1278 behalf of the :class:`_asyncio.AsyncSession` class. 

1279 

1280 This is equivalent to calling ``expunge(obj)`` on all objects in this 

1281 ``Session``. 

1282 

1283 

1284 """ # noqa: E501 

1285 

1286 return self._proxied.expunge_all() 

1287 

1288 def is_modified( 

1289 self, instance: object, include_collections: bool = True 

1290 ) -> bool: 

1291 r"""Return ``True`` if the given instance has locally 

1292 modified attributes. 

1293 

1294 .. container:: class_bases 

1295 

1296 Proxied for the :class:`_orm.Session` class on 

1297 behalf of the :class:`_asyncio.AsyncSession` class. 

1298 

1299 This method retrieves the history for each instrumented 

1300 attribute on the instance and performs a comparison of the current 

1301 value to its previously flushed or committed value, if any. 

1302 

1303 It is in effect a more expensive and accurate 

1304 version of checking for the given instance in the 

1305 :attr:`.Session.dirty` collection; a full test for 

1306 each attribute's net "dirty" status is performed. 

1307 

1308 E.g.:: 

1309 

1310 return session.is_modified(someobject) 

1311 

1312 A few caveats to this method apply: 

1313 

1314 * Instances present in the :attr:`.Session.dirty` collection may 

1315 report ``False`` when tested with this method. This is because 

1316 the object may have received change events via attribute mutation, 

1317 thus placing it in :attr:`.Session.dirty`, but ultimately the state 

1318 is the same as that loaded from the database, resulting in no net 

1319 change here. 

1320 * Scalar attributes may not have recorded the previously set 

1321 value when a new value was applied, if the attribute was not loaded, 

1322 or was expired, at the time the new value was received - in these 

1323 cases, the attribute is assumed to have a change, even if there is 

1324 ultimately no net change against its database value. SQLAlchemy in 

1325 most cases does not need the "old" value when a set event occurs, so 

1326 it skips the expense of a SQL call if the old value isn't present, 

1327 based on the assumption that an UPDATE of the scalar value is 

1328 usually needed, and in those few cases where it isn't, is less 

1329 expensive on average than issuing a defensive SELECT. 

1330 

1331 The "old" value is fetched unconditionally upon set only if the 

1332 attribute container has the ``active_history`` flag set to ``True``. 

1333 This flag is set typically for primary key attributes and scalar 

1334 object references that are not a simple many-to-one. To set this 

1335 flag for any arbitrary mapped column, use the ``active_history`` 

1336 argument with :func:`.column_property`. 

1337 

1338 :param instance: mapped instance to be tested for pending changes. 

1339 :param include_collections: Indicates if multivalued collections 

1340 should be included in the operation. Setting this to ``False`` is a 

1341 way to detect only local-column based properties (i.e. scalar columns 

1342 or many-to-one foreign keys) that would result in an UPDATE for this 

1343 instance upon flush. 

1344 

1345 

1346 """ # noqa: E501 

1347 

1348 return self._proxied.is_modified( 

1349 instance, include_collections=include_collections 

1350 ) 

1351 

1352 def in_transaction(self) -> bool: 

1353 r"""Return True if this :class:`_orm.Session` has begun a transaction. 

1354 

1355 .. container:: class_bases 

1356 

1357 Proxied for the :class:`_orm.Session` class on 

1358 behalf of the :class:`_asyncio.AsyncSession` class. 

1359 

1360 .. versionadded:: 1.4 

1361 

1362 .. seealso:: 

1363 

1364 :attr:`_orm.Session.is_active` 

1365 

1366 

1367 

1368 """ # noqa: E501 

1369 

1370 return self._proxied.in_transaction() 

1371 

1372 def in_nested_transaction(self) -> bool: 

1373 r"""Return True if this :class:`_orm.Session` has begun a nested 

1374 transaction, e.g. SAVEPOINT. 

1375 

1376 .. container:: class_bases 

1377 

1378 Proxied for the :class:`_orm.Session` class on 

1379 behalf of the :class:`_asyncio.AsyncSession` class. 

1380 

1381 .. versionadded:: 1.4 

1382 

1383 

1384 """ # noqa: E501 

1385 

1386 return self._proxied.in_nested_transaction() 

1387 

1388 @property 

1389 def dirty(self) -> Any: 

1390 r"""The set of all persistent instances considered dirty. 

1391 

1392 .. container:: class_bases 

1393 

1394 Proxied for the :class:`_orm.Session` class 

1395 on behalf of the :class:`_asyncio.AsyncSession` class. 

1396 

1397 E.g.:: 

1398 

1399 some_mapped_object in session.dirty 

1400 

1401 Instances are considered dirty when they were modified but not 

1402 deleted. 

1403 

1404 Note that this 'dirty' calculation is 'optimistic'; most 

1405 attribute-setting or collection modification operations will 

1406 mark an instance as 'dirty' and place it in this set, even if 

1407 there is no net change to the attribute's value. At flush 

1408 time, the value of each attribute is compared to its 

1409 previously saved value, and if there's no net change, no SQL 

1410 operation will occur (this is a more expensive operation so 

1411 it's only done at flush time). 

1412 

1413 To check if an instance has actionable net changes to its 

1414 attributes, use the :meth:`.Session.is_modified` method. 

1415 

1416 

1417 """ # noqa: E501 

1418 

1419 return self._proxied.dirty 

1420 

1421 @property 

1422 def deleted(self) -> Any: 

1423 r"""The set of all instances marked as 'deleted' within this ``Session`` 

1424 

1425 .. container:: class_bases 

1426 

1427 Proxied for the :class:`_orm.Session` class 

1428 on behalf of the :class:`_asyncio.AsyncSession` class. 

1429 

1430 """ # noqa: E501 

1431 

1432 return self._proxied.deleted 

1433 

1434 @property 

1435 def new(self) -> Any: 

1436 r"""The set of all instances marked as 'new' within this ``Session``. 

1437 

1438 .. container:: class_bases 

1439 

1440 Proxied for the :class:`_orm.Session` class 

1441 on behalf of the :class:`_asyncio.AsyncSession` class. 

1442 

1443 """ # noqa: E501 

1444 

1445 return self._proxied.new 

1446 

1447 @property 

1448 def identity_map(self) -> IdentityMap: 

1449 r"""Proxy for the :attr:`_orm.Session.identity_map` attribute 

1450 on behalf of the :class:`_asyncio.AsyncSession` class. 

1451 

1452 """ # noqa: E501 

1453 

1454 return self._proxied.identity_map 

1455 

1456 @identity_map.setter 

1457 def identity_map(self, attr: IdentityMap) -> None: 

1458 self._proxied.identity_map = attr 

1459 

1460 @property 

1461 def is_active(self) -> Any: 

1462 r"""True if this :class:`.Session` not in "partial rollback" state. 

1463 

1464 .. container:: class_bases 

1465 

1466 Proxied for the :class:`_orm.Session` class 

1467 on behalf of the :class:`_asyncio.AsyncSession` class. 

1468 

1469 .. versionchanged:: 1.4 The :class:`_orm.Session` no longer begins 

1470 a new transaction immediately, so this attribute will be False 

1471 when the :class:`_orm.Session` is first instantiated. 

1472 

1473 "partial rollback" state typically indicates that the flush process 

1474 of the :class:`_orm.Session` has failed, and that the 

1475 :meth:`_orm.Session.rollback` method must be emitted in order to 

1476 fully roll back the transaction. 

1477 

1478 If this :class:`_orm.Session` is not in a transaction at all, the 

1479 :class:`_orm.Session` will autobegin when it is first used, so in this 

1480 case :attr:`_orm.Session.is_active` will return True. 

1481 

1482 Otherwise, if this :class:`_orm.Session` is within a transaction, 

1483 and that transaction has not been rolled back internally, the 

1484 :attr:`_orm.Session.is_active` will also return True. 

1485 

1486 .. seealso:: 

1487 

1488 :ref:`faq_session_rollback` 

1489 

1490 :meth:`_orm.Session.in_transaction` 

1491 

1492 

1493 """ # noqa: E501 

1494 

1495 return self._proxied.is_active 

1496 

1497 @property 

1498 def autoflush(self) -> bool: 

1499 r"""Proxy for the :attr:`_orm.Session.autoflush` attribute 

1500 on behalf of the :class:`_asyncio.AsyncSession` class. 

1501 

1502 """ # noqa: E501 

1503 

1504 return self._proxied.autoflush 

1505 

1506 @autoflush.setter 

1507 def autoflush(self, attr: bool) -> None: 

1508 self._proxied.autoflush = attr 

1509 

1510 @property 

1511 def no_autoflush(self) -> Any: 

1512 r"""Return a context manager that disables autoflush. 

1513 

1514 .. container:: class_bases 

1515 

1516 Proxied for the :class:`_orm.Session` class 

1517 on behalf of the :class:`_asyncio.AsyncSession` class. 

1518 

1519 e.g.:: 

1520 

1521 with session.no_autoflush: 

1522 

1523 some_object = SomeClass() 

1524 session.add(some_object) 

1525 # won't autoflush 

1526 some_object.related_thing = session.query(SomeRelated).first() 

1527 

1528 Operations that proceed within the ``with:`` block 

1529 will not be subject to flushes occurring upon query 

1530 access. This is useful when initializing a series 

1531 of objects which involve existing database queries, 

1532 where the uncompleted object should not yet be flushed. 

1533 

1534 

1535 """ # noqa: E501 

1536 

1537 return self._proxied.no_autoflush 

1538 

1539 @property 

1540 def info(self) -> Any: 

1541 r"""A user-modifiable dictionary. 

1542 

1543 .. container:: class_bases 

1544 

1545 Proxied for the :class:`_orm.Session` class 

1546 on behalf of the :class:`_asyncio.AsyncSession` class. 

1547 

1548 The initial value of this dictionary can be populated using the 

1549 ``info`` argument to the :class:`.Session` constructor or 

1550 :class:`.sessionmaker` constructor or factory methods. The dictionary 

1551 here is always local to this :class:`.Session` and can be modified 

1552 independently of all other :class:`.Session` objects. 

1553 

1554 

1555 """ # noqa: E501 

1556 

1557 return self._proxied.info 

1558 

1559 @classmethod 

1560 def object_session(cls, instance: object) -> Optional[Session]: 

1561 r"""Return the :class:`.Session` to which an object belongs. 

1562 

1563 .. container:: class_bases 

1564 

1565 Proxied for the :class:`_orm.Session` class on 

1566 behalf of the :class:`_asyncio.AsyncSession` class. 

1567 

1568 This is an alias of :func:`.object_session`. 

1569 

1570 

1571 """ # noqa: E501 

1572 

1573 return Session.object_session(instance) 

1574 

1575 @classmethod 

1576 def identity_key( 

1577 cls, 

1578 class_: Optional[Type[Any]] = None, 

1579 ident: Union[Any, Tuple[Any, ...]] = None, 

1580 *, 

1581 instance: Optional[Any] = None, 

1582 row: Optional[Union[Row[Any], RowMapping]] = None, 

1583 identity_token: Optional[Any] = None, 

1584 ) -> _IdentityKeyType[Any]: 

1585 r"""Return an identity key. 

1586 

1587 .. container:: class_bases 

1588 

1589 Proxied for the :class:`_orm.Session` class on 

1590 behalf of the :class:`_asyncio.AsyncSession` class. 

1591 

1592 This is an alias of :func:`.util.identity_key`. 

1593 

1594 

1595 """ # noqa: E501 

1596 

1597 return Session.identity_key( 

1598 class_=class_, 

1599 ident=ident, 

1600 instance=instance, 

1601 row=row, 

1602 identity_token=identity_token, 

1603 ) 

1604 

1605 # END PROXY METHODS AsyncSession 

1606 

1607 

1608_AS = TypeVar("_AS", bound="AsyncSession") 

1609 

1610 

1611class async_sessionmaker(Generic[_AS]): 

1612 """A configurable :class:`.AsyncSession` factory. 

1613 

1614 The :class:`.async_sessionmaker` factory works in the same way as the 

1615 :class:`.sessionmaker` factory, to generate new :class:`.AsyncSession` 

1616 objects when called, creating them given 

1617 the configurational arguments established here. 

1618 

1619 e.g.:: 

1620 

1621 from sqlalchemy.ext.asyncio import create_async_engine 

1622 from sqlalchemy.ext.asyncio import AsyncSession 

1623 from sqlalchemy.ext.asyncio import async_sessionmaker 

1624 

1625 

1626 async def run_some_sql( 

1627 async_session: async_sessionmaker[AsyncSession], 

1628 ) -> None: 

1629 async with async_session() as session: 

1630 session.add(SomeObject(data="object")) 

1631 session.add(SomeOtherObject(name="other object")) 

1632 await session.commit() 

1633 

1634 

1635 async def main() -> None: 

1636 # an AsyncEngine, which the AsyncSession will use for connection 

1637 # resources 

1638 engine = create_async_engine( 

1639 "postgresql+asyncpg://scott:tiger@localhost/" 

1640 ) 

1641 

1642 # create a reusable factory for new AsyncSession instances 

1643 async_session = async_sessionmaker(engine) 

1644 

1645 await run_some_sql(async_session) 

1646 

1647 await engine.dispose() 

1648 

1649 The :class:`.async_sessionmaker` is useful so that different parts 

1650 of a program can create new :class:`.AsyncSession` objects with a 

1651 fixed configuration established up front. Note that :class:`.AsyncSession` 

1652 objects may also be instantiated directly when not using 

1653 :class:`.async_sessionmaker`. 

1654 

1655 .. versionadded:: 2.0 :class:`.async_sessionmaker` provides a 

1656 :class:`.sessionmaker` class that's dedicated to the 

1657 :class:`.AsyncSession` object, including pep-484 typing support. 

1658 

1659 .. seealso:: 

1660 

1661 :ref:`asyncio_orm` - shows example use 

1662 

1663 :class:`.sessionmaker` - general overview of the 

1664 :class:`.sessionmaker` architecture 

1665 

1666 

1667 :ref:`session_getting` - introductory text on creating 

1668 sessions using :class:`.sessionmaker`. 

1669 

1670 """ # noqa E501 

1671 

1672 class_: Type[_AS] 

1673 

1674 @overload 

1675 def __init__( 

1676 self, 

1677 bind: Optional[_AsyncSessionBind] = ..., 

1678 *, 

1679 class_: Type[_AS], 

1680 autoflush: bool = ..., 

1681 expire_on_commit: bool = ..., 

1682 info: Optional[_InfoType] = ..., 

1683 **kw: Any, 

1684 ): ... 

1685 

1686 @overload 

1687 def __init__( 

1688 self: "async_sessionmaker[AsyncSession]", 

1689 bind: Optional[_AsyncSessionBind] = ..., 

1690 *, 

1691 autoflush: bool = ..., 

1692 expire_on_commit: bool = ..., 

1693 info: Optional[_InfoType] = ..., 

1694 **kw: Any, 

1695 ): ... 

1696 

1697 def __init__( 

1698 self, 

1699 bind: Optional[_AsyncSessionBind] = None, 

1700 *, 

1701 class_: Type[_AS] = AsyncSession, # type: ignore 

1702 autoflush: bool = True, 

1703 expire_on_commit: bool = True, 

1704 info: Optional[_InfoType] = None, 

1705 **kw: Any, 

1706 ): 

1707 r"""Construct a new :class:`.async_sessionmaker`. 

1708 

1709 All arguments here except for ``class_`` correspond to arguments 

1710 accepted by :class:`.Session` directly. See the 

1711 :meth:`.AsyncSession.__init__` docstring for more details on 

1712 parameters. 

1713 

1714 

1715 """ 

1716 kw["bind"] = bind 

1717 kw["autoflush"] = autoflush 

1718 kw["expire_on_commit"] = expire_on_commit 

1719 if info is not None: 

1720 kw["info"] = info 

1721 self.kw = kw 

1722 self.class_ = class_ 

1723 

1724 def begin(self) -> _AsyncSessionContextManager[_AS]: 

1725 """Produce a context manager that both provides a new 

1726 :class:`_orm.AsyncSession` as well as a transaction that commits. 

1727 

1728 

1729 e.g.:: 

1730 

1731 async def main(): 

1732 Session = async_sessionmaker(some_engine) 

1733 

1734 async with Session.begin() as session: 

1735 session.add(some_object) 

1736 

1737 # commits transaction, closes session 

1738 

1739 """ 

1740 

1741 session = self() 

1742 return session._maker_context_manager() 

1743 

1744 def __call__(self, **local_kw: Any) -> _AS: 

1745 """Produce a new :class:`.AsyncSession` object using the configuration 

1746 established in this :class:`.async_sessionmaker`. 

1747 

1748 In Python, the ``__call__`` method is invoked on an object when 

1749 it is "called" in the same way as a function:: 

1750 

1751 AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False) 

1752 session = AsyncSession() # invokes sessionmaker.__call__() 

1753 

1754 """ # noqa E501 

1755 for k, v in self.kw.items(): 

1756 if k == "info" and "info" in local_kw: 

1757 d = v.copy() 

1758 d.update(local_kw["info"]) 

1759 local_kw["info"] = d 

1760 else: 

1761 local_kw.setdefault(k, v) 

1762 return self.class_(**local_kw) 

1763 

1764 def configure(self, **new_kw: Any) -> None: 

1765 """(Re)configure the arguments for this async_sessionmaker. 

1766 

1767 e.g.:: 

1768 

1769 AsyncSession = async_sessionmaker(some_engine) 

1770 

1771 AsyncSession.configure(bind=create_async_engine("sqlite+aiosqlite://")) 

1772 """ # noqa E501 

1773 

1774 self.kw.update(new_kw) 

1775 

1776 def __repr__(self) -> str: 

1777 return "%s(class_=%r, %s)" % ( 

1778 self.__class__.__name__, 

1779 self.class_.__name__, 

1780 ", ".join("%s=%r" % (k, v) for k, v in self.kw.items()), 

1781 ) 

1782 

1783 

1784class _AsyncSessionContextManager(Generic[_AS]): 

1785 __slots__ = ("async_session", "trans") 

1786 

1787 async_session: _AS 

1788 trans: AsyncSessionTransaction 

1789 

1790 def __init__(self, async_session: _AS): 

1791 self.async_session = async_session 

1792 

1793 async def __aenter__(self) -> _AS: 

1794 self.trans = self.async_session.begin() 

1795 await self.trans.__aenter__() 

1796 return self.async_session 

1797 

1798 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: 

1799 async def go() -> None: 

1800 await self.trans.__aexit__(type_, value, traceback) 

1801 await self.async_session.__aexit__(type_, value, traceback) 

1802 

1803 task = asyncio.create_task(go()) 

1804 await asyncio.shield(task) 

1805 

1806 

1807class AsyncSessionTransaction( 

1808 ReversibleProxy[SessionTransaction], 

1809 StartableContext["AsyncSessionTransaction"], 

1810): 

1811 """A wrapper for the ORM :class:`_orm.SessionTransaction` object. 

1812 

1813 This object is provided so that a transaction-holding object 

1814 for the :meth:`_asyncio.AsyncSession.begin` may be returned. 

1815 

1816 The object supports both explicit calls to 

1817 :meth:`_asyncio.AsyncSessionTransaction.commit` and 

1818 :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an 

1819 async context manager. 

1820 

1821 

1822 .. versionadded:: 1.4 

1823 

1824 """ 

1825 

1826 __slots__ = ("session", "sync_transaction", "nested") 

1827 

1828 session: AsyncSession 

1829 sync_transaction: Optional[SessionTransaction] 

1830 

1831 def __init__(self, session: AsyncSession, nested: bool = False): 

1832 self.session = session 

1833 self.nested = nested 

1834 self.sync_transaction = None 

1835 

1836 @property 

1837 def is_active(self) -> bool: 

1838 return ( 

1839 self._sync_transaction() is not None 

1840 and self._sync_transaction().is_active 

1841 ) 

1842 

1843 def _sync_transaction(self) -> SessionTransaction: 

1844 if not self.sync_transaction: 

1845 self._raise_for_not_started() 

1846 return self.sync_transaction 

1847 

1848 async def rollback(self) -> None: 

1849 """Roll back this :class:`_asyncio.AsyncTransaction`.""" 

1850 await greenlet_spawn(self._sync_transaction().rollback) 

1851 

1852 async def commit(self) -> None: 

1853 """Commit this :class:`_asyncio.AsyncTransaction`.""" 

1854 

1855 await greenlet_spawn(self._sync_transaction().commit) 

1856 

1857 @classmethod 

1858 def _regenerate_proxy_for_target( # type: ignore[override] 

1859 cls, 

1860 target: SessionTransaction, 

1861 async_session: AsyncSession, 

1862 **additional_kw: Any, # noqa: U100 

1863 ) -> AsyncSessionTransaction: 

1864 sync_transaction = target 

1865 nested = target.nested 

1866 obj = cls.__new__(cls) 

1867 obj.session = async_session 

1868 obj.sync_transaction = obj._assign_proxied(sync_transaction) 

1869 obj.nested = nested 

1870 return obj 

1871 

1872 async def start( 

1873 self, is_ctxmanager: bool = False 

1874 ) -> AsyncSessionTransaction: 

1875 self.sync_transaction = self._assign_proxied( 

1876 await greenlet_spawn( 

1877 self.session.sync_session.begin_nested 

1878 if self.nested 

1879 else self.session.sync_session.begin 

1880 ) 

1881 ) 

1882 if is_ctxmanager: 

1883 self.sync_transaction.__enter__() 

1884 return self 

1885 

1886 async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: 

1887 await greenlet_spawn( 

1888 self._sync_transaction().__exit__, type_, value, traceback 

1889 ) 

1890 

1891 

1892def async_object_session(instance: object) -> Optional[AsyncSession]: 

1893 """Return the :class:`_asyncio.AsyncSession` to which the given instance 

1894 belongs. 

1895 

1896 This function makes use of the sync-API function 

1897 :class:`_orm.object_session` to retrieve the :class:`_orm.Session` which 

1898 refers to the given instance, and from there links it to the original 

1899 :class:`_asyncio.AsyncSession`. 

1900 

1901 If the :class:`_asyncio.AsyncSession` has been garbage collected, the 

1902 return value is ``None``. 

1903 

1904 This functionality is also available from the 

1905 :attr:`_orm.InstanceState.async_session` accessor. 

1906 

1907 :param instance: an ORM mapped instance 

1908 :return: an :class:`_asyncio.AsyncSession` object, or ``None``. 

1909 

1910 .. versionadded:: 1.4.18 

1911 

1912 """ 

1913 

1914 session = object_session(instance) 

1915 if session is not None: 

1916 return async_session(session) 

1917 else: 

1918 return None 

1919 

1920 

1921def async_session(session: Session) -> Optional[AsyncSession]: 

1922 """Return the :class:`_asyncio.AsyncSession` which is proxying the given 

1923 :class:`_orm.Session` object, if any. 

1924 

1925 :param session: a :class:`_orm.Session` instance. 

1926 :return: a :class:`_asyncio.AsyncSession` instance, or ``None``. 

1927 

1928 .. versionadded:: 1.4.18 

1929 

1930 """ 

1931 return AsyncSession._retrieve_proxy_for_target(session, regenerate=False) 

1932 

1933 

1934async def close_all_sessions() -> None: 

1935 """Close all :class:`_asyncio.AsyncSession` sessions. 

1936 

1937 .. versionadded:: 2.0.23 

1938 

1939 .. seealso:: 

1940 

1941 :func:`.session.close_all_sessions` 

1942 

1943 """ 

1944 await greenlet_spawn(_sync_close_all_sessions) 

1945 

1946 

1947_instance_state._async_provider = async_session # type: ignore