Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_synchronization.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

349 statements  

1from __future__ import annotations 

2 

3import math 

4from collections import deque 

5from collections.abc import Callable 

6from dataclasses import dataclass 

7from types import TracebackType 

8from typing import TypeVar 

9 

10from ..lowlevel import checkpoint_if_cancelled 

11from ._eventloop import get_async_backend 

12from ._exceptions import BusyResourceError, NoEventLoopError 

13from ._tasks import CancelScope 

14from ._testing import TaskInfo, get_current_task 

15 

# Generic type variable; in this file it types the result of the predicate
# passed to ``Condition.wait_for``.
T = TypeVar("T")

17 

18 

@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable snapshot describing an :class:`Event`.

    :ivar int tasks_waiting: how many tasks are currently waiting on
        :meth:`~.Event.wait`
    """

    tasks_waiting: int

26 

27 

@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable snapshot describing a :class:`CapacityLimiter`.

    :ivar int borrowed_tokens: how many tokens tasks have borrowed at the moment
    :ivar float total_tokens: the total number of tokens the limiter hands out
    :ivar tuple borrowers: the tasks (or other objects) that currently hold a
        token borrowed from the limiter
    :ivar int tasks_waiting: how many tasks are waiting on
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int

44 

45 

@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable snapshot describing a :class:`Lock`.

    :ivar bool locked: whether the lock is currently held
    :ivar ~anyio.TaskInfo owner: the task holding the lock, or ``None`` when no
        task holds it
    :ivar int tasks_waiting: how many tasks are waiting on :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int

58 

59 

@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable snapshot describing a :class:`Condition`.

    :ivar int tasks_waiting: how many tasks are blocked on
        :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: the statistics of the
        :class:`~.Lock` underneath the condition
    """

    tasks_waiting: int
    lock_statistics: LockStatistics

70 

71 

@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable snapshot describing a :class:`Semaphore`.

    :ivar int tasks_waiting: how many tasks are waiting on
        :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int

80 

81 

class Event:
    """
    Interface of an event flag that tasks can wait on.

    Instantiating this class asks the current async backend for its event
    implementation. If that raises :exc:`NoEventLoopError`, an
    :class:`EventAdapter` is returned instead.
    """

    def __new__(cls) -> Event:
        try:
            instance = get_async_backend().create_event()
        except NoEventLoopError:
            instance = EventAdapter()

        return instance

    def set(self) -> None:
        """Raise the flag and notify every listener."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Tell whether the flag is set (``True``) or not (``False``)."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Block until the flag has been set.

        Returns immediately if the flag was already set when this method was
        called.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return a snapshot of this event's current state."""
        raise NotImplementedError

110 

111 

class EventAdapter(Event):
    """
    Event stand-in whose backend event is created lazily.

    Until the backend event exists, the "set" state is remembered in
    ``_is_set`` and replayed onto the backend event when it is first created.
    """

    # Backend event, created on first access of ``_event``.
    _internal_event: Event | None = None
    # Flag state recorded while no backend event exists yet.
    _is_set: bool = False

    def __new__(cls) -> EventAdapter:
        # Skip Event.__new__, which would try to build a backend event.
        return object.__new__(cls)

    @property
    def _event(self) -> Event:
        """Return the backend event, creating (and pre-setting) it if needed."""
        backend_event = self._internal_event
        if backend_event is None:
            backend_event = get_async_backend().create_event()
            self._internal_event = backend_event
            if self._is_set:
                # Carry over a set() that happened before the event existed.
                backend_event.set()

        return backend_event

    def set(self) -> None:
        """Set the flag, either on the backend event or in the local state."""
        if self._internal_event is not None:
            self._event.set()
        else:
            self._is_set = True

    def is_set(self) -> bool:
        """Return the flag state without creating the backend event."""
        backend_event = self._internal_event
        return self._is_set if backend_event is None else backend_event.is_set()

    async def wait(self) -> None:
        """Wait on the backend event, creating it first if necessary."""
        await self._event.wait()

    def statistics(self) -> EventStatistics:
        """Return statistics; zero waiters while no backend event exists."""
        backend_event = self._internal_event
        if backend_event is not None:
            return backend_event.statistics()

        return EventStatistics(tasks_waiting=0)

148 

149 

class Lock:
    """
    Interface of a lock usable as an async context manager.

    Instantiating this class asks the current async backend for its lock
    implementation, forwarding ``fast_acquire``. If that raises
    :exc:`NoEventLoopError`, a :class:`LockAdapter` is returned instead.
    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        try:
            instance = get_async_backend().create_lock(fast_acquire=fast_acquire)
        except NoEventLoopError:
            instance = LockAdapter(fast_acquire=fast_acquire)

        return instance

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Take the lock."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Take the lock without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Give the lock back."""
        raise NotImplementedError

    def locked(self) -> bool:
        """Tell whether the lock is currently held."""
        raise NotImplementedError

    def statistics(self) -> LockStatistics:
        """
        Return a snapshot of this lock's current state.

        .. versionadded:: 3.0
        """
        raise NotImplementedError

196 

197 

class LockAdapter(Lock):
    """
    Lock stand-in whose backend lock is created lazily.

    The backend lock is built on first access of ``_lock`` with the
    ``fast_acquire`` value given at construction. Read-only queries
    (:meth:`locked`, :meth:`statistics`) answer from local state while no
    backend lock exists, so they do not need a running event loop.
    """

    # Backend lock, created on first access of ``_lock``.
    _internal_lock: Lock | None = None

    def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
        # Skip Lock.__new__, which would try to build a backend lock.
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False):
        self._fast_acquire = fast_acquire

    @property
    def _lock(self) -> Lock:
        """Return the backend lock, creating it if needed."""
        if self._internal_lock is None:
            self._internal_lock = get_async_backend().create_lock(
                fast_acquire=self._fast_acquire
            )

        return self._internal_lock

    async def __aenter__(self) -> None:
        await self._lock.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Nothing to release if the backend lock was never created.
        if self._internal_lock is not None:
            self._internal_lock.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await self._lock.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """
        Return True if the lock is currently held.

        A lock whose backend object has not been created yet cannot have been
        acquired, so ``False`` is returned without touching the backend. This
        matches how :meth:`statistics` handles the same state.
        """
        if self._internal_lock is None:
            return False

        return self._internal_lock.locked()

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        While no backend lock exists, an "unlocked, no owner, no waiters"
        snapshot is returned.

        .. versionadded:: 3.0

        """
        if self._internal_lock is None:
            return LockStatistics(False, None, 0)

        return self._internal_lock.statistics()

260 

261 

class Condition:
    """
    Condition variable built on top of a :class:`Lock`.

    :param lock: the lock to use; a new :class:`Lock` is created when omitted

    The task that acquired the lock through this object is recorded so that
    :meth:`notify`, :meth:`notify_all` and :meth:`wait` can reject callers
    that do not hold it.
    """

    # Task that last acquired the lock through this object; None when released.
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = lock or Lock()
        # One event per task blocked in wait(), in arrival order.
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """
        Ensure the current task is the recorded lock owner.

        :raises RuntimeError: if the current task is not the recorded owner

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock and record the current task as owner."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock and forget the recorded owner."""
        self._lock.release()
        # Clear the owner only after a successful release; otherwise the task
        # that just released would still pass _check_acquired().
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify up to ``n`` listeners, in the order they started waiting.

        Fewer than ``n`` are notified if fewer tasks are waiting.

        :param n: maximum number of waiting tasks to wake
        :raises RuntimeError: if the current task does not hold the lock

        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # No more waiters left to notify.
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task does not hold the lock

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The lock is released while waiting and re-acquired before returning,
        whether the wait ended normally or with an exception.

        :raises RuntimeError: if the current task does not hold the lock

        """
        await checkpoint_if_cancelled()
        self._check_acquired()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            if not event.is_set():
                # Never notified: withdraw from the waiter queue.
                self._waiters.remove(event)
            elif self._waiters:
                # This task was notified but could not act on it, so pass
                # it on to the next task
                self._waiters.popleft().set()

            raise
        finally:
            # Shielded so that the lock is re-acquired even if this task is
            # being cancelled.
            with CancelScope(shield=True):
                await self.acquire()

    async def wait_for(self, predicate: Callable[[], T]) -> T:
        """
        Wait until a predicate becomes true.

        :param predicate: a callable that returns a truthy value when the condition is
            met
        :return: the result of the predicate

        .. versionadded:: 4.11.0

        """
        while not (result := predicate()):
            await self.wait()

        return result

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())

371 

372 

class Semaphore:
    """
    Interface of a counting semaphore usable as an async context manager.

    Instantiating this class asks the current async backend for its semaphore
    implementation. If that raises :exc:`NoEventLoopError`, a
    :class:`SemaphoreAdapter` is returned instead.

    :param initial_value: starting value; must be an integer >= 0
    :param max_value: optional upper bound; must be an integer no lower than
        ``initial_value``
    :param fast_acquire: forwarded unchanged to the backend implementation
    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        try:
            return get_async_backend().create_semaphore(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )
        except NoEventLoopError:
            # Pass every option along, as Lock.__new__ does for LockAdapter.
            return SemaphoreAdapter(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        """
        Validate the arguments and remember ``fast_acquire``.

        :raises TypeError: if ``initial_value`` is not an integer, or
            ``max_value`` is neither an integer nor ``None``
        :raises ValueError: if ``initial_value`` is negative or ``max_value``
            is lower than ``initial_value``
        """
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._fast_acquire = fast_acquire

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Increment the semaphore value."""
        raise NotImplementedError

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        raise NotImplementedError

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        raise NotImplementedError

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        raise NotImplementedError

455 

456 

class SemaphoreAdapter(Semaphore):
    """
    Semaphore stand-in whose backend semaphore is created lazily.

    The constructor arguments are validated by :class:`Semaphore` and stored.
    The backend semaphore is built from them on first access of
    ``_semaphore``. Read-only queries (:attr:`value`, :attr:`max_value`,
    :meth:`statistics`) answer from the stored values while no backend
    semaphore exists.
    """

    # Backend semaphore, created on first access of ``_semaphore``.
    _internal_semaphore: Semaphore | None = None

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> SemaphoreAdapter:
        # Skip Semaphore.__new__, which would try to build a backend semaphore.
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> None:
        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
        self._initial_value = initial_value
        self._max_value = max_value

    @property
    def _semaphore(self) -> Semaphore:
        """Return the backend semaphore, creating it if needed."""
        if self._internal_semaphore is None:
            # Forward fast_acquire too; otherwise the option given to the
            # constructor would be silently dropped.
            self._internal_semaphore = get_async_backend().create_semaphore(
                self._initial_value,
                max_value=self._max_value,
                fast_acquire=self._fast_acquire,
            )

        return self._internal_semaphore

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        await self._semaphore.acquire()

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._semaphore.acquire_nowait()

    def release(self) -> None:
        """Increment the semaphore value."""
        self._semaphore.release()

    @property
    def value(self) -> int:
        """The current value; the initial value while no backend object exists."""
        if self._internal_semaphore is None:
            return self._initial_value

        return self._semaphore.value

    @property
    def max_value(self) -> int | None:
        """The maximum value given at construction."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """Return statistics; zero waiters while no backend object exists."""
        if self._internal_semaphore is None:
            return SemaphoreStatistics(tasks_waiting=0)

        return self._semaphore.statistics()

514 

515 

class CapacityLimiter:
    """
    Interface of a token-based capacity limiter.

    Instantiating this class asks the current async backend for its limiter
    implementation. If that raises :exc:`NoEventLoopError`, a
    :class:`CapacityLimiterAdapter` is returned instead.
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        try:
            instance = get_async_backend().create_capacity_limiter(total_tokens)
        except NoEventLoopError:
            instance = CapacityLimiterAdapter(total_tokens)

        return instance

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        How many tokens this limiter can lend out in total.

        The property can be both read and written. Raising the total grants
        tokens to the proportionate number of tasks waiting on this limiter.

        .. versionchanged:: 3.0
            The property is now writable.
        .. versionchanged:: 4.12
            The value can now be set to 0.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """How many tokens are borrowed at the moment."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """How many tokens can still be borrowed at the moment."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Borrow a token for the current task, without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Borrow a token for ``borrower``, without waiting for one to become
        available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Borrow a token for the current task, waiting for one to become
        available if necessary.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Borrow a token for ``borrower``, waiting for one to become available if
        necessary.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Give back the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Give back the token held by ``borrower``.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return a snapshot of this limiter's current state.

        .. versionadded:: 3.0

        """
        raise NotImplementedError

630 

631 

class CapacityLimiterAdapter(CapacityLimiter):
    """
    Capacity limiter stand-in whose backend limiter is created lazily.

    The token total is validated and stored locally until the backend limiter
    is built on first access of ``_limiter``. Read-only queries answer from
    the stored total while no backend limiter exists.
    """

    # Backend limiter, created on first access of ``_limiter``.
    _internal_limiter: CapacityLimiter | None = None

    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
        # Skip CapacityLimiter.__new__, which would try to build a backend limiter.
        return object.__new__(cls)

    def __init__(self, total_tokens: float) -> None:
        # Goes through the property setter so the value is validated.
        self.total_tokens = total_tokens

    @property
    def _limiter(self) -> CapacityLimiter:
        """Return the backend limiter, creating it if needed."""
        if self._internal_limiter is None:
            self._internal_limiter = get_async_backend().create_capacity_limiter(
                self._total_tokens
            )

        return self._internal_limiter

    async def __aenter__(self) -> None:
        await self._limiter.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        Read from the backend limiter once it exists, otherwise from the
        locally stored value.
        """
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        """
        Validate and store the new total.

        :raises TypeError: if ``value`` is neither an int nor ``math.inf``
        :raises ValueError: if ``value`` is negative
        """
        if not isinstance(value, int) and value is not math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        elif value < 0:
            # 0 is allowed, matching the documented CapacityLimiter contract
            # ("The value can now be set to 0").
            raise ValueError("total_tokens must be >= 0")

        if self._internal_limiter is None:
            self._total_tokens = value
            return

        self._limiter.total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        """The number of borrowed tokens; 0 while no backend limiter exists."""
        if self._internal_limiter is None:
            return 0

        return self._internal_limiter.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        """The number of free tokens; the full total while no backend limiter exists."""
        if self._internal_limiter is None:
            return self._total_tokens

        return self._internal_limiter.available_tokens

    def acquire_nowait(self) -> None:
        """Delegate to the backend limiter, creating it if needed."""
        self._limiter.acquire_nowait()

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """Delegate to the backend limiter, creating it if needed."""
        self._limiter.acquire_on_behalf_of_nowait(borrower)

    async def acquire(self) -> None:
        """Delegate to the backend limiter, creating it if needed."""
        await self._limiter.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """Delegate to the backend limiter, creating it if needed."""
        await self._limiter.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        """Delegate to the backend limiter, creating it if needed."""
        self._limiter.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        """Delegate to the backend limiter, creating it if needed."""
        self._limiter.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        """Return statistics; an idle snapshot while no backend limiter exists."""
        if self._internal_limiter is None:
            return CapacityLimiterStatistics(
                borrowed_tokens=0,
                total_tokens=self.total_tokens,
                borrowers=(),
                tasks_waiting=0,
            )

        return self._internal_limiter.statistics()

723 

724 

class ResourceGuard:
    """
    Context manager that allows only one user of a resource at a time.

    If the guard is entered again before the previous user has left it,
    :exc:`BusyResourceError` is raised.

    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
        when triggered, e.g. "Another task is already {action} this resource")

    .. versionadded:: 4.1
    """

    __slots__ = "action", "_guarded"

    def __init__(self, action: str = "using"):
        self._guarded = False
        self.action = action

    def __enter__(self) -> None:
        if not self._guarded:
            # Free: mark the resource as taken.
            self._guarded = True
            return

        raise BusyResourceError(self.action)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Always free the resource, whether or not the body raised.
        self._guarded = False