Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/locks.py: 36%

28 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2import collections 

3from typing import Any, Optional 

4 

5try: 

6 from typing import Deque 

7except ImportError: 

8 from typing_extensions import Deque 

9 

10 

11class EventResultOrError: 

12 """Event asyncio lock helper class. 

13 

14 Wraps the Event asyncio lock allowing either to awake the 

15 locked Tasks without any error or raising an exception. 

16 

17 thanks to @vorpalsmith for the simple design. 

18 """ 

19 

20 def __init__(self, loop: asyncio.AbstractEventLoop) -> None: 

21 self._loop = loop 

22 self._exc: Optional[BaseException] = None 

23 self._event = asyncio.Event() 

24 self._waiters: Deque[asyncio.Future[Any]] = collections.deque() 

25 

26 def set(self, exc: Optional[BaseException] = None) -> None: 

27 self._exc = exc 

28 self._event.set() 

29 

30 async def wait(self) -> Any: 

31 waiter = self._loop.create_task(self._event.wait()) 

32 self._waiters.append(waiter) 

33 try: 

34 val = await waiter 

35 finally: 

36 self._waiters.remove(waiter) 

37 

38 if self._exc is not None: 

39 raise self._exc 

40 

41 return val 

42 

43 def cancel(self) -> None: 

44 """Cancel all waiters""" 

45 for waiter in self._waiters: 

46 waiter.cancel()