1from __future__ import annotations
2
3import math
4from collections import deque
5from dataclasses import dataclass
6from types import TracebackType
7
8from sniffio import AsyncLibraryNotFoundError
9
10from ..lowlevel import checkpoint
11from ._eventloop import get_async_backend
12from ._exceptions import BusyResourceError
13from ._tasks import CancelScope
14from ._testing import TaskInfo, get_current_task
15
16
@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable snapshot describing the state of an :class:`~.Event`.

    :ivar int tasks_waiting: how many tasks are currently blocked in
        :meth:`~.Event.wait`
    """

    tasks_waiting: int
24
25
@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable snapshot describing the state of a :class:`~.CapacityLimiter`.

    :ivar int borrowed_tokens: how many tokens tasks have borrowed at the moment
    :ivar float total_tokens: the total number of tokens the limiter hands out
    :ivar tuple borrowers: the tasks (or other objects) that currently hold a token
        borrowed from this limiter
    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int
42
43
@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable snapshot describing the state of a :class:`~.Lock`.

    :ivar bool locked: ``True`` when the lock is held, ``False`` otherwise
    :ivar ~anyio.TaskInfo owner: the task holding the lock, or ``None`` when no
        task holds it
    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int
56
57
@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable snapshot describing the state of a :class:`~.Condition`.

    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: the statistics reported by the
        underlying :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics
68
69
@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable snapshot describing the state of a :class:`~.Semaphore`.

    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int
78
79
class Event:
    """
    Interface of an event flag.

    Instantiating this class yields the event object created by the current async
    backend. If looking up the backend raises :exc:`AsyncLibraryNotFoundError`, an
    :class:`EventAdapter` is returned instead.
    """

    def __new__(cls) -> Event:
        try:
            event = get_async_backend().create_event()
        except AsyncLibraryNotFoundError:
            event = EventAdapter()

        return event

    def set(self) -> None:
        """Raise the flag and notify every listener."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Tell whether the flag has been set (``True``) or not (``False``)."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Block until the flag is set.

        Returns immediately when the flag was already set at the time of the call.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return a snapshot of statistics about this event's current state."""
        raise NotImplementedError
108
109
class EventAdapter(Event):
    """
    Lazy stand-in for a backend event.

    The backend event is only created the first time it is actually needed (see
    :attr:`_event`); until then the adapter reports itself as unset with no
    waiters.
    """

    # Backend event, created on first use; ``None`` until then.
    _internal_event: Event | None = None

    def __new__(cls) -> EventAdapter:
        # Bypass Event.__new__, which would dispatch to the backend.
        return object.__new__(cls)

    @property
    def _event(self) -> Event:
        """Return the backend event, creating it on first access."""
        event = self._internal_event
        if event is None:
            event = self._internal_event = get_async_backend().create_event()

        return event

    def set(self) -> None:
        self._event.set()

    def is_set(self) -> bool:
        event = self._internal_event
        # An event that was never materialized cannot have been set.
        return False if event is None else event.is_set()

    async def wait(self) -> None:
        await self._event.wait()

    def statistics(self) -> EventStatistics:
        event = self._internal_event
        if event is not None:
            return event.statistics()

        return EventStatistics(tasks_waiting=0)
137
138
class Lock:
    """
    Interface of a lock usable as an async context manager.

    Instantiating this class yields the lock created by the current async backend.
    If looking up the backend raises :exc:`AsyncLibraryNotFoundError`, a
    :class:`LockAdapter` is returned instead. ``fast_acquire`` is forwarded
    unchanged in both cases.
    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        try:
            lock = get_async_backend().create_lock(fast_acquire=fast_acquire)
        except AsyncLibraryNotFoundError:
            lock = LockAdapter(fast_acquire=fast_acquire)

        return lock

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Take ownership of the lock."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Take ownership of the lock without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Give up ownership of the lock."""
        raise NotImplementedError

    def locked(self) -> bool:
        """Return True when the lock is currently held."""
        raise NotImplementedError

    def statistics(self) -> LockStatistics:
        """
        Return a snapshot of statistics about this lock's current state.

        .. versionadded:: 3.0
        """
        raise NotImplementedError
185
186
class LockAdapter(Lock):
    """
    Lazy stand-in for a backend lock.

    The backend lock is only created the first time it is actually needed (see
    :attr:`_lock`); until then the adapter reports itself as unlocked with no
    owner and no waiters.
    """

    # Backend lock, created on first use; ``None`` until then.
    _internal_lock: Lock | None = None

    def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
        # Bypass Lock.__new__, which would dispatch to the backend.
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False):
        # Remembered so it can be forwarded when the backend lock is created.
        self._fast_acquire = fast_acquire

    @property
    def _lock(self) -> Lock:
        """Return the backend lock, creating it on first access."""
        if self._internal_lock is None:
            self._internal_lock = get_async_backend().create_lock(
                fast_acquire=self._fast_acquire
            )

        return self._internal_lock

    async def __aenter__(self) -> None:
        await self._lock.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        if self._internal_lock is not None:
            self._internal_lock.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await self._lock.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """
        Return True if the lock is currently held.

        A lock whose backend object has not been created yet cannot have been
        acquired, so ``False`` is returned without touching the backend.

        """
        # Do not go through ``self._lock`` here: that would force creation of the
        # backend lock just to answer a read-only query.
        return self._internal_lock is not None and self._internal_lock.locked()

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0

        """
        if self._internal_lock is None:
            return LockStatistics(False, None, 0)

        return self._internal_lock.statistics()
249
250
class Condition:
    """
    Condition variable built on top of a :class:`Lock`.

    :param lock: the lock to use; a new :class:`Lock` is created when omitted
    """

    # Task recorded as the holder of the lock through this condition, or ``None``.
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = Lock() if lock is None else lock
        # One event per task currently blocked in wait(), in arrival order.
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """
        Ensure the current task is the one recorded as holding the lock.

        :raises RuntimeError: if another task, or no task, is recorded as the owner

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()
        # Only reached if the release succeeded. Forget the owner so that a task
        # which no longer holds the lock cannot pass _check_acquired().
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify at most ``n`` listeners, in the order they started waiting.

        Fewer than ``n`` listeners are notified if fewer are waiting.

        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # Ran out of waiters before reaching n.
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and is always re-acquired
        before this method returns or raises from the waiting phase.

        """
        await checkpoint()
        event = Event()
        # Release before registering the waiter: if the release fails (e.g. the
        # lock is not held), no stale event is left behind in the waiter queue.
        self.release()
        self._waiters.append(event)
        try:
            await event.wait()
        except BaseException:
            # A set event has already been removed from the queue by the notifier.
            if not event.is_set():
                self._waiters.remove(event)

            raise
        finally:
            # Shielded so the lock is re-acquired even when being cancelled.
            with CancelScope(shield=True):
                await self.acquire()

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())
339
340
class Semaphore:
    """
    Interface of a counting semaphore usable as an async context manager.

    Instantiating this class yields the semaphore created by the current async
    backend. If looking up the backend raises :exc:`AsyncLibraryNotFoundError`, a
    :class:`SemaphoreAdapter` is returned instead.

    :param initial_value: the starting value of the semaphore (an integer >= 0)
    :param max_value: optional upper bound (an integer >= ``initial_value``)
    :param fast_acquire: forwarded unchanged to the created semaphore
    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        try:
            return get_async_backend().create_semaphore(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )
        except AsyncLibraryNotFoundError:
            # Forward every option, like the backend call above does.
            return SemaphoreAdapter(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        """
        Validate the arguments and remember ``fast_acquire``.

        :raises TypeError: if ``initial_value`` is not an integer, or ``max_value``
            is neither an integer nor ``None``
        :raises ValueError: if ``initial_value`` is negative or ``max_value`` is
            lower than ``initial_value``

        """
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._fast_acquire = fast_acquire

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Increment the semaphore value."""
        raise NotImplementedError

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        raise NotImplementedError

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        raise NotImplementedError

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        raise NotImplementedError
423
424
class SemaphoreAdapter(Semaphore):
    """
    Lazy stand-in for a backend semaphore.

    The backend semaphore is only created the first time it is actually needed
    (see :attr:`_semaphore`); until then the adapter reports the initial value and
    no waiters.
    """

    # Backend semaphore, created on first use; ``None`` until then.
    _internal_semaphore: Semaphore | None = None

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> SemaphoreAdapter:
        # Bypass Semaphore.__new__, which would dispatch to the backend.
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> None:
        # The base initializer validates the arguments and stores _fast_acquire.
        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
        self._initial_value = initial_value
        self._max_value = max_value

    @property
    def _semaphore(self) -> Semaphore:
        """Return the backend semaphore, creating it on first access."""
        if self._internal_semaphore is None:
            # Forward fast_acquire too, so the option requested by the caller is
            # not silently dropped when the backend semaphore is created.
            self._internal_semaphore = get_async_backend().create_semaphore(
                self._initial_value,
                max_value=self._max_value,
                fast_acquire=self._fast_acquire,
            )

        return self._internal_semaphore

    async def acquire(self) -> None:
        await self._semaphore.acquire()

    def acquire_nowait(self) -> None:
        self._semaphore.acquire_nowait()

    def release(self) -> None:
        self._semaphore.release()

    @property
    def value(self) -> int:
        # Nothing can have been acquired or released before materialization.
        if self._internal_semaphore is None:
            return self._initial_value

        return self._semaphore.value

    @property
    def max_value(self) -> int | None:
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        if self._internal_semaphore is None:
            return SemaphoreStatistics(tasks_waiting=0)

        return self._semaphore.statistics()
482
483
class CapacityLimiter:
    """
    Interface of a capacity limiter usable as an async context manager.

    Instantiating this class yields the limiter created by the current async
    backend. If looking up the backend raises :exc:`AsyncLibraryNotFoundError`, a
    :class:`CapacityLimiterAdapter` is returned instead.
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        try:
            limiter = get_async_backend().create_capacity_limiter(total_tokens)
        except AsyncLibraryNotFoundError:
            limiter = CapacityLimiterAdapter(total_tokens)

        return limiter

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens that can be borrowed.

        This property can be both read and written. When the total is raised, the
        proportionate number of tasks waiting on this limiter will be granted
        their tokens.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """How many tokens are borrowed at the moment."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """How many tokens can currently still be borrowed."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Borrow a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Borrow a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Borrow a token for the current task, waiting for one to become available
        if necessary.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Borrow a token, waiting for one to become available if necessary.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Give back the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Give back the token held by the given borrower.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return a snapshot of statistics about this limiter's current state.

        .. versionadded:: 3.0

        """
        raise NotImplementedError
596
597
class CapacityLimiterAdapter(CapacityLimiter):
    """
    Lazy stand-in for a backend capacity limiter.

    The backend limiter is only created the first time it is actually needed (see
    :attr:`_limiter`); until then the adapter keeps the configured total itself and
    reports no borrowed tokens, no borrowers and no waiters.
    """

    # Backend limiter, created on first use; ``None`` until then.
    _internal_limiter: CapacityLimiter | None = None

    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
        # Bypass CapacityLimiter.__new__, which would dispatch to the backend.
        return object.__new__(cls)

    def __init__(self, total_tokens: float) -> None:
        # Goes through the property setter so the value is validated.
        self.total_tokens = total_tokens

    @property
    def _limiter(self) -> CapacityLimiter:
        """Return the backend limiter, creating it on first access."""
        if self._internal_limiter is None:
            self._internal_limiter = get_async_backend().create_capacity_limiter(
                self._total_tokens
            )

        return self._internal_limiter

    async def __aenter__(self) -> None:
        await self._limiter.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        Assigning validates the value and stores it locally until the backend
        limiter exists; afterwards the assignment is forwarded to that limiter.

        :raises TypeError: on assignment, if the value is neither an ``int`` nor
            positive infinity
        :raises ValueError: on assignment, if the value is lower than 1

        """
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        # Compare by value, not identity: infinities such as float("inf") are
        # distinct objects from math.inf and must be accepted as well.
        if not isinstance(value, int) and value != math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        elif value < 1:
            raise ValueError("total_tokens must be >= 1")

        if self._internal_limiter is None:
            self._total_tokens = value
            return

        self._limiter.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        if self._internal_limiter is None:
            return 0

        return self._internal_limiter.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.available_tokens

    def acquire_nowait(self) -> None:
        self._limiter.acquire_nowait()

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        self._limiter.acquire_on_behalf_of_nowait(borrower)

    async def acquire(self) -> None:
        await self._limiter.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        await self._limiter.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        self._limiter.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        self._limiter.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        if self._internal_limiter is None:
            return CapacityLimiterStatistics(
                borrowed_tokens=0,
                total_tokens=self.total_tokens,
                borrowers=(),
                tasks_waiting=0,
            )

        return self._internal_limiter.statistics()
689
690
class ResourceGuard:
    """
    Context manager that lets only one task at a time use a resource.

    If the guard is entered again before the previous user has exited it,
    :exc:`BusyResourceError` is raised.

    :param action: the action being guarded against (shown in the
        :exc:`BusyResourceError` when triggered, e.g. "Another task is already
        {action} this resource")

    .. versionadded:: 4.1
    """

    __slots__ = ("action", "_guarded")

    def __init__(self, action: str = "using"):
        self.action: str = action
        # True while some task is inside the guarded block.
        self._guarded = False

    def __enter__(self) -> None:
        if not self._guarded:
            self._guarded = True
            return

        raise BusyResourceError(self.action)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        # Always free the guard; exceptions are not suppressed (returns None).
        self._guarded = False