1from __future__ import annotations
2
3import math
4from collections import deque
5from collections.abc import Callable
6from dataclasses import dataclass
7from types import TracebackType
8from typing import TypeVar
9
10from ..lowlevel import checkpoint_if_cancelled
11from ._eventloop import NoCurrentAsyncBackend, get_async_backend
12from ._exceptions import BusyResourceError
13from ._tasks import CancelScope
14from ._testing import TaskInfo, get_current_task
15
16T = TypeVar("T")
17
18
19@dataclass(frozen=True)
20class EventStatistics:
21 """
22 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
23 """
24
25 tasks_waiting: int
26
27
28@dataclass(frozen=True)
29class CapacityLimiterStatistics:
30 """
31 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
32 :ivar float total_tokens: total number of available tokens
33 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from
34 this limiter
35 :ivar int tasks_waiting: number of tasks waiting on
36 :meth:`~.CapacityLimiter.acquire` or
37 :meth:`~.CapacityLimiter.acquire_on_behalf_of`
38 """
39
40 borrowed_tokens: int
41 total_tokens: float
42 borrowers: tuple[object, ...]
43 tasks_waiting: int
44
45
46@dataclass(frozen=True)
47class LockStatistics:
48 """
49 :ivar bool locked: flag indicating if this lock is locked or not
50 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
51 lock is not held by any task)
52 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
53 """
54
55 locked: bool
56 owner: TaskInfo | None
57 tasks_waiting: int
58
59
60@dataclass(frozen=True)
61class ConditionStatistics:
62 """
63 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
64 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
65 :class:`~.Lock`
66 """
67
68 tasks_waiting: int
69 lock_statistics: LockStatistics
70
71
72@dataclass(frozen=True)
73class SemaphoreStatistics:
74 """
75 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
76
77 """
78
79 tasks_waiting: int
80
81
82class Event:
83 def __new__(cls) -> Event:
84 try:
85 return get_async_backend().create_event()
86 except NoCurrentAsyncBackend:
87 return EventAdapter()
88
89 def set(self) -> None:
90 """Set the flag, notifying all listeners."""
91 raise NotImplementedError
92
93 def is_set(self) -> bool:
94 """Return ``True`` if the flag is set, ``False`` if not."""
95 raise NotImplementedError
96
97 async def wait(self) -> None:
98 """
99 Wait until the flag has been set.
100
101 If the flag has already been set when this method is called, it returns
102 immediately.
103
104 """
105 raise NotImplementedError
106
107 def statistics(self) -> EventStatistics:
108 """Return statistics about the current state of this event."""
109 raise NotImplementedError
110
111
112class EventAdapter(Event):
113 _internal_event: Event | None = None
114 _is_set: bool = False
115
116 def __new__(cls) -> EventAdapter:
117 return object.__new__(cls)
118
119 @property
120 def _event(self) -> Event:
121 if self._internal_event is None:
122 self._internal_event = get_async_backend().create_event()
123 if self._is_set:
124 self._internal_event.set()
125
126 return self._internal_event
127
128 def set(self) -> None:
129 if self._internal_event is None:
130 self._is_set = True
131 else:
132 self._event.set()
133
134 def is_set(self) -> bool:
135 if self._internal_event is None:
136 return self._is_set
137
138 return self._internal_event.is_set()
139
140 async def wait(self) -> None:
141 await self._event.wait()
142
143 def statistics(self) -> EventStatistics:
144 if self._internal_event is None:
145 return EventStatistics(tasks_waiting=0)
146
147 return self._internal_event.statistics()
148
149
150class Lock:
151 def __new__(cls, *, fast_acquire: bool = False) -> Lock:
152 try:
153 return get_async_backend().create_lock(fast_acquire=fast_acquire)
154 except NoCurrentAsyncBackend:
155 return LockAdapter(fast_acquire=fast_acquire)
156
157 async def __aenter__(self) -> None:
158 await self.acquire()
159
160 async def __aexit__(
161 self,
162 exc_type: type[BaseException] | None,
163 exc_val: BaseException | None,
164 exc_tb: TracebackType | None,
165 ) -> None:
166 self.release()
167
168 async def acquire(self) -> None:
169 """Acquire the lock."""
170 raise NotImplementedError
171
172 def acquire_nowait(self) -> None:
173 """
174 Acquire the lock, without blocking.
175
176 :raises ~anyio.WouldBlock: if the operation would block
177
178 """
179 raise NotImplementedError
180
181 def release(self) -> None:
182 """Release the lock."""
183 raise NotImplementedError
184
185 def locked(self) -> bool:
186 """Return True if the lock is currently held."""
187 raise NotImplementedError
188
189 def statistics(self) -> LockStatistics:
190 """
191 Return statistics about the current state of this lock.
192
193 .. versionadded:: 3.0
194 """
195 raise NotImplementedError
196
197
198class LockAdapter(Lock):
199 _internal_lock: Lock | None = None
200
201 def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
202 return object.__new__(cls)
203
204 def __init__(self, *, fast_acquire: bool = False):
205 self._fast_acquire = fast_acquire
206
207 @property
208 def _lock(self) -> Lock:
209 if self._internal_lock is None:
210 self._internal_lock = get_async_backend().create_lock(
211 fast_acquire=self._fast_acquire
212 )
213
214 return self._internal_lock
215
216 async def __aenter__(self) -> None:
217 await self._lock.acquire()
218
219 async def __aexit__(
220 self,
221 exc_type: type[BaseException] | None,
222 exc_val: BaseException | None,
223 exc_tb: TracebackType | None,
224 ) -> None:
225 if self._internal_lock is not None:
226 self._internal_lock.release()
227
228 async def acquire(self) -> None:
229 """Acquire the lock."""
230 await self._lock.acquire()
231
232 def acquire_nowait(self) -> None:
233 """
234 Acquire the lock, without blocking.
235
236 :raises ~anyio.WouldBlock: if the operation would block
237
238 """
239 self._lock.acquire_nowait()
240
241 def release(self) -> None:
242 """Release the lock."""
243 self._lock.release()
244
245 def locked(self) -> bool:
246 """Return True if the lock is currently held."""
247 return self._lock.locked()
248
249 def statistics(self) -> LockStatistics:
250 """
251 Return statistics about the current state of this lock.
252
253 .. versionadded:: 3.0
254
255 """
256 if self._internal_lock is None:
257 return LockStatistics(False, None, 0)
258
259 return self._internal_lock.statistics()
260
261
262class Condition:
263 _owner_task: TaskInfo | None = None
264
265 def __init__(self, lock: Lock | None = None):
266 self._lock = lock or Lock()
267 self._waiters: deque[Event] = deque()
268
269 async def __aenter__(self) -> None:
270 await self.acquire()
271
272 async def __aexit__(
273 self,
274 exc_type: type[BaseException] | None,
275 exc_val: BaseException | None,
276 exc_tb: TracebackType | None,
277 ) -> None:
278 self.release()
279
280 def _check_acquired(self) -> None:
281 if self._owner_task != get_current_task():
282 raise RuntimeError("The current task is not holding the underlying lock")
283
284 async def acquire(self) -> None:
285 """Acquire the underlying lock."""
286 await self._lock.acquire()
287 self._owner_task = get_current_task()
288
289 def acquire_nowait(self) -> None:
290 """
291 Acquire the underlying lock, without blocking.
292
293 :raises ~anyio.WouldBlock: if the operation would block
294
295 """
296 self._lock.acquire_nowait()
297 self._owner_task = get_current_task()
298
299 def release(self) -> None:
300 """Release the underlying lock."""
301 self._lock.release()
302
303 def locked(self) -> bool:
304 """Return True if the lock is set."""
305 return self._lock.locked()
306
307 def notify(self, n: int = 1) -> None:
308 """Notify exactly n listeners."""
309 self._check_acquired()
310 for _ in range(n):
311 try:
312 event = self._waiters.popleft()
313 except IndexError:
314 break
315
316 event.set()
317
318 def notify_all(self) -> None:
319 """Notify all the listeners."""
320 self._check_acquired()
321 for event in self._waiters:
322 event.set()
323
324 self._waiters.clear()
325
326 async def wait(self) -> None:
327 """Wait for a notification."""
328 await checkpoint_if_cancelled()
329 self._check_acquired()
330 event = Event()
331 self._waiters.append(event)
332 self.release()
333 try:
334 await event.wait()
335 except BaseException:
336 if not event.is_set():
337 self._waiters.remove(event)
338
339 raise
340 finally:
341 with CancelScope(shield=True):
342 await self.acquire()
343
344 async def wait_for(self, predicate: Callable[[], T]) -> T:
345 """
346 Wait until a predicate becomes true.
347
348 :param predicate: a callable that returns a truthy value when the condition is
349 met
350 :return: the result of the predicate
351
352 .. versionadded:: 4.11.0
353
354 """
355 while not (result := predicate()):
356 await self.wait()
357
358 return result
359
360 def statistics(self) -> ConditionStatistics:
361 """
362 Return statistics about the current state of this condition.
363
364 .. versionadded:: 3.0
365 """
366 return ConditionStatistics(len(self._waiters), self._lock.statistics())
367
368
369class Semaphore:
370 def __new__(
371 cls,
372 initial_value: int,
373 *,
374 max_value: int | None = None,
375 fast_acquire: bool = False,
376 ) -> Semaphore:
377 try:
378 return get_async_backend().create_semaphore(
379 initial_value, max_value=max_value, fast_acquire=fast_acquire
380 )
381 except NoCurrentAsyncBackend:
382 return SemaphoreAdapter(initial_value, max_value=max_value)
383
384 def __init__(
385 self,
386 initial_value: int,
387 *,
388 max_value: int | None = None,
389 fast_acquire: bool = False,
390 ):
391 if not isinstance(initial_value, int):
392 raise TypeError("initial_value must be an integer")
393 if initial_value < 0:
394 raise ValueError("initial_value must be >= 0")
395 if max_value is not None:
396 if not isinstance(max_value, int):
397 raise TypeError("max_value must be an integer or None")
398 if max_value < initial_value:
399 raise ValueError(
400 "max_value must be equal to or higher than initial_value"
401 )
402
403 self._fast_acquire = fast_acquire
404
405 async def __aenter__(self) -> Semaphore:
406 await self.acquire()
407 return self
408
409 async def __aexit__(
410 self,
411 exc_type: type[BaseException] | None,
412 exc_val: BaseException | None,
413 exc_tb: TracebackType | None,
414 ) -> None:
415 self.release()
416
417 async def acquire(self) -> None:
418 """Decrement the semaphore value, blocking if necessary."""
419 raise NotImplementedError
420
421 def acquire_nowait(self) -> None:
422 """
423 Acquire the underlying lock, without blocking.
424
425 :raises ~anyio.WouldBlock: if the operation would block
426
427 """
428 raise NotImplementedError
429
430 def release(self) -> None:
431 """Increment the semaphore value."""
432 raise NotImplementedError
433
434 @property
435 def value(self) -> int:
436 """The current value of the semaphore."""
437 raise NotImplementedError
438
439 @property
440 def max_value(self) -> int | None:
441 """The maximum value of the semaphore."""
442 raise NotImplementedError
443
444 def statistics(self) -> SemaphoreStatistics:
445 """
446 Return statistics about the current state of this semaphore.
447
448 .. versionadded:: 3.0
449 """
450 raise NotImplementedError
451
452
453class SemaphoreAdapter(Semaphore):
454 _internal_semaphore: Semaphore | None = None
455
456 def __new__(
457 cls,
458 initial_value: int,
459 *,
460 max_value: int | None = None,
461 fast_acquire: bool = False,
462 ) -> SemaphoreAdapter:
463 return object.__new__(cls)
464
465 def __init__(
466 self,
467 initial_value: int,
468 *,
469 max_value: int | None = None,
470 fast_acquire: bool = False,
471 ) -> None:
472 super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
473 self._initial_value = initial_value
474 self._max_value = max_value
475
476 @property
477 def _semaphore(self) -> Semaphore:
478 if self._internal_semaphore is None:
479 self._internal_semaphore = get_async_backend().create_semaphore(
480 self._initial_value, max_value=self._max_value
481 )
482
483 return self._internal_semaphore
484
485 async def acquire(self) -> None:
486 await self._semaphore.acquire()
487
488 def acquire_nowait(self) -> None:
489 self._semaphore.acquire_nowait()
490
491 def release(self) -> None:
492 self._semaphore.release()
493
494 @property
495 def value(self) -> int:
496 if self._internal_semaphore is None:
497 return self._initial_value
498
499 return self._semaphore.value
500
501 @property
502 def max_value(self) -> int | None:
503 return self._max_value
504
505 def statistics(self) -> SemaphoreStatistics:
506 if self._internal_semaphore is None:
507 return SemaphoreStatistics(tasks_waiting=0)
508
509 return self._semaphore.statistics()
510
511
512class CapacityLimiter:
513 def __new__(cls, total_tokens: float) -> CapacityLimiter:
514 try:
515 return get_async_backend().create_capacity_limiter(total_tokens)
516 except NoCurrentAsyncBackend:
517 return CapacityLimiterAdapter(total_tokens)
518
519 async def __aenter__(self) -> None:
520 raise NotImplementedError
521
522 async def __aexit__(
523 self,
524 exc_type: type[BaseException] | None,
525 exc_val: BaseException | None,
526 exc_tb: TracebackType | None,
527 ) -> None:
528 raise NotImplementedError
529
530 @property
531 def total_tokens(self) -> float:
532 """
533 The total number of tokens available for borrowing.
534
535 This is a read-write property. If the total number of tokens is increased, the
536 proportionate number of tasks waiting on this limiter will be granted their
537 tokens.
538
539 .. versionchanged:: 3.0
540 The property is now writable.
541 .. versionchanged:: 4.12
542 The value can now be set to 0.
543
544 """
545 raise NotImplementedError
546
547 @total_tokens.setter
548 def total_tokens(self, value: float) -> None:
549 raise NotImplementedError
550
551 @property
552 def borrowed_tokens(self) -> int:
553 """The number of tokens that have currently been borrowed."""
554 raise NotImplementedError
555
556 @property
557 def available_tokens(self) -> float:
558 """The number of tokens currently available to be borrowed"""
559 raise NotImplementedError
560
561 def acquire_nowait(self) -> None:
562 """
563 Acquire a token for the current task without waiting for one to become
564 available.
565
566 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
567
568 """
569 raise NotImplementedError
570
571 def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
572 """
573 Acquire a token without waiting for one to become available.
574
575 :param borrower: the entity borrowing a token
576 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
577
578 """
579 raise NotImplementedError
580
581 async def acquire(self) -> None:
582 """
583 Acquire a token for the current task, waiting if necessary for one to become
584 available.
585
586 """
587 raise NotImplementedError
588
589 async def acquire_on_behalf_of(self, borrower: object) -> None:
590 """
591 Acquire a token, waiting if necessary for one to become available.
592
593 :param borrower: the entity borrowing a token
594
595 """
596 raise NotImplementedError
597
598 def release(self) -> None:
599 """
600 Release the token held by the current task.
601
602 :raises RuntimeError: if the current task has not borrowed a token from this
603 limiter.
604
605 """
606 raise NotImplementedError
607
608 def release_on_behalf_of(self, borrower: object) -> None:
609 """
610 Release the token held by the given borrower.
611
612 :raises RuntimeError: if the borrower has not borrowed a token from this
613 limiter.
614
615 """
616 raise NotImplementedError
617
618 def statistics(self) -> CapacityLimiterStatistics:
619 """
620 Return statistics about the current state of this limiter.
621
622 .. versionadded:: 3.0
623
624 """
625 raise NotImplementedError
626
627
628class CapacityLimiterAdapter(CapacityLimiter):
629 _internal_limiter: CapacityLimiter | None = None
630
631 def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
632 return object.__new__(cls)
633
634 def __init__(self, total_tokens: float) -> None:
635 self.total_tokens = total_tokens
636
637 @property
638 def _limiter(self) -> CapacityLimiter:
639 if self._internal_limiter is None:
640 self._internal_limiter = get_async_backend().create_capacity_limiter(
641 self._total_tokens
642 )
643
644 return self._internal_limiter
645
646 async def __aenter__(self) -> None:
647 await self._limiter.__aenter__()
648
649 async def __aexit__(
650 self,
651 exc_type: type[BaseException] | None,
652 exc_val: BaseException | None,
653 exc_tb: TracebackType | None,
654 ) -> None:
655 return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)
656
657 @property
658 def total_tokens(self) -> float:
659 if self._internal_limiter is None:
660 return self._total_tokens
661
662 return self._internal_limiter.total_tokens
663
664 @total_tokens.setter
665 def total_tokens(self, value: float) -> None:
666 if not isinstance(value, int) and value is not math.inf:
667 raise TypeError("total_tokens must be an int or math.inf")
668 elif value < 1:
669 raise ValueError("total_tokens must be >= 1")
670
671 if self._internal_limiter is None:
672 self._total_tokens = value
673 return
674
675 self._limiter.total_tokens = value
676
677 @property
678 def borrowed_tokens(self) -> int:
679 if self._internal_limiter is None:
680 return 0
681
682 return self._internal_limiter.borrowed_tokens
683
684 @property
685 def available_tokens(self) -> float:
686 if self._internal_limiter is None:
687 return self._total_tokens
688
689 return self._internal_limiter.available_tokens
690
691 def acquire_nowait(self) -> None:
692 self._limiter.acquire_nowait()
693
694 def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
695 self._limiter.acquire_on_behalf_of_nowait(borrower)
696
697 async def acquire(self) -> None:
698 await self._limiter.acquire()
699
700 async def acquire_on_behalf_of(self, borrower: object) -> None:
701 await self._limiter.acquire_on_behalf_of(borrower)
702
703 def release(self) -> None:
704 self._limiter.release()
705
706 def release_on_behalf_of(self, borrower: object) -> None:
707 self._limiter.release_on_behalf_of(borrower)
708
709 def statistics(self) -> CapacityLimiterStatistics:
710 if self._internal_limiter is None:
711 return CapacityLimiterStatistics(
712 borrowed_tokens=0,
713 total_tokens=self.total_tokens,
714 borrowers=(),
715 tasks_waiting=0,
716 )
717
718 return self._internal_limiter.statistics()
719
720
721class ResourceGuard:
722 """
723 A context manager for ensuring that a resource is only used by a single task at a
724 time.
725
726 Entering this context manager while the previous has not exited it yet will trigger
727 :exc:`BusyResourceError`.
728
729 :param action: the action to guard against (visible in the :exc:`BusyResourceError`
730 when triggered, e.g. "Another task is already {action} this resource")
731
732 .. versionadded:: 4.1
733 """
734
735 __slots__ = "action", "_guarded"
736
737 def __init__(self, action: str = "using"):
738 self.action: str = action
739 self._guarded = False
740
741 def __enter__(self) -> None:
742 if self._guarded:
743 raise BusyResourceError(self.action)
744
745 self._guarded = True
746
747 def __exit__(
748 self,
749 exc_type: type[BaseException] | None,
750 exc_val: BaseException | None,
751 exc_tb: TracebackType | None,
752 ) -> None:
753 self._guarded = False