Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py: 39%
264 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1from collections import deque
2from dataclasses import dataclass
3from types import TracebackType
4from typing import Deque, Optional, Tuple, Type
5from warnings import warn
7from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled
8from ._compat import DeprecatedAwaitable
9from ._eventloop import get_asynclib
10from ._exceptions import BusyResourceError, WouldBlock
11from ._tasks import CancelScope
12from ._testing import TaskInfo, get_current_task
15@dataclass(frozen=True)
16class EventStatistics:
17 """
18 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
19 """
21 tasks_waiting: int
24@dataclass(frozen=True)
25class CapacityLimiterStatistics:
26 """
27 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
28 :ivar float total_tokens: total number of available tokens
29 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from this
30 limiter
31 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.CapacityLimiter.acquire` or
32 :meth:`~.CapacityLimiter.acquire_on_behalf_of`
33 """
35 borrowed_tokens: int
36 total_tokens: float
37 borrowers: Tuple[object, ...]
38 tasks_waiting: int
41@dataclass(frozen=True)
42class LockStatistics:
43 """
44 :ivar bool locked: flag indicating if this lock is locked or not
45 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the lock is not
46 held by any task)
47 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
48 """
50 locked: bool
51 owner: Optional[TaskInfo]
52 tasks_waiting: int
55@dataclass(frozen=True)
56class ConditionStatistics:
57 """
58 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
59 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying :class:`~.Lock`
60 """
62 tasks_waiting: int
63 lock_statistics: LockStatistics
66@dataclass(frozen=True)
67class SemaphoreStatistics:
68 """
69 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
71 """
73 tasks_waiting: int
76class Event:
77 def __new__(cls) -> "Event":
78 return get_asynclib().Event()
80 def set(self) -> DeprecatedAwaitable:
81 """Set the flag, notifying all listeners."""
82 raise NotImplementedError
84 def is_set(self) -> bool:
85 """Return ``True`` if the flag is set, ``False`` if not."""
86 raise NotImplementedError
88 async def wait(self) -> None:
89 """
90 Wait until the flag has been set.
92 If the flag has already been set when this method is called, it returns immediately.
94 """
95 raise NotImplementedError
97 def statistics(self) -> EventStatistics:
98 """Return statistics about the current state of this event."""
99 raise NotImplementedError
102class Lock:
103 _owner_task: Optional[TaskInfo] = None
105 def __init__(self) -> None:
106 self._waiters: Deque[Tuple[TaskInfo, Event]] = deque()
108 async def __aenter__(self) -> None:
109 await self.acquire()
111 async def __aexit__(
112 self,
113 exc_type: Optional[Type[BaseException]],
114 exc_val: Optional[BaseException],
115 exc_tb: Optional[TracebackType],
116 ) -> None:
117 self.release()
119 async def acquire(self) -> None:
120 """Acquire the lock."""
121 await checkpoint_if_cancelled()
122 try:
123 self.acquire_nowait()
124 except WouldBlock:
125 task = get_current_task()
126 event = Event()
127 token = task, event
128 self._waiters.append(token)
129 try:
130 await event.wait()
131 except BaseException:
132 if not event.is_set():
133 self._waiters.remove(token)
134 elif self._owner_task == task:
135 self.release()
137 raise
139 assert self._owner_task == task
140 else:
141 try:
142 await cancel_shielded_checkpoint()
143 except BaseException:
144 self.release()
145 raise
147 def acquire_nowait(self) -> None:
148 """
149 Acquire the lock, without blocking.
151 :raises ~WouldBlock: if the operation would block
153 """
154 task = get_current_task()
155 if self._owner_task == task:
156 raise RuntimeError("Attempted to acquire an already held Lock")
158 if self._owner_task is not None:
159 raise WouldBlock
161 self._owner_task = task
163 def release(self) -> DeprecatedAwaitable:
164 """Release the lock."""
165 if self._owner_task != get_current_task():
166 raise RuntimeError("The current task is not holding this lock")
168 if self._waiters:
169 self._owner_task, event = self._waiters.popleft()
170 event.set()
171 else:
172 del self._owner_task
174 return DeprecatedAwaitable(self.release)
176 def locked(self) -> bool:
177 """Return True if the lock is currently held."""
178 return self._owner_task is not None
180 def statistics(self) -> LockStatistics:
181 """
182 Return statistics about the current state of this lock.
184 .. versionadded:: 3.0
185 """
186 return LockStatistics(self.locked(), self._owner_task, len(self._waiters))
189class Condition:
190 _owner_task: Optional[TaskInfo] = None
192 def __init__(self, lock: Optional[Lock] = None):
193 self._lock = lock or Lock()
194 self._waiters: Deque[Event] = deque()
196 async def __aenter__(self) -> None:
197 await self.acquire()
199 async def __aexit__(
200 self,
201 exc_type: Optional[Type[BaseException]],
202 exc_val: Optional[BaseException],
203 exc_tb: Optional[TracebackType],
204 ) -> None:
205 self.release()
207 def _check_acquired(self) -> None:
208 if self._owner_task != get_current_task():
209 raise RuntimeError("The current task is not holding the underlying lock")
211 async def acquire(self) -> None:
212 """Acquire the underlying lock."""
213 await self._lock.acquire()
214 self._owner_task = get_current_task()
216 def acquire_nowait(self) -> None:
217 """
218 Acquire the underlying lock, without blocking.
220 :raises ~WouldBlock: if the operation would block
222 """
223 self._lock.acquire_nowait()
224 self._owner_task = get_current_task()
226 def release(self) -> DeprecatedAwaitable:
227 """Release the underlying lock."""
228 self._lock.release()
229 return DeprecatedAwaitable(self.release)
231 def locked(self) -> bool:
232 """Return True if the lock is set."""
233 return self._lock.locked()
235 def notify(self, n: int = 1) -> None:
236 """Notify exactly n listeners."""
237 self._check_acquired()
238 for _ in range(n):
239 try:
240 event = self._waiters.popleft()
241 except IndexError:
242 break
244 event.set()
246 def notify_all(self) -> None:
247 """Notify all the listeners."""
248 self._check_acquired()
249 for event in self._waiters:
250 event.set()
252 self._waiters.clear()
254 async def wait(self) -> None:
255 """Wait for a notification."""
256 await checkpoint()
257 event = Event()
258 self._waiters.append(event)
259 self.release()
260 try:
261 await event.wait()
262 except BaseException:
263 if not event.is_set():
264 self._waiters.remove(event)
266 raise
267 finally:
268 with CancelScope(shield=True):
269 await self.acquire()
271 def statistics(self) -> ConditionStatistics:
272 """
273 Return statistics about the current state of this condition.
275 .. versionadded:: 3.0
276 """
277 return ConditionStatistics(len(self._waiters), self._lock.statistics())
280class Semaphore:
281 def __init__(self, initial_value: int, *, max_value: Optional[int] = None):
282 if not isinstance(initial_value, int):
283 raise TypeError("initial_value must be an integer")
284 if initial_value < 0:
285 raise ValueError("initial_value must be >= 0")
286 if max_value is not None:
287 if not isinstance(max_value, int):
288 raise TypeError("max_value must be an integer or None")
289 if max_value < initial_value:
290 raise ValueError(
291 "max_value must be equal to or higher than initial_value"
292 )
294 self._value = initial_value
295 self._max_value = max_value
296 self._waiters: Deque[Event] = deque()
298 async def __aenter__(self) -> "Semaphore":
299 await self.acquire()
300 return self
302 async def __aexit__(
303 self,
304 exc_type: Optional[Type[BaseException]],
305 exc_val: Optional[BaseException],
306 exc_tb: Optional[TracebackType],
307 ) -> None:
308 self.release()
310 async def acquire(self) -> None:
311 """Decrement the semaphore value, blocking if necessary."""
312 await checkpoint_if_cancelled()
313 try:
314 self.acquire_nowait()
315 except WouldBlock:
316 event = Event()
317 self._waiters.append(event)
318 try:
319 await event.wait()
320 except BaseException:
321 if not event.is_set():
322 self._waiters.remove(event)
323 else:
324 self.release()
326 raise
327 else:
328 try:
329 await cancel_shielded_checkpoint()
330 except BaseException:
331 self.release()
332 raise
334 def acquire_nowait(self) -> None:
335 """
336 Acquire the underlying lock, without blocking.
338 :raises ~WouldBlock: if the operation would block
340 """
341 if self._value == 0:
342 raise WouldBlock
344 self._value -= 1
346 def release(self) -> DeprecatedAwaitable:
347 """Increment the semaphore value."""
348 if self._max_value is not None and self._value == self._max_value:
349 raise ValueError("semaphore released too many times")
351 if self._waiters:
352 self._waiters.popleft().set()
353 else:
354 self._value += 1
356 return DeprecatedAwaitable(self.release)
358 @property
359 def value(self) -> int:
360 """The current value of the semaphore."""
361 return self._value
363 @property
364 def max_value(self) -> Optional[int]:
365 """The maximum value of the semaphore."""
366 return self._max_value
368 def statistics(self) -> SemaphoreStatistics:
369 """
370 Return statistics about the current state of this semaphore.
372 .. versionadded:: 3.0
373 """
374 return SemaphoreStatistics(len(self._waiters))
377class CapacityLimiter:
378 def __new__(cls, total_tokens: float) -> "CapacityLimiter":
379 return get_asynclib().CapacityLimiter(total_tokens)
381 async def __aenter__(self) -> None:
382 raise NotImplementedError
384 async def __aexit__(
385 self,
386 exc_type: Optional[Type[BaseException]],
387 exc_val: Optional[BaseException],
388 exc_tb: Optional[TracebackType],
389 ) -> Optional[bool]:
390 raise NotImplementedError
392 @property
393 def total_tokens(self) -> float:
394 """
395 The total number of tokens available for borrowing.
397 This is a read-write property. If the total number of tokens is increased, the
398 proportionate number of tasks waiting on this limiter will be granted their tokens.
400 .. versionchanged:: 3.0
401 The property is now writable.
403 """
404 raise NotImplementedError
406 @total_tokens.setter
407 def total_tokens(self, value: float) -> None:
408 raise NotImplementedError
410 async def set_total_tokens(self, value: float) -> None:
411 warn(
412 "CapacityLimiter.set_total_tokens has been deprecated. Set the value of the"
413 '"total_tokens" attribute directly.',
414 DeprecationWarning,
415 )
416 self.total_tokens = value
418 @property
419 def borrowed_tokens(self) -> int:
420 """The number of tokens that have currently been borrowed."""
421 raise NotImplementedError
423 @property
424 def available_tokens(self) -> float:
425 """The number of tokens currently available to be borrowed"""
426 raise NotImplementedError
428 def acquire_nowait(self) -> DeprecatedAwaitable:
429 """
430 Acquire a token for the current task without waiting for one to become available.
432 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
434 """
435 raise NotImplementedError
437 def acquire_on_behalf_of_nowait(self, borrower: object) -> DeprecatedAwaitable:
438 """
439 Acquire a token without waiting for one to become available.
441 :param borrower: the entity borrowing a token
442 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
444 """
445 raise NotImplementedError
447 async def acquire(self) -> None:
448 """
449 Acquire a token for the current task, waiting if necessary for one to become available.
451 """
452 raise NotImplementedError
454 async def acquire_on_behalf_of(self, borrower: object) -> None:
455 """
456 Acquire a token, waiting if necessary for one to become available.
458 :param borrower: the entity borrowing a token
460 """
461 raise NotImplementedError
463 def release(self) -> None:
464 """
465 Release the token held by the current task.
466 :raises RuntimeError: if the current task has not borrowed a token from this limiter.
468 """
469 raise NotImplementedError
471 def release_on_behalf_of(self, borrower: object) -> None:
472 """
473 Release the token held by the given borrower.
475 :raises RuntimeError: if the borrower has not borrowed a token from this limiter.
477 """
478 raise NotImplementedError
480 def statistics(self) -> CapacityLimiterStatistics:
481 """
482 Return statistics about the current state of this limiter.
484 .. versionadded:: 3.0
486 """
487 raise NotImplementedError
490def create_lock() -> Lock:
491 """
492 Create an asynchronous lock.
494 :return: a lock object
496 .. deprecated:: 3.0
497 Use :class:`~Lock` directly.
499 """
500 warn("create_lock() is deprecated -- use Lock() directly", DeprecationWarning)
501 return Lock()
504def create_condition(lock: Optional[Lock] = None) -> Condition:
505 """
506 Create an asynchronous condition.
508 :param lock: the lock to base the condition object on
509 :return: a condition object
511 .. deprecated:: 3.0
512 Use :class:`~Condition` directly.
514 """
515 warn(
516 "create_condition() is deprecated -- use Condition() directly",
517 DeprecationWarning,
518 )
519 return Condition(lock=lock)
522def create_event() -> Event:
523 """
524 Create an asynchronous event object.
526 :return: an event object
528 .. deprecated:: 3.0
529 Use :class:`~Event` directly.
531 """
532 warn("create_event() is deprecated -- use Event() directly", DeprecationWarning)
533 return get_asynclib().Event()
536def create_semaphore(value: int, *, max_value: Optional[int] = None) -> Semaphore:
537 """
538 Create an asynchronous semaphore.
540 :param value: the semaphore's initial value
541 :param max_value: if set, makes this a "bounded" semaphore that raises :exc:`ValueError` if the
542 semaphore's value would exceed this number
543 :return: a semaphore object
545 .. deprecated:: 3.0
546 Use :class:`~Semaphore` directly.
548 """
549 warn(
550 "create_semaphore() is deprecated -- use Semaphore() directly",
551 DeprecationWarning,
552 )
553 return Semaphore(value, max_value=max_value)
556def create_capacity_limiter(total_tokens: float) -> CapacityLimiter:
557 """
558 Create a capacity limiter.
560 :param total_tokens: the total number of tokens available for borrowing (can be an integer or
561 :data:`math.inf`)
562 :return: a capacity limiter object
564 .. deprecated:: 3.0
565 Use :class:`~CapacityLimiter` directly.
567 """
568 warn(
569 "create_capacity_limiter() is deprecated -- use CapacityLimiter() directly",
570 DeprecationWarning,
571 )
572 return get_asynclib().CapacityLimiter(total_tokens)
575class ResourceGuard:
576 __slots__ = "action", "_guarded"
578 def __init__(self, action: str):
579 self.action = action
580 self._guarded = False
582 def __enter__(self) -> None:
583 if self._guarded:
584 raise BusyResourceError(self.action)
586 self._guarded = True
588 def __exit__(
589 self,
590 exc_type: Optional[Type[BaseException]],
591 exc_val: Optional[BaseException],
592 exc_tb: Optional[TracebackType],
593 ) -> Optional[bool]:
594 self._guarded = False
595 return None