1from __future__ import annotations
2
3import math
4from collections import deque
5from collections.abc import Callable
6from dataclasses import dataclass
7from types import TracebackType
8from typing import TypeVar
9
10from sniffio import AsyncLibraryNotFoundError
11
12from ..lowlevel import checkpoint_if_cancelled
13from ._eventloop import get_async_backend
14from ._exceptions import BusyResourceError
15from ._tasks import CancelScope
16from ._testing import TaskInfo, get_current_task
17
# Generic return type of the predicate passed to Condition.wait_for().
T = TypeVar("T")
19
20
@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable statistics record describing the state of an :class:`~.Event`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
    """

    tasks_waiting: int
28
29
@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable statistics record describing the state of a
    :class:`~.CapacityLimiter`.

    :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
    :ivar float total_tokens: total number of available tokens
    :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed
        from this limiter
    :ivar int tasks_waiting: number of tasks waiting on
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int
46
47
@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable statistics record describing the state of a :class:`~.Lock`.

    :ivar bool locked: flag indicating if this lock is locked or not
    :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
        lock is not held by any task)
    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int
60
61
@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable statistics record describing the state of a :class:`~.Condition`.

    :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
        :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics
72
73
@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable statistics record describing the state of a :class:`~.Semaphore`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`

    """

    tasks_waiting: int
82
83
class Event:
    """
    Interface of an event flag that tasks can wait on.

    Instantiating this class returns the event created by the current async
    backend. If :exc:`AsyncLibraryNotFoundError` is raised while doing so, an
    :class:`EventAdapter` is returned instead.
    """

    def __new__(cls) -> Event:
        try:
            new_event = get_async_backend().create_event()
        except AsyncLibraryNotFoundError:
            # No backend could be determined; hand out the adapter instead.
            new_event = EventAdapter()

        return new_event

    def set(self) -> None:
        """Raise the flag and notify every listener."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Tell whether the flag is set (``True``) or not (``False``)."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Block until the flag has been set.

        Returns immediately if the flag is already set at the time of the call.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return statistics describing the current state of this event."""
        raise NotImplementedError
112
113
class EventAdapter(Event):
    """
    Event whose backend-specific counterpart is created on first use.

    Until the backend event exists, the flag state is tracked locally in
    ``_is_set`` and carried over to the backend event when it gets created.
    """

    _internal_event: Event | None = None
    _is_set: bool = False

    def __new__(cls) -> EventAdapter:
        # Plain allocation; Event.__new__ would go through the async backend.
        return object.__new__(cls)

    @property
    def _event(self) -> Event:
        """The backend event, created (and pre-set if needed) on first access."""
        backend_event = self._internal_event
        if backend_event is None:
            backend_event = get_async_backend().create_event()
            self._internal_event = backend_event
            if self._is_set:
                # Replay a set() that happened before the backend event existed.
                backend_event.set()

        return backend_event

    def set(self) -> None:
        if self._internal_event is not None:
            self._event.set()
        else:
            self._is_set = True

    def is_set(self) -> bool:
        backend_event = self._internal_event
        if backend_event is not None:
            return backend_event.is_set()

        return self._is_set

    async def wait(self) -> None:
        await self._event.wait()

    def statistics(self) -> EventStatistics:
        backend_event = self._internal_event
        if backend_event is not None:
            return backend_event.statistics()

        # Nothing can be waiting on an event that has not been created yet.
        return EventStatistics(tasks_waiting=0)
150
151
class Lock:
    """
    Interface of a lock usable as an asynchronous context manager.

    Instantiating this class returns the lock created by the current async
    backend. If :exc:`AsyncLibraryNotFoundError` is raised while doing so, a
    :class:`LockAdapter` is returned instead.

    :param fast_acquire: forwarded as-is to the backend lock or the adapter
    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        try:
            new_lock = get_async_backend().create_lock(fast_acquire=fast_acquire)
        except AsyncLibraryNotFoundError:
            # No backend could be determined; hand out the adapter instead.
            new_lock = LockAdapter(fast_acquire=fast_acquire)

        return new_lock

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Take hold of the lock."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Take hold of the lock without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Let go of the lock."""
        raise NotImplementedError

    def locked(self) -> bool:
        """Return True if the lock is currently held."""
        raise NotImplementedError

    def statistics(self) -> LockStatistics:
        """
        Return statistics describing the current state of this lock.

        .. versionadded:: 3.0
        """
        raise NotImplementedError
198
199
class LockAdapter(Lock):
    """
    Lock whose backend-specific counterpart is created on first use.

    The backend lock is only created when an operation actually needs it. The
    read-only queries :meth:`locked` and :meth:`statistics` report an unlocked,
    idle lock without creating it.

    :param fast_acquire: forwarded to the backend's ``create_lock()`` when the
        backend lock is eventually created
    """

    _internal_lock: Lock | None = None

    def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
        # Plain allocation; Lock.__new__ would go through the async backend.
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False):
        self._fast_acquire = fast_acquire

    @property
    def _lock(self) -> Lock:
        """The backend lock, created on first access."""
        if self._internal_lock is None:
            self._internal_lock = get_async_backend().create_lock(
                fast_acquire=self._fast_acquire
            )

        return self._internal_lock

    async def __aenter__(self) -> None:
        await self._lock.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # If the backend lock was never created, there is nothing to release.
        if self._internal_lock is not None:
            self._internal_lock.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await self._lock.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """
        Return True if the lock is currently held.

        A lock whose backend counterpart has not been created yet cannot have
        been acquired, so ``False`` is returned without touching the backend.

        """
        if self._internal_lock is None:
            return False

        return self._internal_lock.locked()

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0

        """
        if self._internal_lock is None:
            return LockStatistics(False, None, 0)

        return self._internal_lock.statistics()
262
263
class Condition:
    """
    Condition variable built on top of a :class:`Lock`.

    A task holding the underlying lock can wait for a notification with
    :meth:`wait`, which releases the lock while waiting and re-acquires it
    afterwards. It can wake waiting tasks with :meth:`notify` or
    :meth:`notify_all`.

    :param lock: the lock to use; a new :class:`Lock` is created if ``None``
    """

    # Task that acquired the lock through this condition, if any
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = lock if lock is not None else Lock()
        # One event per waiting task, in arrival order
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """
        Ensure the current task acquired the lock through this condition.

        :raises RuntimeError: if the current task is not the recorded owner

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()
        # Only reached when the release did not raise. Forget the owner so that a
        # stale reference cannot satisfy _check_acquired() once the lock has been
        # given up.
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify up to ``n`` listeners, in the order they started waiting.

        Fewer than ``n`` listeners are notified if fewer are waiting.

        :param n: maximum number of listeners to notify
        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # No more waiters left
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and re-acquired before this
        method returns or raises.

        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        await checkpoint_if_cancelled()
        self._check_acquired()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            # An unset event was never handed out by notify()/notify_all(), so it
            # is still queued and must be withdrawn.
            if not event.is_set():
                self._waiters.remove(event)

            raise
        finally:
            # Take the lock back on every exit path. The shield keeps the
            # re-acquisition from being cancelled.
            with CancelScope(shield=True):
                await self.acquire()

    async def wait_for(self, predicate: Callable[[], T]) -> T:
        """
        Wait until a predicate becomes true.

        :param predicate: a callable that returns a truthy value when the condition is
            met
        :return: the result of the predicate

        .. versionadded:: 4.11.0

        """
        while not (result := predicate()):
            await self.wait()

        return result

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())
369
370
class Semaphore:
    """
    Interface of a counting semaphore usable as an asynchronous context manager.

    Instantiating this class returns the semaphore created by the current async
    backend. If :exc:`AsyncLibraryNotFoundError` is raised while doing so, a
    :class:`SemaphoreAdapter` is returned instead.

    :param initial_value: the starting value; must be a non-negative integer
    :param max_value: optional upper bound; must be an integer not lower than
        ``initial_value``, or ``None``
    :param fast_acquire: forwarded as-is to the backend semaphore or the adapter
    :raises TypeError: if ``initial_value`` is not an integer, or ``max_value`` is
        neither an integer nor ``None``
    :raises ValueError: if ``initial_value`` is negative or ``max_value`` is lower
        than ``initial_value``
    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        try:
            return get_async_backend().create_semaphore(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )
        except AsyncLibraryNotFoundError:
            # Forward every option to the adapter, as Lock.__new__ does.
            return SemaphoreAdapter(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._fast_acquire = fast_acquire

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Increment the semaphore value."""
        raise NotImplementedError

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        raise NotImplementedError

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        raise NotImplementedError

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        raise NotImplementedError
453
454
class SemaphoreAdapter(Semaphore):
    """
    Semaphore whose backend-specific counterpart is created on first use.

    The constructor arguments are validated and remembered immediately. The
    backend semaphore is only created when an operation actually needs it.

    :param initial_value: the starting value of the semaphore
    :param max_value: optional upper bound for the semaphore value
    :param fast_acquire: forwarded to the backend's ``create_semaphore()`` when
        the backend semaphore is eventually created
    """

    _internal_semaphore: Semaphore | None = None

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> SemaphoreAdapter:
        # Plain allocation; Semaphore.__new__ would go through the async backend.
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> None:
        # The base class validates the arguments and stores _fast_acquire.
        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
        self._initial_value = initial_value
        self._max_value = max_value

    @property
    def _semaphore(self) -> Semaphore:
        """The backend semaphore, created on first access."""
        if self._internal_semaphore is None:
            # Pass fast_acquire along so the option requested by the caller is not
            # silently dropped.
            self._internal_semaphore = get_async_backend().create_semaphore(
                self._initial_value,
                max_value=self._max_value,
                fast_acquire=self._fast_acquire,
            )

        return self._internal_semaphore

    async def acquire(self) -> None:
        await self._semaphore.acquire()

    def acquire_nowait(self) -> None:
        self._semaphore.acquire_nowait()

    def release(self) -> None:
        self._semaphore.release()

    @property
    def value(self) -> int:
        # Without a backend semaphore the value cannot have changed yet.
        if self._internal_semaphore is None:
            return self._initial_value

        return self._semaphore.value

    @property
    def max_value(self) -> int | None:
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        if self._internal_semaphore is None:
            return SemaphoreStatistics(tasks_waiting=0)

        return self._semaphore.statistics()
512
513
class CapacityLimiter:
    """
    Interface of a limiter that hands out a bounded number of tokens.

    Instantiating this class returns the limiter created by the current async
    backend. If :exc:`AsyncLibraryNotFoundError` is raised while doing so, a
    :class:`CapacityLimiterAdapter` is returned instead.

    :param total_tokens: the total number of tokens available for borrowing
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        try:
            limiter = get_async_backend().create_capacity_limiter(total_tokens)
        except AsyncLibraryNotFoundError:
            # No backend could be determined; hand out the adapter instead.
            limiter = CapacityLimiterAdapter(total_tokens)

        return limiter

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        This property can be both read and written. Increasing the total number of
        tokens grants tokens to the proportionate number of tasks waiting on this
        limiter.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens that are currently borrowed."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """The number of tokens that can currently be borrowed."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Borrow a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Borrow a token for the given borrower without waiting for one to become
        available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Borrow a token for the current task, waiting if necessary until one
        becomes available.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Borrow a token for the given borrower, waiting if necessary until one
        becomes available.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Give back the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Give back the token held by the given borrower.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics describing the current state of this limiter.

        .. versionadded:: 3.0

        """
        raise NotImplementedError
626
627
class CapacityLimiterAdapter(CapacityLimiter):
    """
    Capacity limiter whose backend-specific counterpart is created on first use.

    Until the backend limiter exists, the configured total is kept locally and the
    read-only queries report an idle limiter. Operations that borrow or release
    tokens create the backend limiter on demand.

    :param total_tokens: the total number of tokens available for borrowing; must
        be an integer or infinity, and at least 1
    :raises TypeError: if ``total_tokens`` is neither an integer nor infinity
    :raises ValueError: if ``total_tokens`` is lower than 1
    """

    _internal_limiter: CapacityLimiter | None = None

    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
        # Plain allocation; CapacityLimiter.__new__ would go through the backend.
        return object.__new__(cls)

    def __init__(self, total_tokens: float) -> None:
        # Goes through the property setter, which validates the value.
        self.total_tokens = total_tokens

    @property
    def _limiter(self) -> CapacityLimiter:
        """The backend limiter, created on first access."""
        if self._internal_limiter is None:
            self._internal_limiter = get_async_backend().create_capacity_limiter(
                self._total_tokens
            )

        return self._internal_limiter

    async def __aenter__(self) -> None:
        await self._limiter.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        # Compare against infinity by value, not identity: float("inf") and
        # math.inf are equal but need not be the same object.
        if not isinstance(value, int) and value != math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        elif value < 1:
            raise ValueError("total_tokens must be >= 1")

        if self._internal_limiter is None:
            self._total_tokens = value
            return

        self._limiter.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        if self._internal_limiter is None:
            return 0

        return self._internal_limiter.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        # Nothing can have been borrowed before the backend limiter exists.
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.available_tokens

    def acquire_nowait(self) -> None:
        self._limiter.acquire_nowait()

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        self._limiter.acquire_on_behalf_of_nowait(borrower)

    async def acquire(self) -> None:
        await self._limiter.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        await self._limiter.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        self._limiter.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        self._limiter.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        if self._internal_limiter is None:
            return CapacityLimiterStatistics(
                borrowed_tokens=0,
                total_tokens=self.total_tokens,
                borrowers=(),
                tasks_waiting=0,
            )

        return self._internal_limiter.statistics()
719
720
class ResourceGuard:
    """
    A context manager that makes sure a resource is used by only one task at a
    time.

    Entering this context manager while a previous entry has not been exited yet
    raises :exc:`BusyResourceError`.

    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
        when triggered, e.g. "Another task is already {action} this resource")

    .. versionadded:: 4.1
    """

    __slots__ = ("action", "_guarded")

    def __init__(self, action: str = "using"):
        self._guarded = False
        self.action: str = action

    def __enter__(self) -> None:
        if not self._guarded:
            # Free: mark the resource as taken until __exit__ runs.
            self._guarded = True
            return

        raise BusyResourceError(self.action)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Always free the resource, whether or not the guarded block raised.
        self._guarded = False