Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/events.py: 41%
22 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# ext/asyncio/events.py
2# Copyright (C) 2020-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8from .engine import AsyncConnectable
9from .session import AsyncSession
10from ...engine import events as engine_event
11from ...orm import events as orm_event
14class AsyncConnectionEvents(engine_event.ConnectionEvents):
15 _target_class_doc = "SomeEngine"
16 _dispatch_target = AsyncConnectable
18 @classmethod
19 def _no_async_engine_events(cls):
20 raise NotImplementedError(
21 "asynchronous events are not implemented at this time. Apply "
22 "synchronous listeners to the AsyncEngine.sync_engine or "
23 "AsyncConnection.sync_connection attributes."
24 )
26 @classmethod
27 def _listen(cls, event_key, retval=False):
28 cls._no_async_engine_events()
31class AsyncSessionEvents(orm_event.SessionEvents):
32 _target_class_doc = "SomeSession"
33 _dispatch_target = AsyncSession
35 @classmethod
36 def _no_async_engine_events(cls):
37 raise NotImplementedError(
38 "asynchronous events are not implemented at this time. Apply "
39 "synchronous listeners to the AsyncSession.sync_session."
40 )
42 @classmethod
43 def _listen(cls, event_key, retval=False):
44 cls._no_async_engine_events()