Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_synchronization.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

348 statements  

1from __future__ import annotations 

2 

3import math 

4from collections import deque 

5from collections.abc import Callable 

6from dataclasses import dataclass 

7from types import TracebackType 

8from typing import TypeVar 

9 

10from sniffio import AsyncLibraryNotFoundError 

11 

12from ..lowlevel import checkpoint_if_cancelled 

13from ._eventloop import get_async_backend 

14from ._exceptions import BusyResourceError 

15from ._tasks import CancelScope 

16from ._testing import TaskInfo, get_current_task 

17 

18T = TypeVar("T") 

19 

20 

@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable record describing the state of an :class:`~.Event`.

    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Event.wait`
    """

    tasks_waiting: int

28 

29 

@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable record describing the state of a :class:`~.CapacityLimiter`.

    :ivar int borrowed_tokens: how many tokens tasks have borrowed right now
    :ivar float total_tokens: the total number of tokens the limiter offers
    :ivar tuple borrowers: the tasks (or other objects) that currently hold a token
        borrowed from this limiter
    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int

46 

47 

@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable record describing the state of a :class:`~.Lock`.

    :ivar bool locked: ``True`` while the lock is held, ``False`` otherwise
    :ivar ~anyio.TaskInfo owner: the task holding the lock, or ``None`` when no task
        holds it
    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int

60 

61 

@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable record describing the state of a :class:`~.Condition`.

    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: the statistics reported by the
        underlying :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics

72 

73 

@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable record describing the state of a :class:`~.Semaphore`.

    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int

82 

83 

class Event:
    """
    Base interface for an event flag that tasks can wait on.

    Instantiating this class delegates to ``get_async_backend().create_event()``;
    if that raises ``AsyncLibraryNotFoundError``, an :class:`EventAdapter` is
    returned instead. Every method defined here raises
    :exc:`NotImplementedError`.
    """

    def __new__(cls) -> Event:
        try:
            event = get_async_backend().create_event()
        except AsyncLibraryNotFoundError:
            # Backend lookup failed: hand out the adapter, which looks the
            # backend up again on first use.
            event = EventAdapter()

        return event

    def set(self) -> None:
        """Raise the flag and wake up every listener."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Tell whether the flag is raised (``True``) or not (``False``)."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Block until the flag has been raised.

        Returns immediately when the flag is already raised at call time.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Report statistics about this event's current state."""
        raise NotImplementedError

112 

113 

class EventAdapter(Event):
    """
    Event whose backend counterpart is created lazily.

    Until the backend event exists, the flag is tracked in ``_is_set``; it is
    replayed onto the backend event at the moment that event is created.
    """

    # Backend event, created on first access of ``_event``
    _internal_event: Event | None = None
    # Flag state recorded while no backend event exists yet
    _is_set: bool = False

    def __new__(cls) -> EventAdapter:
        # Bypass Event.__new__, which would dispatch to the backend again
        return object.__new__(cls)

    @property
    def _event(self) -> Event:
        """Return the backend event, creating it first when necessary."""
        backend_event = self._internal_event
        if backend_event is None:
            backend_event = get_async_backend().create_event()
            self._internal_event = backend_event
            if self._is_set:
                # Carry over a set() issued before the backend event existed
                backend_event.set()

        return backend_event

    def set(self) -> None:
        if self._internal_event is not None:
            self._event.set()
        else:
            self._is_set = True

    def is_set(self) -> bool:
        backend_event = self._internal_event
        return self._is_set if backend_event is None else backend_event.is_set()

    async def wait(self) -> None:
        await self._event.wait()

    def statistics(self) -> EventStatistics:
        backend_event = self._internal_event
        if backend_event is not None:
            return backend_event.statistics()

        # No backend event yet, so no task has been able to wait on it
        return EventStatistics(tasks_waiting=0)

150 

151 

class Lock:
    """
    Base interface for an asynchronous lock.

    Instantiating this class delegates to ``get_async_backend().create_lock()``;
    if that raises ``AsyncLibraryNotFoundError``, a :class:`LockAdapter` is
    returned instead. The async context manager methods call :meth:`acquire`
    and :meth:`release`; every other method defined here raises
    :exc:`NotImplementedError`.
    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        try:
            lock = get_async_backend().create_lock(fast_acquire=fast_acquire)
        except AsyncLibraryNotFoundError:
            # Backend lookup failed: hand out the adapter, which looks the
            # backend up again on first use.
            lock = LockAdapter(fast_acquire=fast_acquire)

        return lock

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Take the lock."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Take the lock without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Give the lock back."""
        raise NotImplementedError

    def locked(self) -> bool:
        """Tell whether the lock is currently held."""
        raise NotImplementedError

    def statistics(self) -> LockStatistics:
        """
        Report statistics about this lock's current state.

        .. versionadded:: 3.0
        """
        raise NotImplementedError

198 

199 

class LockAdapter(Lock):
    """
    Lock whose backend counterpart is created lazily.

    The backend lock is obtained from ``get_async_backend().create_lock()`` the
    first time an operation needs it. Until then the adapter reports an
    unlocked, ownerless state from :meth:`locked` and :meth:`statistics`.
    """

    # Backend lock, created on first access of ``_lock``
    _internal_lock: Lock | None = None

    def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
        # Bypass Lock.__new__, which would dispatch to the backend again
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False):
        self._fast_acquire = fast_acquire

    @property
    def _lock(self) -> Lock:
        """Return the backend lock, creating it first when necessary."""
        if self._internal_lock is None:
            self._internal_lock = get_async_backend().create_lock(
                fast_acquire=self._fast_acquire
            )

        return self._internal_lock

    async def __aenter__(self) -> None:
        await self._lock.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Without a backend lock there is nothing that could have been acquired
        if self._internal_lock is not None:
            self._internal_lock.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await self._lock.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """
        Return True if the lock is currently held.

        When no backend lock has been created yet, nothing can have acquired
        it, so ``False`` is returned without touching the backend. This matches
        what :meth:`statistics` reports in the same state.
        """
        if self._internal_lock is None:
            return False

        return self._internal_lock.locked()

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0

        """
        if self._internal_lock is None:
            return LockStatistics(False, None, 0)

        return self._internal_lock.statistics()

262 

263 

class Condition:
    """
    Condition variable built on a :class:`Lock` and a queue of waiter events.

    :param lock: the lock to use; a new :class:`Lock` is created when omitted

    The task that acquired the lock through this object is remembered in
    ``_owner_task``; :meth:`notify`, :meth:`notify_all` and :meth:`wait` refuse
    to run from any other task.
    """

    # Task that acquired the lock through this condition, or None
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = lock or Lock()
        # One event per task blocked in wait(), in arrival order
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """
        Verify that the calling task acquired the lock through this condition.

        :raises RuntimeError: if the current task is not the recorded owner
        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()
        # Forget the owner only after the release succeeded; otherwise the
        # previous holder would keep passing _check_acquired() without the lock.
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify up to n listeners, oldest first.

        Fewer than n are notified when fewer are waiting.

        :raises RuntimeError: if the current task is not holding the lock
        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # Ran out of waiters before reaching n
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not holding the lock
        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The lock is released while waiting and acquired again before this
        method returns or raises.

        :raises RuntimeError: if the current task is not holding the lock
        """
        await checkpoint_if_cancelled()
        self._check_acquired()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            # An unset event is still queued (notify pops before setting), so
            # take it out to keep a later notify() from being spent on it.
            if not event.is_set():
                self._waiters.remove(event)

            raise
        finally:
            # Shielded so the lock is re-acquired even while being cancelled
            with CancelScope(shield=True):
                await self.acquire()

    async def wait_for(self, predicate: Callable[[], T]) -> T:
        """
        Wait until a predicate becomes true.

        :param predicate: a callable that returns a truthy value when the condition is
            met
        :return: the result of the predicate

        .. versionadded:: 4.11.0

        """
        while not (result := predicate()):
            await self.wait()

        return result

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())

369 

370 

class Semaphore:
    """
    Base interface for an asynchronous semaphore.

    Instantiating this class delegates to
    ``get_async_backend().create_semaphore()``; if that raises
    ``AsyncLibraryNotFoundError``, a :class:`SemaphoreAdapter` is returned
    instead.

    :param initial_value: the starting value of the semaphore (must be >= 0)
    :param max_value: optional upper bound; must not be lower than
        ``initial_value``
    :param fast_acquire: stored as ``_fast_acquire`` and forwarded to the backend
    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        try:
            return get_async_backend().create_semaphore(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )
        except AsyncLibraryNotFoundError:
            # Forward every option, as Lock.__new__ does for its adapter
            return SemaphoreAdapter(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        """
        Validate the arguments and remember ``fast_acquire``.

        :raises TypeError: if ``initial_value`` is not an integer, or
            ``max_value`` is neither an integer nor ``None``
        :raises ValueError: if ``initial_value`` is negative or ``max_value`` is
            lower than ``initial_value``
        """
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._fast_acquire = fast_acquire

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Increment the semaphore value."""
        raise NotImplementedError

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        raise NotImplementedError

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        raise NotImplementedError

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        raise NotImplementedError

453 

454 

class SemaphoreAdapter(Semaphore):
    """
    Semaphore whose backend counterpart is created lazily.

    The constructor arguments are validated and stored; the backend semaphore
    is built from them by ``get_async_backend().create_semaphore()`` the first
    time an operation needs it.
    """

    # Backend semaphore, created on first access of ``_semaphore``
    _internal_semaphore: Semaphore | None = None

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> SemaphoreAdapter:
        # Bypass Semaphore.__new__, which would dispatch to the backend again
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> None:
        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
        self._initial_value = initial_value
        self._max_value = max_value

    @property
    def _semaphore(self) -> Semaphore:
        """Return the backend semaphore, creating it first when necessary."""
        if self._internal_semaphore is None:
            # Forward fast_acquire too, so the option requested at construction
            # is not silently dropped when the backend semaphore is built.
            self._internal_semaphore = get_async_backend().create_semaphore(
                self._initial_value,
                max_value=self._max_value,
                fast_acquire=self._fast_acquire,
            )

        return self._internal_semaphore

    async def acquire(self) -> None:
        await self._semaphore.acquire()

    def acquire_nowait(self) -> None:
        self._semaphore.acquire_nowait()

    def release(self) -> None:
        self._semaphore.release()

    @property
    def value(self) -> int:
        # Before the backend semaphore exists the value cannot have changed
        if self._internal_semaphore is None:
            return self._initial_value

        return self._semaphore.value

    @property
    def max_value(self) -> int | None:
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        if self._internal_semaphore is None:
            return SemaphoreStatistics(tasks_waiting=0)

        return self._semaphore.statistics()

512 

513 

class CapacityLimiter:
    """
    Base interface for a limiter that lends out a bounded number of tokens.

    Instantiating this class delegates to
    ``get_async_backend().create_capacity_limiter()``; if that raises
    ``AsyncLibraryNotFoundError``, a :class:`CapacityLimiterAdapter` is returned
    instead. Every method and property defined here raises
    :exc:`NotImplementedError`.
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        try:
            return get_async_backend().create_capacity_limiter(total_tokens)
        except AsyncLibraryNotFoundError:
            return CapacityLimiterAdapter(total_tokens)

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        This is a read-write property. If the total number of tokens is increased, the
        proportionate number of tasks waiting on this limiter will be granted their
        tokens.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens that have currently been borrowed."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """The number of tokens currently available to be borrowed"""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Acquire a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Acquire a token for the current task, waiting if necessary for one to become
        available.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token, waiting if necessary for one to become available.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Release the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by the given borrower.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        .. versionadded:: 3.0

        """
        raise NotImplementedError


class CapacityLimiterAdapter(CapacityLimiter):
    """
    Capacity limiter whose backend counterpart is created lazily.

    The requested token count is validated and kept in ``_total_tokens`` until
    the backend limiter is built by
    ``get_async_backend().create_capacity_limiter()`` on first use. Before that,
    the adapter reports no borrowed tokens and no waiting tasks.
    """

    # Backend limiter, created on first access of ``_limiter``
    _internal_limiter: CapacityLimiter | None = None

    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
        # Bypass CapacityLimiter.__new__, which would dispatch to the backend again
        return object.__new__(cls)

    def __init__(self, total_tokens: float) -> None:
        # Goes through the property setter, so the value is validated here
        self.total_tokens = total_tokens

    @property
    def _limiter(self) -> CapacityLimiter:
        """Return the backend limiter, creating it first when necessary."""
        if self._internal_limiter is None:
            self._internal_limiter = get_async_backend().create_capacity_limiter(
                self._total_tokens
            )

        return self._internal_limiter

    async def __aenter__(self) -> None:
        await self._limiter.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        Setting it validates the value; the value is stored locally until the
        backend limiter exists and is forwarded to that limiter afterwards.

        :raises TypeError: if the new value is neither an ``int`` nor a float
            infinity
        :raises ValueError: if the new value is lower than 1
        """
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        # Test for infinity by value, not identity: ``float("inf")`` and computed
        # infinities are different objects from ``math.inf``.
        is_infinite = isinstance(value, float) and math.isinf(value)
        if not isinstance(value, int) and not is_infinite:
            raise TypeError("total_tokens must be an int or math.inf")
        elif value < 1:
            raise ValueError("total_tokens must be >= 1")

        if self._internal_limiter is None:
            self._total_tokens = value
            return

        self._limiter.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        if self._internal_limiter is None:
            return 0

        return self._internal_limiter.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        # Nothing can have been borrowed before the backend limiter exists
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.available_tokens

    def acquire_nowait(self) -> None:
        self._limiter.acquire_nowait()

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        self._limiter.acquire_on_behalf_of_nowait(borrower)

    async def acquire(self) -> None:
        await self._limiter.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        await self._limiter.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        self._limiter.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        self._limiter.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        if self._internal_limiter is None:
            return CapacityLimiterStatistics(
                borrowed_tokens=0,
                total_tokens=self.total_tokens,
                borrowers=(),
                tasks_waiting=0,
            )

        return self._internal_limiter.statistics()

719 

720 

class ResourceGuard:
    """
    Context manager that lets only one user at a time into a guarded section.

    Entering it again before the previous entry has exited raises
    :exc:`BusyResourceError`.

    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
        when triggered, e.g. "Another task is already {action} this resource")

    .. versionadded:: 4.1
    """

    __slots__ = ("action", "_guarded")

    def __init__(self, action: str = "using"):
        self._guarded = False
        self.action: str = action

    def __enter__(self) -> None:
        if not self._guarded:
            # Free: mark the guard as taken until __exit__ runs
            self._guarded = True
            return

        raise BusyResourceError(self.action)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self._guarded = False