1# ext/asyncio/events.py
2# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8from .engine import AsyncConnectable
9from .session import AsyncSession
10from ...engine import events as engine_event
11from ...orm import events as orm_event
12
13
14class AsyncConnectionEvents(engine_event.ConnectionEvents):
15 _target_class_doc = "SomeEngine"
16 _dispatch_target = AsyncConnectable
17
18 @classmethod
19 def _no_async_engine_events(cls):
20 raise NotImplementedError(
21 "asynchronous events are not implemented at this time. Apply "
22 "synchronous listeners to the AsyncEngine.sync_engine or "
23 "AsyncConnection.sync_connection attributes."
24 )
25
26 @classmethod
27 def _listen(cls, event_key, retval=False):
28 cls._no_async_engine_events()
29
30
31class AsyncSessionEvents(orm_event.SessionEvents):
32 _target_class_doc = "SomeSession"
33 _dispatch_target = AsyncSession
34
35 @classmethod
36 def _no_async_engine_events(cls):
37 raise NotImplementedError(
38 "asynchronous events are not implemented at this time. Apply "
39 "synchronous listeners to the AsyncSession.sync_session."
40 )
41
42 @classmethod
43 def _listen(cls, event_key, retval=False):
44 cls._no_async_engine_events()