Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py: 40%
241 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1from __future__ import annotations
3from collections import deque
4from dataclasses import dataclass
5from types import TracebackType
7from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled
8from ._eventloop import get_async_backend
9from ._exceptions import BusyResourceError, WouldBlock
10from ._tasks import CancelScope
11from ._testing import TaskInfo, get_current_task
14@dataclass(frozen=True)
15class EventStatistics:
16 """
17 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
18 """
20 tasks_waiting: int
23@dataclass(frozen=True)
24class CapacityLimiterStatistics:
25 """
26 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
27 :ivar float total_tokens: total number of available tokens
28 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from
29 this limiter
30 :ivar int tasks_waiting: number of tasks waiting on
31 :meth:`~.CapacityLimiter.acquire` or
32 :meth:`~.CapacityLimiter.acquire_on_behalf_of`
33 """
35 borrowed_tokens: int
36 total_tokens: float
37 borrowers: tuple[object, ...]
38 tasks_waiting: int
41@dataclass(frozen=True)
42class LockStatistics:
43 """
44 :ivar bool locked: flag indicating if this lock is locked or not
45 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
46 lock is not held by any task)
47 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
48 """
50 locked: bool
51 owner: TaskInfo | None
52 tasks_waiting: int
55@dataclass(frozen=True)
56class ConditionStatistics:
57 """
58 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
59 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
60 :class:`~.Lock`
61 """
63 tasks_waiting: int
64 lock_statistics: LockStatistics
67@dataclass(frozen=True)
68class SemaphoreStatistics:
69 """
70 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
72 """
74 tasks_waiting: int
77class Event:
78 def __new__(cls) -> Event:
79 return get_async_backend().create_event()
81 def set(self) -> None:
82 """Set the flag, notifying all listeners."""
83 raise NotImplementedError
85 def is_set(self) -> bool:
86 """Return ``True`` if the flag is set, ``False`` if not."""
87 raise NotImplementedError
89 async def wait(self) -> None:
90 """
91 Wait until the flag has been set.
93 If the flag has already been set when this method is called, it returns
94 immediately.
96 """
97 raise NotImplementedError
99 def statistics(self) -> EventStatistics:
100 """Return statistics about the current state of this event."""
101 raise NotImplementedError
104class Lock:
105 _owner_task: TaskInfo | None = None
107 def __init__(self) -> None:
108 self._waiters: deque[tuple[TaskInfo, Event]] = deque()
110 async def __aenter__(self) -> None:
111 await self.acquire()
113 async def __aexit__(
114 self,
115 exc_type: type[BaseException] | None,
116 exc_val: BaseException | None,
117 exc_tb: TracebackType | None,
118 ) -> None:
119 self.release()
121 async def acquire(self) -> None:
122 """Acquire the lock."""
123 await checkpoint_if_cancelled()
124 try:
125 self.acquire_nowait()
126 except WouldBlock:
127 task = get_current_task()
128 event = Event()
129 token = task, event
130 self._waiters.append(token)
131 try:
132 await event.wait()
133 except BaseException:
134 if not event.is_set():
135 self._waiters.remove(token)
136 elif self._owner_task == task:
137 self.release()
139 raise
141 assert self._owner_task == task
142 else:
143 try:
144 await cancel_shielded_checkpoint()
145 except BaseException:
146 self.release()
147 raise
149 def acquire_nowait(self) -> None:
150 """
151 Acquire the lock, without blocking.
153 :raises ~anyio.WouldBlock: if the operation would block
155 """
156 task = get_current_task()
157 if self._owner_task == task:
158 raise RuntimeError("Attempted to acquire an already held Lock")
160 if self._owner_task is not None:
161 raise WouldBlock
163 self._owner_task = task
165 def release(self) -> None:
166 """Release the lock."""
167 if self._owner_task != get_current_task():
168 raise RuntimeError("The current task is not holding this lock")
170 if self._waiters:
171 self._owner_task, event = self._waiters.popleft()
172 event.set()
173 else:
174 del self._owner_task
176 def locked(self) -> bool:
177 """Return True if the lock is currently held."""
178 return self._owner_task is not None
180 def statistics(self) -> LockStatistics:
181 """
182 Return statistics about the current state of this lock.
184 .. versionadded:: 3.0
185 """
186 return LockStatistics(self.locked(), self._owner_task, len(self._waiters))
189class Condition:
190 _owner_task: TaskInfo | None = None
192 def __init__(self, lock: Lock | None = None):
193 self._lock = lock or Lock()
194 self._waiters: deque[Event] = deque()
196 async def __aenter__(self) -> None:
197 await self.acquire()
199 async def __aexit__(
200 self,
201 exc_type: type[BaseException] | None,
202 exc_val: BaseException | None,
203 exc_tb: TracebackType | None,
204 ) -> None:
205 self.release()
207 def _check_acquired(self) -> None:
208 if self._owner_task != get_current_task():
209 raise RuntimeError("The current task is not holding the underlying lock")
211 async def acquire(self) -> None:
212 """Acquire the underlying lock."""
213 await self._lock.acquire()
214 self._owner_task = get_current_task()
216 def acquire_nowait(self) -> None:
217 """
218 Acquire the underlying lock, without blocking.
220 :raises ~anyio.WouldBlock: if the operation would block
222 """
223 self._lock.acquire_nowait()
224 self._owner_task = get_current_task()
226 def release(self) -> None:
227 """Release the underlying lock."""
228 self._lock.release()
230 def locked(self) -> bool:
231 """Return True if the lock is set."""
232 return self._lock.locked()
234 def notify(self, n: int = 1) -> None:
235 """Notify exactly n listeners."""
236 self._check_acquired()
237 for _ in range(n):
238 try:
239 event = self._waiters.popleft()
240 except IndexError:
241 break
243 event.set()
245 def notify_all(self) -> None:
246 """Notify all the listeners."""
247 self._check_acquired()
248 for event in self._waiters:
249 event.set()
251 self._waiters.clear()
253 async def wait(self) -> None:
254 """Wait for a notification."""
255 await checkpoint()
256 event = Event()
257 self._waiters.append(event)
258 self.release()
259 try:
260 await event.wait()
261 except BaseException:
262 if not event.is_set():
263 self._waiters.remove(event)
265 raise
266 finally:
267 with CancelScope(shield=True):
268 await self.acquire()
270 def statistics(self) -> ConditionStatistics:
271 """
272 Return statistics about the current state of this condition.
274 .. versionadded:: 3.0
275 """
276 return ConditionStatistics(len(self._waiters), self._lock.statistics())
279class Semaphore:
280 def __init__(self, initial_value: int, *, max_value: int | None = None):
281 if not isinstance(initial_value, int):
282 raise TypeError("initial_value must be an integer")
283 if initial_value < 0:
284 raise ValueError("initial_value must be >= 0")
285 if max_value is not None:
286 if not isinstance(max_value, int):
287 raise TypeError("max_value must be an integer or None")
288 if max_value < initial_value:
289 raise ValueError(
290 "max_value must be equal to or higher than initial_value"
291 )
293 self._value = initial_value
294 self._max_value = max_value
295 self._waiters: deque[Event] = deque()
297 async def __aenter__(self) -> Semaphore:
298 await self.acquire()
299 return self
301 async def __aexit__(
302 self,
303 exc_type: type[BaseException] | None,
304 exc_val: BaseException | None,
305 exc_tb: TracebackType | None,
306 ) -> None:
307 self.release()
309 async def acquire(self) -> None:
310 """Decrement the semaphore value, blocking if necessary."""
311 await checkpoint_if_cancelled()
312 try:
313 self.acquire_nowait()
314 except WouldBlock:
315 event = Event()
316 self._waiters.append(event)
317 try:
318 await event.wait()
319 except BaseException:
320 if not event.is_set():
321 self._waiters.remove(event)
322 else:
323 self.release()
325 raise
326 else:
327 try:
328 await cancel_shielded_checkpoint()
329 except BaseException:
330 self.release()
331 raise
333 def acquire_nowait(self) -> None:
334 """
335 Acquire the underlying lock, without blocking.
337 :raises ~anyio.WouldBlock: if the operation would block
339 """
340 if self._value == 0:
341 raise WouldBlock
343 self._value -= 1
345 def release(self) -> None:
346 """Increment the semaphore value."""
347 if self._max_value is not None and self._value == self._max_value:
348 raise ValueError("semaphore released too many times")
350 if self._waiters:
351 self._waiters.popleft().set()
352 else:
353 self._value += 1
355 @property
356 def value(self) -> int:
357 """The current value of the semaphore."""
358 return self._value
360 @property
361 def max_value(self) -> int | None:
362 """The maximum value of the semaphore."""
363 return self._max_value
365 def statistics(self) -> SemaphoreStatistics:
366 """
367 Return statistics about the current state of this semaphore.
369 .. versionadded:: 3.0
370 """
371 return SemaphoreStatistics(len(self._waiters))
374class CapacityLimiter:
375 def __new__(cls, total_tokens: float) -> CapacityLimiter:
376 return get_async_backend().create_capacity_limiter(total_tokens)
378 async def __aenter__(self) -> None:
379 raise NotImplementedError
381 async def __aexit__(
382 self,
383 exc_type: type[BaseException] | None,
384 exc_val: BaseException | None,
385 exc_tb: TracebackType | None,
386 ) -> bool | None:
387 raise NotImplementedError
389 @property
390 def total_tokens(self) -> float:
391 """
392 The total number of tokens available for borrowing.
394 This is a read-write property. If the total number of tokens is increased, the
395 proportionate number of tasks waiting on this limiter will be granted their
396 tokens.
398 .. versionchanged:: 3.0
399 The property is now writable.
401 """
402 raise NotImplementedError
404 @total_tokens.setter
405 def total_tokens(self, value: float) -> None:
406 raise NotImplementedError
408 @property
409 def borrowed_tokens(self) -> int:
410 """The number of tokens that have currently been borrowed."""
411 raise NotImplementedError
413 @property
414 def available_tokens(self) -> float:
415 """The number of tokens currently available to be borrowed"""
416 raise NotImplementedError
418 def acquire_nowait(self) -> None:
419 """
420 Acquire a token for the current task without waiting for one to become
421 available.
423 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
425 """
426 raise NotImplementedError
428 def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
429 """
430 Acquire a token without waiting for one to become available.
432 :param borrower: the entity borrowing a token
433 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
435 """
436 raise NotImplementedError
438 async def acquire(self) -> None:
439 """
440 Acquire a token for the current task, waiting if necessary for one to become
441 available.
443 """
444 raise NotImplementedError
446 async def acquire_on_behalf_of(self, borrower: object) -> None:
447 """
448 Acquire a token, waiting if necessary for one to become available.
450 :param borrower: the entity borrowing a token
452 """
453 raise NotImplementedError
455 def release(self) -> None:
456 """
457 Release the token held by the current task.
459 :raises RuntimeError: if the current task has not borrowed a token from this
460 limiter.
462 """
463 raise NotImplementedError
465 def release_on_behalf_of(self, borrower: object) -> None:
466 """
467 Release the token held by the given borrower.
469 :raises RuntimeError: if the borrower has not borrowed a token from this
470 limiter.
472 """
473 raise NotImplementedError
475 def statistics(self) -> CapacityLimiterStatistics:
476 """
477 Return statistics about the current state of this limiter.
479 .. versionadded:: 3.0
481 """
482 raise NotImplementedError
485class ResourceGuard:
486 __slots__ = "action", "_guarded"
488 def __init__(self, action: str):
489 self.action = action
490 self._guarded = False
492 def __enter__(self) -> None:
493 if self._guarded:
494 raise BusyResourceError(self.action)
496 self._guarded = True
498 def __exit__(
499 self,
500 exc_type: type[BaseException] | None,
501 exc_val: BaseException | None,
502 exc_tb: TracebackType | None,
503 ) -> bool | None:
504 self._guarded = False
505 return None