1from __future__ import annotations
2
3import math
4from collections import deque
5from dataclasses import dataclass
6from types import TracebackType
7
8from sniffio import AsyncLibraryNotFoundError
9
10from ..lowlevel import checkpoint
11from ._eventloop import get_async_backend
12from ._exceptions import BusyResourceError
13from ._tasks import CancelScope
14from ._testing import TaskInfo, get_current_task
15
16
17@dataclass(frozen=True)
18class EventStatistics:
19 """
20 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
21 """
22
23 tasks_waiting: int
24
25
26@dataclass(frozen=True)
27class CapacityLimiterStatistics:
28 """
29 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
30 :ivar float total_tokens: total number of available tokens
31 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from
32 this limiter
33 :ivar int tasks_waiting: number of tasks waiting on
34 :meth:`~.CapacityLimiter.acquire` or
35 :meth:`~.CapacityLimiter.acquire_on_behalf_of`
36 """
37
38 borrowed_tokens: int
39 total_tokens: float
40 borrowers: tuple[object, ...]
41 tasks_waiting: int
42
43
44@dataclass(frozen=True)
45class LockStatistics:
46 """
47 :ivar bool locked: flag indicating if this lock is locked or not
48 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
49 lock is not held by any task)
50 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
51 """
52
53 locked: bool
54 owner: TaskInfo | None
55 tasks_waiting: int
56
57
58@dataclass(frozen=True)
59class ConditionStatistics:
60 """
61 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
62 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
63 :class:`~.Lock`
64 """
65
66 tasks_waiting: int
67 lock_statistics: LockStatistics
68
69
70@dataclass(frozen=True)
71class SemaphoreStatistics:
72 """
73 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
74
75 """
76
77 tasks_waiting: int
78
79
80class Event:
81 def __new__(cls) -> Event:
82 try:
83 return get_async_backend().create_event()
84 except AsyncLibraryNotFoundError:
85 return EventAdapter()
86
87 def set(self) -> None:
88 """Set the flag, notifying all listeners."""
89 raise NotImplementedError
90
91 def is_set(self) -> bool:
92 """Return ``True`` if the flag is set, ``False`` if not."""
93 raise NotImplementedError
94
95 async def wait(self) -> None:
96 """
97 Wait until the flag has been set.
98
99 If the flag has already been set when this method is called, it returns
100 immediately.
101
102 """
103 raise NotImplementedError
104
105 def statistics(self) -> EventStatistics:
106 """Return statistics about the current state of this event."""
107 raise NotImplementedError
108
109
110class EventAdapter(Event):
111 _internal_event: Event | None = None
112 _is_set: bool = False
113
114 def __new__(cls) -> EventAdapter:
115 return object.__new__(cls)
116
117 @property
118 def _event(self) -> Event:
119 if self._internal_event is None:
120 self._internal_event = get_async_backend().create_event()
121 if self._is_set:
122 self._internal_event.set()
123
124 return self._internal_event
125
126 def set(self) -> None:
127 if self._internal_event is None:
128 self._is_set = True
129 else:
130 self._event.set()
131
132 def is_set(self) -> bool:
133 if self._internal_event is None:
134 return self._is_set
135
136 return self._internal_event.is_set()
137
138 async def wait(self) -> None:
139 await self._event.wait()
140
141 def statistics(self) -> EventStatistics:
142 if self._internal_event is None:
143 return EventStatistics(tasks_waiting=0)
144
145 return self._internal_event.statistics()
146
147
148class Lock:
149 def __new__(cls, *, fast_acquire: bool = False) -> Lock:
150 try:
151 return get_async_backend().create_lock(fast_acquire=fast_acquire)
152 except AsyncLibraryNotFoundError:
153 return LockAdapter(fast_acquire=fast_acquire)
154
155 async def __aenter__(self) -> None:
156 await self.acquire()
157
158 async def __aexit__(
159 self,
160 exc_type: type[BaseException] | None,
161 exc_val: BaseException | None,
162 exc_tb: TracebackType | None,
163 ) -> None:
164 self.release()
165
166 async def acquire(self) -> None:
167 """Acquire the lock."""
168 raise NotImplementedError
169
170 def acquire_nowait(self) -> None:
171 """
172 Acquire the lock, without blocking.
173
174 :raises ~anyio.WouldBlock: if the operation would block
175
176 """
177 raise NotImplementedError
178
179 def release(self) -> None:
180 """Release the lock."""
181 raise NotImplementedError
182
183 def locked(self) -> bool:
184 """Return True if the lock is currently held."""
185 raise NotImplementedError
186
187 def statistics(self) -> LockStatistics:
188 """
189 Return statistics about the current state of this lock.
190
191 .. versionadded:: 3.0
192 """
193 raise NotImplementedError
194
195
196class LockAdapter(Lock):
197 _internal_lock: Lock | None = None
198
199 def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
200 return object.__new__(cls)
201
202 def __init__(self, *, fast_acquire: bool = False):
203 self._fast_acquire = fast_acquire
204
205 @property
206 def _lock(self) -> Lock:
207 if self._internal_lock is None:
208 self._internal_lock = get_async_backend().create_lock(
209 fast_acquire=self._fast_acquire
210 )
211
212 return self._internal_lock
213
214 async def __aenter__(self) -> None:
215 await self._lock.acquire()
216
217 async def __aexit__(
218 self,
219 exc_type: type[BaseException] | None,
220 exc_val: BaseException | None,
221 exc_tb: TracebackType | None,
222 ) -> None:
223 if self._internal_lock is not None:
224 self._internal_lock.release()
225
226 async def acquire(self) -> None:
227 """Acquire the lock."""
228 await self._lock.acquire()
229
230 def acquire_nowait(self) -> None:
231 """
232 Acquire the lock, without blocking.
233
234 :raises ~anyio.WouldBlock: if the operation would block
235
236 """
237 self._lock.acquire_nowait()
238
239 def release(self) -> None:
240 """Release the lock."""
241 self._lock.release()
242
243 def locked(self) -> bool:
244 """Return True if the lock is currently held."""
245 return self._lock.locked()
246
247 def statistics(self) -> LockStatistics:
248 """
249 Return statistics about the current state of this lock.
250
251 .. versionadded:: 3.0
252
253 """
254 if self._internal_lock is None:
255 return LockStatistics(False, None, 0)
256
257 return self._internal_lock.statistics()
258
259
260class Condition:
261 _owner_task: TaskInfo | None = None
262
263 def __init__(self, lock: Lock | None = None):
264 self._lock = lock or Lock()
265 self._waiters: deque[Event] = deque()
266
267 async def __aenter__(self) -> None:
268 await self.acquire()
269
270 async def __aexit__(
271 self,
272 exc_type: type[BaseException] | None,
273 exc_val: BaseException | None,
274 exc_tb: TracebackType | None,
275 ) -> None:
276 self.release()
277
278 def _check_acquired(self) -> None:
279 if self._owner_task != get_current_task():
280 raise RuntimeError("The current task is not holding the underlying lock")
281
282 async def acquire(self) -> None:
283 """Acquire the underlying lock."""
284 await self._lock.acquire()
285 self._owner_task = get_current_task()
286
287 def acquire_nowait(self) -> None:
288 """
289 Acquire the underlying lock, without blocking.
290
291 :raises ~anyio.WouldBlock: if the operation would block
292
293 """
294 self._lock.acquire_nowait()
295 self._owner_task = get_current_task()
296
297 def release(self) -> None:
298 """Release the underlying lock."""
299 self._lock.release()
300
301 def locked(self) -> bool:
302 """Return True if the lock is set."""
303 return self._lock.locked()
304
305 def notify(self, n: int = 1) -> None:
306 """Notify exactly n listeners."""
307 self._check_acquired()
308 for _ in range(n):
309 try:
310 event = self._waiters.popleft()
311 except IndexError:
312 break
313
314 event.set()
315
316 def notify_all(self) -> None:
317 """Notify all the listeners."""
318 self._check_acquired()
319 for event in self._waiters:
320 event.set()
321
322 self._waiters.clear()
323
324 async def wait(self) -> None:
325 """Wait for a notification."""
326 await checkpoint()
327 event = Event()
328 self._waiters.append(event)
329 self.release()
330 try:
331 await event.wait()
332 except BaseException:
333 if not event.is_set():
334 self._waiters.remove(event)
335
336 raise
337 finally:
338 with CancelScope(shield=True):
339 await self.acquire()
340
341 def statistics(self) -> ConditionStatistics:
342 """
343 Return statistics about the current state of this condition.
344
345 .. versionadded:: 3.0
346 """
347 return ConditionStatistics(len(self._waiters), self._lock.statistics())
348
349
350class Semaphore:
351 def __new__(
352 cls,
353 initial_value: int,
354 *,
355 max_value: int | None = None,
356 fast_acquire: bool = False,
357 ) -> Semaphore:
358 try:
359 return get_async_backend().create_semaphore(
360 initial_value, max_value=max_value, fast_acquire=fast_acquire
361 )
362 except AsyncLibraryNotFoundError:
363 return SemaphoreAdapter(initial_value, max_value=max_value)
364
365 def __init__(
366 self,
367 initial_value: int,
368 *,
369 max_value: int | None = None,
370 fast_acquire: bool = False,
371 ):
372 if not isinstance(initial_value, int):
373 raise TypeError("initial_value must be an integer")
374 if initial_value < 0:
375 raise ValueError("initial_value must be >= 0")
376 if max_value is not None:
377 if not isinstance(max_value, int):
378 raise TypeError("max_value must be an integer or None")
379 if max_value < initial_value:
380 raise ValueError(
381 "max_value must be equal to or higher than initial_value"
382 )
383
384 self._fast_acquire = fast_acquire
385
386 async def __aenter__(self) -> Semaphore:
387 await self.acquire()
388 return self
389
390 async def __aexit__(
391 self,
392 exc_type: type[BaseException] | None,
393 exc_val: BaseException | None,
394 exc_tb: TracebackType | None,
395 ) -> None:
396 self.release()
397
398 async def acquire(self) -> None:
399 """Decrement the semaphore value, blocking if necessary."""
400 raise NotImplementedError
401
402 def acquire_nowait(self) -> None:
403 """
404 Acquire the underlying lock, without blocking.
405
406 :raises ~anyio.WouldBlock: if the operation would block
407
408 """
409 raise NotImplementedError
410
411 def release(self) -> None:
412 """Increment the semaphore value."""
413 raise NotImplementedError
414
415 @property
416 def value(self) -> int:
417 """The current value of the semaphore."""
418 raise NotImplementedError
419
420 @property
421 def max_value(self) -> int | None:
422 """The maximum value of the semaphore."""
423 raise NotImplementedError
424
425 def statistics(self) -> SemaphoreStatistics:
426 """
427 Return statistics about the current state of this semaphore.
428
429 .. versionadded:: 3.0
430 """
431 raise NotImplementedError
432
433
434class SemaphoreAdapter(Semaphore):
435 _internal_semaphore: Semaphore | None = None
436
437 def __new__(
438 cls,
439 initial_value: int,
440 *,
441 max_value: int | None = None,
442 fast_acquire: bool = False,
443 ) -> SemaphoreAdapter:
444 return object.__new__(cls)
445
446 def __init__(
447 self,
448 initial_value: int,
449 *,
450 max_value: int | None = None,
451 fast_acquire: bool = False,
452 ) -> None:
453 super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
454 self._initial_value = initial_value
455 self._max_value = max_value
456
457 @property
458 def _semaphore(self) -> Semaphore:
459 if self._internal_semaphore is None:
460 self._internal_semaphore = get_async_backend().create_semaphore(
461 self._initial_value, max_value=self._max_value
462 )
463
464 return self._internal_semaphore
465
466 async def acquire(self) -> None:
467 await self._semaphore.acquire()
468
469 def acquire_nowait(self) -> None:
470 self._semaphore.acquire_nowait()
471
472 def release(self) -> None:
473 self._semaphore.release()
474
475 @property
476 def value(self) -> int:
477 if self._internal_semaphore is None:
478 return self._initial_value
479
480 return self._semaphore.value
481
482 @property
483 def max_value(self) -> int | None:
484 return self._max_value
485
486 def statistics(self) -> SemaphoreStatistics:
487 if self._internal_semaphore is None:
488 return SemaphoreStatistics(tasks_waiting=0)
489
490 return self._semaphore.statistics()
491
492
493class CapacityLimiter:
494 def __new__(cls, total_tokens: float) -> CapacityLimiter:
495 try:
496 return get_async_backend().create_capacity_limiter(total_tokens)
497 except AsyncLibraryNotFoundError:
498 return CapacityLimiterAdapter(total_tokens)
499
500 async def __aenter__(self) -> None:
501 raise NotImplementedError
502
503 async def __aexit__(
504 self,
505 exc_type: type[BaseException] | None,
506 exc_val: BaseException | None,
507 exc_tb: TracebackType | None,
508 ) -> bool | None:
509 raise NotImplementedError
510
511 @property
512 def total_tokens(self) -> float:
513 """
514 The total number of tokens available for borrowing.
515
516 This is a read-write property. If the total number of tokens is increased, the
517 proportionate number of tasks waiting on this limiter will be granted their
518 tokens.
519
520 .. versionchanged:: 3.0
521 The property is now writable.
522
523 """
524 raise NotImplementedError
525
526 @total_tokens.setter
527 def total_tokens(self, value: float) -> None:
528 raise NotImplementedError
529
530 @property
531 def borrowed_tokens(self) -> int:
532 """The number of tokens that have currently been borrowed."""
533 raise NotImplementedError
534
535 @property
536 def available_tokens(self) -> float:
537 """The number of tokens currently available to be borrowed"""
538 raise NotImplementedError
539
540 def acquire_nowait(self) -> None:
541 """
542 Acquire a token for the current task without waiting for one to become
543 available.
544
545 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
546
547 """
548 raise NotImplementedError
549
550 def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
551 """
552 Acquire a token without waiting for one to become available.
553
554 :param borrower: the entity borrowing a token
555 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing
556
557 """
558 raise NotImplementedError
559
560 async def acquire(self) -> None:
561 """
562 Acquire a token for the current task, waiting if necessary for one to become
563 available.
564
565 """
566 raise NotImplementedError
567
568 async def acquire_on_behalf_of(self, borrower: object) -> None:
569 """
570 Acquire a token, waiting if necessary for one to become available.
571
572 :param borrower: the entity borrowing a token
573
574 """
575 raise NotImplementedError
576
577 def release(self) -> None:
578 """
579 Release the token held by the current task.
580
581 :raises RuntimeError: if the current task has not borrowed a token from this
582 limiter.
583
584 """
585 raise NotImplementedError
586
587 def release_on_behalf_of(self, borrower: object) -> None:
588 """
589 Release the token held by the given borrower.
590
591 :raises RuntimeError: if the borrower has not borrowed a token from this
592 limiter.
593
594 """
595 raise NotImplementedError
596
597 def statistics(self) -> CapacityLimiterStatistics:
598 """
599 Return statistics about the current state of this limiter.
600
601 .. versionadded:: 3.0
602
603 """
604 raise NotImplementedError
605
606
607class CapacityLimiterAdapter(CapacityLimiter):
608 _internal_limiter: CapacityLimiter | None = None
609
610 def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
611 return object.__new__(cls)
612
613 def __init__(self, total_tokens: float) -> None:
614 self.total_tokens = total_tokens
615
616 @property
617 def _limiter(self) -> CapacityLimiter:
618 if self._internal_limiter is None:
619 self._internal_limiter = get_async_backend().create_capacity_limiter(
620 self._total_tokens
621 )
622
623 return self._internal_limiter
624
625 async def __aenter__(self) -> None:
626 await self._limiter.__aenter__()
627
628 async def __aexit__(
629 self,
630 exc_type: type[BaseException] | None,
631 exc_val: BaseException | None,
632 exc_tb: TracebackType | None,
633 ) -> bool | None:
634 return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)
635
636 @property
637 def total_tokens(self) -> float:
638 if self._internal_limiter is None:
639 return self._total_tokens
640
641 return self._internal_limiter.total_tokens
642
643 @total_tokens.setter
644 def total_tokens(self, value: float) -> None:
645 if not isinstance(value, int) and value is not math.inf:
646 raise TypeError("total_tokens must be an int or math.inf")
647 elif value < 1:
648 raise ValueError("total_tokens must be >= 1")
649
650 if self._internal_limiter is None:
651 self._total_tokens = value
652 return
653
654 self._limiter.total_tokens = value
655
656 @property
657 def borrowed_tokens(self) -> int:
658 if self._internal_limiter is None:
659 return 0
660
661 return self._internal_limiter.borrowed_tokens
662
663 @property
664 def available_tokens(self) -> float:
665 if self._internal_limiter is None:
666 return self._total_tokens
667
668 return self._internal_limiter.available_tokens
669
670 def acquire_nowait(self) -> None:
671 self._limiter.acquire_nowait()
672
673 def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
674 self._limiter.acquire_on_behalf_of_nowait(borrower)
675
676 async def acquire(self) -> None:
677 await self._limiter.acquire()
678
679 async def acquire_on_behalf_of(self, borrower: object) -> None:
680 await self._limiter.acquire_on_behalf_of(borrower)
681
682 def release(self) -> None:
683 self._limiter.release()
684
685 def release_on_behalf_of(self, borrower: object) -> None:
686 self._limiter.release_on_behalf_of(borrower)
687
688 def statistics(self) -> CapacityLimiterStatistics:
689 if self._internal_limiter is None:
690 return CapacityLimiterStatistics(
691 borrowed_tokens=0,
692 total_tokens=self.total_tokens,
693 borrowers=(),
694 tasks_waiting=0,
695 )
696
697 return self._internal_limiter.statistics()
698
699
700class ResourceGuard:
701 """
702 A context manager for ensuring that a resource is only used by a single task at a
703 time.
704
705 Entering this context manager while the previous has not exited it yet will trigger
706 :exc:`BusyResourceError`.
707
708 :param action: the action to guard against (visible in the :exc:`BusyResourceError`
709 when triggered, e.g. "Another task is already {action} this resource")
710
711 .. versionadded:: 4.1
712 """
713
714 __slots__ = "action", "_guarded"
715
716 def __init__(self, action: str = "using"):
717 self.action: str = action
718 self._guarded = False
719
720 def __enter__(self) -> None:
721 if self._guarded:
722 raise BusyResourceError(self.action)
723
724 self._guarded = True
725
726 def __exit__(
727 self,
728 exc_type: type[BaseException] | None,
729 exc_val: BaseException | None,
730 exc_tb: TracebackType | None,
731 ) -> None:
732 self._guarded = False