Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/locks.py: 36%
28 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2import collections
3from typing import Any, Optional
5try:
6 from typing import Deque
7except ImportError:
8 from typing_extensions import Deque
11class EventResultOrError:
12 """Event asyncio lock helper class.
14 Wraps the Event asyncio lock allowing either to awake the
15 locked Tasks without any error or raising an exception.
17 thanks to @vorpalsmith for the simple design.
18 """
20 def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
21 self._loop = loop
22 self._exc: Optional[BaseException] = None
23 self._event = asyncio.Event()
24 self._waiters: Deque[asyncio.Future[Any]] = collections.deque()
26 def set(self, exc: Optional[BaseException] = None) -> None:
27 self._exc = exc
28 self._event.set()
30 async def wait(self) -> Any:
31 waiter = self._loop.create_task(self._event.wait())
32 self._waiters.append(waiter)
33 try:
34 val = await waiter
35 finally:
36 self._waiters.remove(waiter)
38 if self._exc is not None:
39 raise self._exc
41 return val
43 def cancel(self) -> None:
44 """Cancel all waiters"""
45 for waiter in self._waiters:
46 waiter.cancel()