# Coverage report for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py: 39% of 264 statements covered
# Generated by coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from types import TracebackType
from warnings import warn

from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled
from ._compat import DeprecatedAwaitable
from ._eventloop import get_asynclib
from ._exceptions import BusyResourceError, WouldBlock
from ._tasks import CancelScope
from ._testing import TaskInfo, get_current_task
@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable snapshot of the state of an :class:`Event`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
    """

    tasks_waiting: int
@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable snapshot of the state of a :class:`CapacityLimiter`.

    :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
    :ivar float total_tokens: total number of available tokens
    :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from this
        limiter
    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int
@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable snapshot of the state of a :class:`Lock`.

    :ivar bool locked: flag indicating if this lock is locked or not
    :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the lock is not
        held by any task)
    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int
@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable snapshot of the state of a :class:`Condition`.

    :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics
@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable snapshot of the state of a :class:`Semaphore`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`

    """

    tasks_waiting: int
class Event:
    """
    Interface of an event flag that tasks can wait on.

    Calling ``Event()`` does not run any code of this class beyond :meth:`__new__`, which
    returns whatever ``get_asynclib().Event()`` creates. The methods defined here only declare
    the interface; each of them raises :exc:`NotImplementedError`.
    """

    def __new__(cls) -> Event:
        # Construction is delegated to the module returned by get_asynclib().
        return get_asynclib().Event()

    def set(self) -> DeprecatedAwaitable:
        """Set the flag, notifying all listeners."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Return ``True`` if the flag is set, ``False`` if not."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Wait until the flag has been set.

        If the flag has already been set when this method is called, it returns immediately.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return statistics about the current state of this event."""
        raise NotImplementedError
class Lock:
    """
    Mutual exclusion lock owned by at most one task at a time.

    Tasks that cannot take the lock immediately are queued in ``_waiters`` in arrival order;
    :meth:`release` hands ownership to the task at the head of that queue.
    """

    # Class-level default: a freshly created lock has no owner.
    _owner_task: TaskInfo | None = None

    def __init__(self) -> None:
        # Each entry pairs a waiting task with the event used to wake it up.
        self._waiters: deque[tuple[TaskInfo, Event]] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            task = get_current_task()
            event = Event()
            token = task, event
            self._waiters.append(token)
            try:
                await event.wait()
            except BaseException:
                if not event.is_set():
                    # Never woken up: just leave the queue.
                    self._waiters.remove(token)
                elif self._owner_task == task:
                    # Ownership was already handed to this task; pass it on.
                    self.release()

                raise

            assert self._owner_task == task
        else:
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~WouldBlock: if the operation would block
        :raises RuntimeError: if the current task already holds this lock

        """
        task = get_current_task()
        if self._owner_task == task:
            raise RuntimeError("Attempted to acquire an already held Lock")

        if self._owner_task is not None:
            raise WouldBlock

        self._owner_task = task

    def release(self) -> DeprecatedAwaitable:
        """
        Release the lock.

        If tasks are queued, ownership passes directly to the one at the head of the queue and
        its event is set; otherwise the lock becomes unowned.

        :raises RuntimeError: if the current task is not holding this lock

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding this lock")

        if self._waiters:
            self._owner_task, event = self._waiters.popleft()
            event.set()
        else:
            # Explicit reset instead of deleting the instance attribute and relying on the
            # class-level default.
            self._owner_task = None

        return DeprecatedAwaitable(self.release)

    def locked(self) -> bool:
        """Return True if the lock is currently held."""
        return self._owner_task is not None

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0
        """
        return LockStatistics(self.locked(), self._owner_task, len(self._waiters))
class Condition:
    """
    Condition variable built on top of a :class:`Lock`.

    ``_owner_task`` records which task acquired the lock through this condition; it is what
    :meth:`notify`, :meth:`notify_all` check before waking waiters.
    """

    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        """
        :param lock: the lock to use; a new :class:`Lock` is created if omitted
        """
        self._lock = lock or Lock()
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """Raise :exc:`RuntimeError` unless the current task acquired this condition."""
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> DeprecatedAwaitable:
        """Release the underlying lock."""
        self._lock.release()
        # Forget the owner only after the lock release succeeded. Without this, a task that
        # has already released the condition would still pass _check_acquired().
        self._owner_task = None
        return DeprecatedAwaitable(self.release)

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify at most n listeners (fewer if fewer are waiting).

        :raises RuntimeError: if the current task has not acquired this condition

        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # Ran out of waiters before reaching n.
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task has not acquired this condition

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and is always re-acquired (inside a
        shielded cancel scope) before this method returns or raises.
        """
        await checkpoint()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            if not event.is_set():
                # Not notified: take ourselves out of the waiter queue.
                self._waiters.remove(event)

            raise
        finally:
            with CancelScope(shield=True):
                await self.acquire()

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())
class Semaphore:
    """
    Counting semaphore with an optional upper bound.

    Tasks that find the value at zero queue up in ``_waiters``; :meth:`release` wakes the task
    at the head of the queue instead of incrementing the value.
    """

    def __init__(self, initial_value: int, *, max_value: int | None = None):
        """
        :param initial_value: the starting value of the semaphore (must be >= 0)
        :param max_value: optional upper bound for the value (must be >= ``initial_value``)
        :raises TypeError: if ``initial_value`` is not an integer, or ``max_value`` is neither
            an integer nor ``None``
        :raises ValueError: if ``initial_value`` is negative or ``max_value`` is lower than
            ``initial_value``
        """
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._value = initial_value
        self._max_value = max_value
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            event = Event()
            self._waiters.append(event)
            try:
                await event.wait()
            except BaseException:
                if not event.is_set():
                    # Never woken up: just leave the queue.
                    self._waiters.remove(event)
                else:
                    # Already woken by release(): pass the wake-up on.
                    self.release()

                raise
        else:
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~WouldBlock: if the operation would block

        """
        if self._value == 0:
            raise WouldBlock

        self._value -= 1

    def release(self) -> DeprecatedAwaitable:
        """
        Increment the semaphore value, or wake the longest-waiting task if there is one.

        :raises ValueError: if the value already equals ``max_value``

        """
        if self._max_value is not None and self._value == self._max_value:
            raise ValueError("semaphore released too many times")

        if self._waiters:
            self._waiters.popleft().set()
        else:
            self._value += 1

        return DeprecatedAwaitable(self.release)

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        return self._value

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        return SemaphoreStatistics(len(self._waiters))
class CapacityLimiter:
    """
    Interface of a limiter that hands out a bounded number of tokens to borrowers.

    Calling ``CapacityLimiter(total_tokens)`` returns whatever
    ``get_asynclib().CapacityLimiter(total_tokens)`` creates. Apart from the deprecated
    :meth:`set_total_tokens`, the members defined here only declare the interface and raise
    :exc:`NotImplementedError`.
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        # Construction is delegated to the module returned by get_asynclib().
        return get_asynclib().CapacityLimiter(total_tokens)

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        This is a read-write property. If the total number of tokens is increased, the
        proportionate number of tasks waiting on this limiter will be granted their tokens.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    async def set_total_tokens(self, value: float) -> None:
        """
        Deprecated way of assigning :attr:`total_tokens`.

        Emits a :exc:`DeprecationWarning` and then assigns ``value`` to :attr:`total_tokens`.

        :param value: the new total number of tokens

        """
        # The trailing space keeps the two adjacent literals from running together.
        warn(
            "CapacityLimiter.set_total_tokens has been deprecated. Set the value of the "
            '"total_tokens" attribute directly.',
            DeprecationWarning,
        )
        self.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens that have currently been borrowed."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """The number of tokens currently available to be borrowed"""
        raise NotImplementedError

    def acquire_nowait(self) -> DeprecatedAwaitable:
        """
        Acquire a token for the current task without waiting for one to become available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> DeprecatedAwaitable:
        """
        Acquire a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Acquire a token for the current task, waiting if necessary for one to become available.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token, waiting if necessary for one to become available.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Release the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by the given borrower.

        :param borrower: the entity whose token is released
        :raises RuntimeError: if the borrower has not borrowed a token from this limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        .. versionadded:: 3.0

        """
        raise NotImplementedError
def create_lock() -> Lock:
    """
    Create an asynchronous lock.

    :return: a lock object

    .. deprecated:: 3.0
       Use :class:`~Lock` directly.

    """
    # stacklevel=2 attributes the warning to the caller rather than to this module.
    warn(
        "create_lock() is deprecated -- use Lock() directly",
        DeprecationWarning,
        stacklevel=2,
    )
    return Lock()
def create_condition(lock: Lock | None = None) -> Condition:
    """
    Create an asynchronous condition.

    :param lock: the lock to base the condition object on
    :return: a condition object

    .. deprecated:: 3.0
       Use :class:`~Condition` directly.

    """
    # stacklevel=2 attributes the warning to the caller rather than to this module.
    warn(
        "create_condition() is deprecated -- use Condition() directly",
        DeprecationWarning,
        stacklevel=2,
    )
    return Condition(lock=lock)
def create_event() -> Event:
    """
    Create an asynchronous event object.

    :return: an event object

    .. deprecated:: 3.0
       Use :class:`~Event` directly.

    """
    # stacklevel=2 attributes the warning to the caller rather than to this module.
    warn(
        "create_event() is deprecated -- use Event() directly",
        DeprecationWarning,
        stacklevel=2,
    )
    return get_asynclib().Event()
def create_semaphore(value: int, *, max_value: int | None = None) -> Semaphore:
    """
    Create an asynchronous semaphore.

    :param value: the semaphore's initial value
    :param max_value: if set, makes this a "bounded" semaphore that raises :exc:`ValueError` if the
        semaphore's value would exceed this number
    :return: a semaphore object

    .. deprecated:: 3.0
       Use :class:`~Semaphore` directly.

    """
    # stacklevel=2 attributes the warning to the caller rather than to this module.
    warn(
        "create_semaphore() is deprecated -- use Semaphore() directly",
        DeprecationWarning,
        stacklevel=2,
    )
    return Semaphore(value, max_value=max_value)
def create_capacity_limiter(total_tokens: float) -> CapacityLimiter:
    """
    Create a capacity limiter.

    :param total_tokens: the total number of tokens available for borrowing (can be an integer or
        :data:`math.inf`)
    :return: a capacity limiter object

    .. deprecated:: 3.0
       Use :class:`~CapacityLimiter` directly.

    """
    # stacklevel=2 attributes the warning to the caller rather than to this module.
    warn(
        "create_capacity_limiter() is deprecated -- use CapacityLimiter() directly",
        DeprecationWarning,
        stacklevel=2,
    )
    return get_asynclib().CapacityLimiter(total_tokens)
class ResourceGuard:
    """
    Context manager that refuses to be entered while it is already entered.

    Entering sets an internal flag and leaving clears it; entering again while the flag is
    set raises :exc:`BusyResourceError` constructed from ``action``.
    """

    __slots__ = "action", "_guarded"

    def __init__(self, action: str):
        """
        :param action: value passed to :exc:`BusyResourceError` when the guard is busy
        """
        self.action = action
        self._guarded = False

    def __enter__(self) -> None:
        if self._guarded:
            raise BusyResourceError(self.action)

        self._guarded = True

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        self._guarded = False
        # Returning None lets any exception raised in the guarded body propagate.
        return None