Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/ext/asyncio/events.py: 41%

22 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# ext/asyncio/events.py 

2# Copyright (C) 2020-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8from .engine import AsyncConnectable 

9from .session import AsyncSession 

10from ...engine import events as engine_event 

11from ...orm import events as orm_event 

12 

13 

class AsyncConnectionEvents(engine_event.ConnectionEvents):
    """Connection-event class dispatched on ``AsyncConnectable``.

    Every attempt to register a listener through this class ends in
    ``NotImplementedError``; the message directs callers to the
    ``AsyncEngine.sync_engine`` / ``AsyncConnection.sync_connection``
    attributes instead.
    """

    _target_class_doc = "SomeEngine"
    _dispatch_target = AsyncConnectable

    @classmethod
    def _no_async_engine_events(cls):
        """Raise the error explaining where listeners should be attached.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
            "asynchronous events are not implemented at this time. Apply "
            "synchronous listeners to the AsyncEngine.sync_engine or "
            "AsyncConnection.sync_connection attributes."
        )

    @classmethod
    def _listen(cls, event_key, retval=False, **kw):
        """Reject a listener registration.

        None of the arguments are inspected; they exist only so the call
        itself succeeds and the explanatory error below is what the caller
        sees.

        :param event_key: registration key handed in by the caller; unused.
        :param retval: accepted and ignored.
        :param kw: any further keyword options, accepted and ignored so
         that an extra option produces the same ``NotImplementedError``
         rather than a ``TypeError`` about an unexpected argument.
         NOTE(review): presumably the event registry forwards listener
         modifiers here - confirm against the base class ``_listen``.
        :raises NotImplementedError: always, via
         :meth:`_no_async_engine_events`.
        """
        cls._no_async_engine_events()

29 

30 

class AsyncSessionEvents(orm_event.SessionEvents):
    """Session-event class dispatched on ``AsyncSession``.

    Every attempt to register a listener through this class ends in
    ``NotImplementedError``; the message directs callers to
    ``AsyncSession.sync_session`` instead.
    """

    _target_class_doc = "SomeSession"
    _dispatch_target = AsyncSession

    @classmethod
    def _no_async_engine_events(cls):
        """Raise the error explaining where listeners should be attached.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError(
            "asynchronous events are not implemented at this time. Apply "
            "synchronous listeners to the AsyncSession.sync_session."
        )

    @classmethod
    def _listen(cls, event_key, retval=False, **kw):
        """Reject a listener registration.

        None of the arguments are inspected; they exist only so the call
        itself succeeds and the explanatory error below is what the caller
        sees.

        :param event_key: registration key handed in by the caller; unused.
        :param retval: accepted and ignored.
        :param kw: any further keyword options, accepted and ignored so
         that an extra option produces the same ``NotImplementedError``
         rather than a ``TypeError`` about an unexpected argument.
         NOTE(review): presumably the event registry forwards listener
         modifiers here - confirm against the base class ``_listen``.
        :raises NotImplementedError: always, via
         :meth:`_no_async_engine_events`.
        """
        cls._no_async_engine_events()