Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_synchronization.py: 38%
96 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import threading
2from types import TracebackType
3from typing import Optional, Type
5import sniffio
7from ._exceptions import ExceptionMapping, PoolTimeout, map_exceptions
# Our async synchronization primitives use either 'anyio' or 'trio' depending
# on whether they're running under asyncio or trio.
12try:
13 import trio
14except ImportError: # pragma: nocover
15 trio = None # type: ignore
17try:
18 import anyio
19except ImportError: # pragma: nocover
20 anyio = None # type: ignore
class AsyncLock:
    """
    An async lock, usable as an `async with` context manager.

    The underlying lock is not created in `__init__`; it is created by
    `setup()` the first time the lock is entered.
    """

    def __init__(self) -> None:
        # An empty string means that `setup()` has not completed yet.
        self._backend = ""

    def setup(self) -> None:
        """
        Detect if we're running under 'asyncio' or 'trio' and create
        a lock with the correct implementation.

        Raises:
            RuntimeError: If the package needed for the detected backend
                ('trio' under trio, 'anyio' otherwise) is not installed.
        """
        backend = sniffio.current_async_library()
        if backend == "trio":
            if trio is None:  # pragma: nocover
                raise RuntimeError(
                    "Running under trio requires the 'trio' package to be installed."
                )
            self._trio_lock = trio.Lock()
        else:
            if anyio is None:  # pragma: nocover
                raise RuntimeError(
                    "Running under asyncio requires the 'anyio' package to be installed."
                )
            self._anyio_lock = anyio.Lock()

        # Record the backend only after the lock exists, so that a failed
        # setup leaves the instance unconfigured and the next use reports
        # the same RuntimeError rather than an AttributeError.
        self._backend = backend

    async def __aenter__(self) -> "AsyncLock":
        """Acquire the lock, running `setup()` first if needed."""
        if not self._backend:
            self.setup()

        if self._backend == "trio":
            await self._trio_lock.acquire()
        else:
            await self._anyio_lock.acquire()

        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Release the lock. The exception details are ignored."""
        if self._backend == "trio":
            self._trio_lock.release()
        else:
            self._anyio_lock.release()
class AsyncEvent:
    """
    An async event that can be set once and awaited with an optional timeout.

    The underlying event is not created in `__init__`; it is created by
    `setup()` the first time `set()` or `wait()` is called.
    """

    def __init__(self) -> None:
        # An empty string means that `setup()` has not completed yet.
        self._backend = ""

    def setup(self) -> None:
        """
        Detect if we're running under 'asyncio' or 'trio' and create
        an event with the correct implementation.

        Raises:
            RuntimeError: If the package needed for the detected backend
                ('trio' under trio, 'anyio' otherwise) is not installed.
        """
        backend = sniffio.current_async_library()
        if backend == "trio":
            if trio is None:  # pragma: nocover
                raise RuntimeError(
                    "Running under trio requires the 'trio' package to be installed."
                )
            self._trio_event = trio.Event()
        else:
            if anyio is None:  # pragma: nocover
                raise RuntimeError(
                    "Running under asyncio requires the 'anyio' package to be installed."
                )
            self._anyio_event = anyio.Event()

        # Record the backend only after the event exists, so that a failed
        # setup leaves the instance unconfigured and the next use reports
        # the same RuntimeError rather than an AttributeError.
        self._backend = backend

    def set(self) -> None:
        """Set the event, running `setup()` first if needed."""
        if not self._backend:
            self.setup()

        if self._backend == "trio":
            self._trio_event.set()
        else:
            self._anyio_event.set()

    async def wait(self, timeout: Optional[float] = None) -> None:
        """
        Wait for the event to be set, running `setup()` first if needed.

        Args:
            timeout: Passed to the backend's `fail_after()`. Under trio,
                `None` is replaced with `float("inf")`.

        The backend's timeout exception (`trio.TooSlowError` or
        `TimeoutError`) is mapped to `PoolTimeout` via `map_exceptions`.
        """
        if not self._backend:
            self.setup()

        # A non-empty backend means `setup()` succeeded, so the matching
        # package is known to be importable here.
        if self._backend == "trio":
            trio_exc_map: ExceptionMapping = {trio.TooSlowError: PoolTimeout}
            timeout_or_inf = float("inf") if timeout is None else timeout
            with map_exceptions(trio_exc_map):
                with trio.fail_after(timeout_or_inf):
                    await self._trio_event.wait()
        else:
            anyio_exc_map: ExceptionMapping = {TimeoutError: PoolTimeout}
            with map_exceptions(anyio_exc_map):
                with anyio.fail_after(timeout):
                    await self._anyio_event.wait()
class AsyncSemaphore:
    """
    An async semaphore whose initial and maximum value are both `bound`.

    The underlying semaphore is not created in `__init__`; it is created by
    `setup()` the first time `acquire()` is called.
    """

    def __init__(self, bound: int) -> None:
        self._bound = bound
        # An empty string means that `setup()` has not completed yet.
        self._backend = ""

    def setup(self) -> None:
        """
        Detect if we're running under 'asyncio' or 'trio' and create
        a semaphore with the correct implementation.

        Raises:
            RuntimeError: If the package needed for the detected backend
                ('trio' under trio, 'anyio' otherwise) is not installed.
        """
        backend = sniffio.current_async_library()
        if backend == "trio":
            if trio is None:  # pragma: nocover
                raise RuntimeError(
                    "Running under trio requires the 'trio' package to be installed."
                )

            self._trio_semaphore = trio.Semaphore(
                initial_value=self._bound, max_value=self._bound
            )
        else:
            if anyio is None:  # pragma: nocover
                raise RuntimeError(
                    "Running under asyncio requires the 'anyio' package to be installed."
                )

            self._anyio_semaphore = anyio.Semaphore(
                initial_value=self._bound, max_value=self._bound
            )

        # Record the backend only after the semaphore exists, so that a
        # failed setup leaves the instance unconfigured and the next use
        # reports the same RuntimeError rather than an AttributeError.
        self._backend = backend

    async def acquire(self) -> None:
        """Acquire the semaphore, running `setup()` first if needed."""
        if not self._backend:
            self.setup()

        if self._backend == "trio":
            await self._trio_semaphore.acquire()
        else:
            await self._anyio_semaphore.acquire()

    async def release(self) -> None:
        """Release the semaphore. Does not run `setup()`."""
        if self._backend == "trio":
            self._trio_semaphore.release()
        else:
            self._anyio_semaphore.release()
174# Our thread-based synchronization primitives...
class Lock:
    """
    A thread-based lock wrapping `threading.Lock`, usable as a `with`
    context manager.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()

    def __enter__(self) -> "Lock":
        """Acquire the underlying lock and return this wrapper."""
        inner = self._lock
        inner.acquire()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Release the underlying lock. The exception details are ignored."""
        inner = self._lock
        inner.release()
class Event:
    """
    A thread-based event wrapping `threading.Event`.
    """

    def __init__(self) -> None:
        self._event = threading.Event()

    def set(self) -> None:
        """Set the underlying event."""
        self._event.set()

    def wait(self, timeout: Optional[float] = None) -> None:
        """
        Block until the event is set.

        Args:
            timeout: Maximum number of seconds to wait. `None` or
                `float("inf")` waits without a time limit.

        Raises:
            PoolTimeout: If the underlying wait returns False, i.e. the
                event was not set before the timeout elapsed.
        """
        # `threading.Event.wait()` cannot block with an infinite timeout (it
        # raises OverflowError), so treat infinity as "no timeout".
        if timeout == float("inf"):
            timeout = None
        if not self._event.wait(timeout=timeout):
            raise PoolTimeout()  # pragma: nocover
class Semaphore:
    """
    A thread-based semaphore wrapping `threading.Semaphore`, created with
    `bound` as its initial value.
    """

    def __init__(self, bound: int) -> None:
        self._semaphore = threading.Semaphore(bound)

    def acquire(self) -> None:
        """Acquire the underlying semaphore; the result is discarded."""
        inner = self._semaphore
        inner.acquire()

    def release(self) -> None:
        """Release the underlying semaphore."""
        inner = self._semaphore
        inner.release()