1from __future__ import annotations
2
3import math
4from collections import deque
5from collections.abc import Callable
6from dataclasses import dataclass
7from types import TracebackType
8from typing import TypeVar
9
10from ..lowlevel import checkpoint_if_cancelled
11from ._eventloop import get_async_backend
12from ._exceptions import BusyResourceError, NoEventLoopError
13from ._tasks import CancelScope
14from ._testing import TaskInfo, get_current_task
15
# Generic type of the value returned by the predicate given to Condition.wait_for().
T = TypeVar("T")
17
18
@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable statistics record returned by :meth:`~.Event.statistics`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
    """

    tasks_waiting: int
26
27
@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable statistics record returned by :meth:`~.CapacityLimiter.statistics`.

    :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
    :ivar float total_tokens: total number of available tokens
    :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed
        from this limiter
    :ivar int tasks_waiting: number of tasks waiting on
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int
44
45
@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable statistics record returned by :meth:`~.Lock.statistics`.

    :ivar bool locked: flag indicating if this lock is locked or not
    :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
        lock is not held by any task)
    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int
58
59
@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable statistics record returned by :meth:`~.Condition.statistics`.

    :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
        :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics
70
71
@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable statistics record returned by :meth:`~.Semaphore.statistics`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int
80
81
class Event:
    """
    Interface of an event flag that tasks can wait on.

    Instantiating this class does not produce a plain ``Event``: ``__new__`` asks
    the current async backend for its event implementation and, if that raises
    :exc:`NoEventLoopError`, hands back an :class:`EventAdapter` instead. Every
    method defined here raises :exc:`NotImplementedError`.
    """

    def __new__(cls) -> Event:
        try:
            event = get_async_backend().create_event()
        except NoEventLoopError:
            # No backend available right now; defer creation to the adapter.
            event = EventAdapter()

        return event

    def set(self) -> None:
        """Set the flag, notifying all listeners."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Return ``True`` if the flag is set, ``False`` if not."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Wait until the flag has been set.

        If the flag has already been set when this method is called, it returns
        immediately.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return statistics about the current state of this event."""
        raise NotImplementedError
110
111
class EventAdapter(Event):
    """
    Lazily initialized :class:`Event`, used when :class:`Event` is instantiated
    and :exc:`NoEventLoopError` is raised.

    Until the backend event has been created, the flag is tracked locally in
    ``_is_set`` and replayed onto the backend event when it is first created.
    """

    # Backend event; stays None until the ``_event`` property is first read.
    _internal_event: Event | None = None
    # Flag value recorded by set() before the backend event exists.
    _is_set: bool = False

    def __new__(cls) -> EventAdapter:
        # Bypass Event.__new__, which would dispatch to the backend again.
        return object.__new__(cls)

    @property
    def _event(self) -> Event:
        """Return the backend event, creating it on first access."""
        backend_event = self._internal_event
        if backend_event is None:
            backend_event = get_async_backend().create_event()
            self._internal_event = backend_event
            if self._is_set:
                # Carry over a set() call made before the event existed.
                backend_event.set()

        return backend_event

    def set(self) -> None:
        """Set the flag, on the backend event if it exists or locally otherwise."""
        if self._internal_event is not None:
            self._event.set()
        else:
            self._is_set = True

    def is_set(self) -> bool:
        """Return ``True`` if the flag is set, ``False`` if not."""
        if self._internal_event is not None:
            return self._internal_event.is_set()

        return self._is_set

    async def wait(self) -> None:
        """Wait on the backend event, creating it if necessary."""
        await self._event.wait()

    def statistics(self) -> EventStatistics:
        """
        Return statistics about the current state of this event.

        Reports zero waiting tasks while no backend event has been created.
        """
        if self._internal_event is not None:
            return self._internal_event.statistics()

        return EventStatistics(tasks_waiting=0)
148
149
class Lock:
    """
    Interface of a lock usable as an asynchronous context manager.

    Instantiating this class returns the lock implementation of the current async
    backend, or a :class:`LockAdapter` if :exc:`NoEventLoopError` is raised while
    obtaining it. The operations defined here raise :exc:`NotImplementedError`.

    :param fast_acquire: passed through unchanged to the backend's lock factory
        (or to :class:`LockAdapter`)
    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        try:
            lock = get_async_backend().create_lock(fast_acquire=fast_acquire)
        except NoEventLoopError:
            # No backend available right now; defer creation to the adapter.
            lock = LockAdapter(fast_acquire=fast_acquire)

        return lock

    async def __aenter__(self) -> None:
        """Acquire the lock on entering the ``async with`` block."""
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Release the lock on leaving the ``async with`` block."""
        self.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Release the lock."""
        raise NotImplementedError

    def locked(self) -> bool:
        """Return True if the lock is currently held."""
        raise NotImplementedError

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0
        """
        raise NotImplementedError
196
197
class LockAdapter(Lock):
    """
    Lazily initialized :class:`Lock`, used when :class:`Lock` is instantiated and
    :exc:`NoEventLoopError` is raised.

    The backend lock is only created the first time the ``_lock`` property is
    read. Read-only queries (:meth:`locked`, :meth:`statistics`) answer from the
    "never used" state without creating it.

    :param fast_acquire: passed through unchanged to the backend's lock factory
    """

    # Backend lock; stays None until the ``_lock`` property is first read.
    _internal_lock: Lock | None = None

    def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
        # Bypass Lock.__new__, which would dispatch to the backend again.
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False):
        self._fast_acquire = fast_acquire

    @property
    def _lock(self) -> Lock:
        """Return the backend lock, creating it on first access."""
        if self._internal_lock is None:
            self._internal_lock = get_async_backend().create_lock(
                fast_acquire=self._fast_acquire
            )

        return self._internal_lock

    async def __aenter__(self) -> None:
        await self._lock.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Nothing to release if the backend lock was never created.
        if self._internal_lock is not None:
            self._internal_lock.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await self._lock.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """
        Return True if the lock is currently held.

        A lock whose backend object has not been created yet cannot be held, so
        ``False`` is returned without creating it. This mirrors
        :meth:`statistics` and keeps the query usable when no event loop is
        running.
        """
        if self._internal_lock is None:
            return False

        return self._internal_lock.locked()

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        Reports an unlocked, unowned lock with no waiters while no backend lock
        has been created.

        .. versionadded:: 3.0

        """
        if self._internal_lock is None:
            return LockStatistics(False, None, 0)

        return self._internal_lock.statistics()
260
261
class Condition:
    """
    Condition variable built from a :class:`Lock` and a queue of :class:`Event`
    objects, one per waiting task.

    The task recorded as owner by :meth:`acquire` / :meth:`acquire_nowait` is the
    only one allowed to call :meth:`notify`, :meth:`notify_all` and :meth:`wait`.

    :param lock: the lock to use; a new :class:`Lock` is created when ``None``
    """

    # Task that acquired the lock through this condition, or None when released.
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        # Explicit None check so a caller-supplied lock is never replaced just
        # because it evaluates as falsy.
        self._lock = lock if lock is not None else Lock()
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """
        Ensure the current task is the recorded owner.

        :raises RuntimeError: if the current task is not the recorded owner

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()
        # Forget the owner only after the lock accepted the release, so that a
        # task which no longer holds the lock fails _check_acquired() instead of
        # being treated as the owner until somebody else acquires.
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify at most ``n`` listeners, in the order they started waiting.

        Fewer than ``n`` are notified if fewer tasks are waiting.

        :raises RuntimeError: if the current task is not the recorded owner

        """
        self._check_acquired()
        for _ in range(n):
            if not self._waiters:
                break

            self._waiters.popleft().set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not the recorded owner

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and is always re-acquired
        before this method returns or raises.

        :raises RuntimeError: if the current task is not the recorded owner

        """
        await checkpoint_if_cancelled()
        self._check_acquired()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            if not event.is_set():
                # Never notified: withdraw from the queue.
                self._waiters.remove(event)
            elif self._waiters:
                # This task was notified but could not act on it, so pass
                # it on to the next task
                self._waiters.popleft().set()

            raise
        finally:
            # Shielded so the lock is re-acquired even if this task is cancelled.
            with CancelScope(shield=True):
                await self.acquire()

    async def wait_for(self, predicate: Callable[[], T]) -> T:
        """
        Wait until a predicate becomes true.

        :param predicate: a callable that returns a truthy value when the condition is
            met
        :return: the result of the predicate

        .. versionadded:: 4.11.0

        """
        while not (result := predicate()):
            await self.wait()

        return result

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())
371
372
class Semaphore:
    """
    Interface of a counting semaphore usable as an asynchronous context manager.

    Instantiating this class returns the semaphore implementation of the current
    async backend, or a :class:`SemaphoreAdapter` if :exc:`NoEventLoopError` is
    raised while obtaining it. The operations defined here raise
    :exc:`NotImplementedError`; ``__init__`` only validates the arguments.

    :param initial_value: the starting value; must be an integer >= 0
    :param max_value: optional upper bound; must be an integer not lower than
        ``initial_value``
    :param fast_acquire: passed through unchanged to the backend's semaphore
        factory (or to :class:`SemaphoreAdapter`)
    :raises TypeError: if ``initial_value`` or ``max_value`` has the wrong type
    :raises ValueError: if ``initial_value`` is negative or ``max_value`` is lower
        than ``initial_value``
    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        try:
            return get_async_backend().create_semaphore(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )
        except NoEventLoopError:
            # Forward every option to the adapter, same as Lock.__new__ does,
            # rather than relying on __init__ being invoked a second time.
            return SemaphoreAdapter(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._fast_acquire = fast_acquire

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Increment the semaphore value."""
        raise NotImplementedError

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        raise NotImplementedError

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        raise NotImplementedError

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        raise NotImplementedError
455
456
class SemaphoreAdapter(Semaphore):
    """
    Lazily initialized :class:`Semaphore`, used when :class:`Semaphore` is
    instantiated and :exc:`NoEventLoopError` is raised.

    The constructor arguments are validated and remembered; the backend semaphore
    is only created the first time the ``_semaphore`` property is read.

    :param initial_value: the starting value of the semaphore
    :param max_value: optional upper bound of the semaphore value
    :param fast_acquire: passed through unchanged to the backend's semaphore
        factory
    """

    # Backend semaphore; stays None until ``_semaphore`` is first read.
    _internal_semaphore: Semaphore | None = None

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> SemaphoreAdapter:
        # Bypass Semaphore.__new__, which would dispatch to the backend again.
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> None:
        # The base initializer validates the values and stores _fast_acquire.
        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
        self._initial_value = initial_value
        self._max_value = max_value

    @property
    def _semaphore(self) -> Semaphore:
        """Return the backend semaphore, creating it on first access."""
        if self._internal_semaphore is None:
            # Forward fast_acquire too; otherwise the option given at
            # construction time would be silently dropped.
            self._internal_semaphore = get_async_backend().create_semaphore(
                self._initial_value,
                max_value=self._max_value,
                fast_acquire=self._fast_acquire,
            )

        return self._internal_semaphore

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        await self._semaphore.acquire()

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._semaphore.acquire_nowait()

    def release(self) -> None:
        """Increment the semaphore value."""
        self._semaphore.release()

    @property
    def value(self) -> int:
        """
        The current value of the semaphore.

        Equal to the initial value while no backend semaphore has been created.
        """
        if self._internal_semaphore is None:
            return self._initial_value

        return self._semaphore.value

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        Reports zero waiting tasks while no backend semaphore has been created.
        """
        if self._internal_semaphore is None:
            return SemaphoreStatistics(tasks_waiting=0)

        return self._semaphore.statistics()
514
515
class CapacityLimiter:
    """
    Interface of a token-based capacity limiter usable as an asynchronous context
    manager.

    Instantiating this class returns the limiter implementation of the current
    async backend, or a :class:`CapacityLimiterAdapter` if
    :exc:`NoEventLoopError` is raised while obtaining it. Every member defined
    here raises :exc:`NotImplementedError`.

    :param total_tokens: the total number of tokens available for borrowing
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        try:
            limiter = get_async_backend().create_capacity_limiter(total_tokens)
        except NoEventLoopError:
            # No backend available right now; defer creation to the adapter.
            limiter = CapacityLimiterAdapter(total_tokens)

        return limiter

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        This is a read-write property. If the total number of tokens is increased, the
        proportionate number of tasks waiting on this limiter will be granted their
        tokens.

        .. versionchanged:: 3.0
            The property is now writable.
        .. versionchanged:: 4.12
            The value can now be set to 0.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens that have currently been borrowed."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """The number of tokens currently available to be borrowed"""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Acquire a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Acquire a token for the current task, waiting if necessary for one to become
        available.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token, waiting if necessary for one to become available.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Release the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by the given borrower.

        :param borrower: the entity whose token is being released
        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        .. versionadded:: 3.0

        """
        raise NotImplementedError
630
631
class CapacityLimiterAdapter(CapacityLimiter):
    """
    Lazily initialized :class:`CapacityLimiter`, used when
    :class:`CapacityLimiter` is instantiated and :exc:`NoEventLoopError` is
    raised.

    The requested token count is validated and remembered; the backend limiter is
    only created the first time the ``_limiter`` property is read. Until then the
    read-only members report an unused limiter.

    :param total_tokens: the total number of tokens available for borrowing
    :raises TypeError: if ``total_tokens`` is neither an int nor ``math.inf``
    :raises ValueError: if ``total_tokens`` is negative
    """

    # Backend limiter; stays None until the ``_limiter`` property is first read.
    _internal_limiter: CapacityLimiter | None = None

    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
        # Bypass CapacityLimiter.__new__, which would dispatch to the backend again.
        return object.__new__(cls)

    def __init__(self, total_tokens: float) -> None:
        # Goes through the property setter so the value is validated.
        self.total_tokens = total_tokens

    @property
    def _limiter(self) -> CapacityLimiter:
        """Return the backend limiter, creating it on first access."""
        if self._internal_limiter is None:
            self._internal_limiter = get_async_backend().create_capacity_limiter(
                self._total_tokens
            )

        return self._internal_limiter

    async def __aenter__(self) -> None:
        await self._limiter.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        Read from the backend limiter once it exists, otherwise from the value
        remembered by this adapter.
        """
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        # Compare by equality, not identity: an infinity that is not the very
        # ``math.inf`` object (e.g. ``float("inf")``) is just as valid.
        if not isinstance(value, int) and value != math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        elif value < 0:
            # 0 is an accepted value (see CapacityLimiter.total_tokens).
            raise ValueError("total_tokens must be >= 0")

        if self._internal_limiter is None:
            self._total_tokens = value
            return

        self._limiter.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        """
        The number of tokens that have currently been borrowed.

        Always 0 while no backend limiter has been created.
        """
        if self._internal_limiter is None:
            return 0

        return self._internal_limiter.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        """
        The number of tokens currently available to be borrowed.

        Equal to the total while no backend limiter has been created.
        """
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.available_tokens

    def acquire_nowait(self) -> None:
        """Delegate to the backend limiter, creating it if necessary."""
        self._limiter.acquire_nowait()

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """Delegate to the backend limiter, creating it if necessary."""
        self._limiter.acquire_on_behalf_of_nowait(borrower)

    async def acquire(self) -> None:
        """Delegate to the backend limiter, creating it if necessary."""
        await self._limiter.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """Delegate to the backend limiter, creating it if necessary."""
        await self._limiter.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        """Delegate to the backend limiter, creating it if necessary."""
        self._limiter.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        """Delegate to the backend limiter, creating it if necessary."""
        self._limiter.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        Reports no borrowers and no waiting tasks while no backend limiter has
        been created.
        """
        if self._internal_limiter is None:
            return CapacityLimiterStatistics(
                borrowed_tokens=0,
                total_tokens=self.total_tokens,
                borrowers=(),
                tasks_waiting=0,
            )

        return self._internal_limiter.statistics()
723
724
class ResourceGuard:
    """
    A context manager for ensuring that a resource is only used by a single task at a
    time.

    Entering this context manager while the previous has not exited it yet will trigger
    :exc:`BusyResourceError`.

    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
        when triggered, e.g. "Another task is already {action} this resource")

    .. versionadded:: 4.1
    """

    __slots__ = ("action", "_guarded")

    def __init__(self, action: str = "using"):
        self._guarded = False
        self.action: str = action

    def __enter__(self) -> None:
        # The guard is a plain flag: a second entry before the matching exit
        # is reported as a busy resource.
        if self._guarded:
            raise BusyResourceError(self.action)

        self._guarded = True

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Unconditionally free the guard; exceptions are not suppressed.
        self._guarded = False