Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_synchronization.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

347 statements  

1from __future__ import annotations 

2 

3import math 

4from collections import deque 

5from collections.abc import Callable 

6from dataclasses import dataclass 

7from types import TracebackType 

8from typing import TypeVar 

9 

10from ..lowlevel import checkpoint_if_cancelled 

11from ._eventloop import NoCurrentAsyncBackend, get_async_backend 

12from ._exceptions import BusyResourceError 

13from ._tasks import CancelScope 

14from ._testing import TaskInfo, get_current_task 

15 

16T = TypeVar("T") 

17 

18 

19@dataclass(frozen=True) 

20class EventStatistics: 

21 """ 

22 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait` 

23 """ 

24 

25 tasks_waiting: int 

26 

27 

28@dataclass(frozen=True) 

29class CapacityLimiterStatistics: 

30 """ 

31 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks 

32 :ivar float total_tokens: total number of available tokens 

33 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from 

34 this limiter 

35 :ivar int tasks_waiting: number of tasks waiting on 

36 :meth:`~.CapacityLimiter.acquire` or 

37 :meth:`~.CapacityLimiter.acquire_on_behalf_of` 

38 """ 

39 

40 borrowed_tokens: int 

41 total_tokens: float 

42 borrowers: tuple[object, ...] 

43 tasks_waiting: int 

44 

45 

46@dataclass(frozen=True) 

47class LockStatistics: 

48 """ 

49 :ivar bool locked: flag indicating if this lock is locked or not 

50 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the 

51 lock is not held by any task) 

52 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire` 

53 """ 

54 

55 locked: bool 

56 owner: TaskInfo | None 

57 tasks_waiting: int 

58 

59 

60@dataclass(frozen=True) 

61class ConditionStatistics: 

62 """ 

63 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait` 

64 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying 

65 :class:`~.Lock` 

66 """ 

67 

68 tasks_waiting: int 

69 lock_statistics: LockStatistics 

70 

71 

72@dataclass(frozen=True) 

73class SemaphoreStatistics: 

74 """ 

75 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire` 

76 

77 """ 

78 

79 tasks_waiting: int 

80 

81 

82class Event: 

83 def __new__(cls) -> Event: 

84 try: 

85 return get_async_backend().create_event() 

86 except NoCurrentAsyncBackend: 

87 return EventAdapter() 

88 

89 def set(self) -> None: 

90 """Set the flag, notifying all listeners.""" 

91 raise NotImplementedError 

92 

93 def is_set(self) -> bool: 

94 """Return ``True`` if the flag is set, ``False`` if not.""" 

95 raise NotImplementedError 

96 

97 async def wait(self) -> None: 

98 """ 

99 Wait until the flag has been set. 

100 

101 If the flag has already been set when this method is called, it returns 

102 immediately. 

103 

104 """ 

105 raise NotImplementedError 

106 

107 def statistics(self) -> EventStatistics: 

108 """Return statistics about the current state of this event.""" 

109 raise NotImplementedError 

110 

111 

112class EventAdapter(Event): 

113 _internal_event: Event | None = None 

114 _is_set: bool = False 

115 

116 def __new__(cls) -> EventAdapter: 

117 return object.__new__(cls) 

118 

119 @property 

120 def _event(self) -> Event: 

121 if self._internal_event is None: 

122 self._internal_event = get_async_backend().create_event() 

123 if self._is_set: 

124 self._internal_event.set() 

125 

126 return self._internal_event 

127 

128 def set(self) -> None: 

129 if self._internal_event is None: 

130 self._is_set = True 

131 else: 

132 self._event.set() 

133 

134 def is_set(self) -> bool: 

135 if self._internal_event is None: 

136 return self._is_set 

137 

138 return self._internal_event.is_set() 

139 

140 async def wait(self) -> None: 

141 await self._event.wait() 

142 

143 def statistics(self) -> EventStatistics: 

144 if self._internal_event is None: 

145 return EventStatistics(tasks_waiting=0) 

146 

147 return self._internal_event.statistics() 

148 

149 

150class Lock: 

151 def __new__(cls, *, fast_acquire: bool = False) -> Lock: 

152 try: 

153 return get_async_backend().create_lock(fast_acquire=fast_acquire) 

154 except NoCurrentAsyncBackend: 

155 return LockAdapter(fast_acquire=fast_acquire) 

156 

157 async def __aenter__(self) -> None: 

158 await self.acquire() 

159 

160 async def __aexit__( 

161 self, 

162 exc_type: type[BaseException] | None, 

163 exc_val: BaseException | None, 

164 exc_tb: TracebackType | None, 

165 ) -> None: 

166 self.release() 

167 

168 async def acquire(self) -> None: 

169 """Acquire the lock.""" 

170 raise NotImplementedError 

171 

172 def acquire_nowait(self) -> None: 

173 """ 

174 Acquire the lock, without blocking. 

175 

176 :raises ~anyio.WouldBlock: if the operation would block 

177 

178 """ 

179 raise NotImplementedError 

180 

181 def release(self) -> None: 

182 """Release the lock.""" 

183 raise NotImplementedError 

184 

185 def locked(self) -> bool: 

186 """Return True if the lock is currently held.""" 

187 raise NotImplementedError 

188 

189 def statistics(self) -> LockStatistics: 

190 """ 

191 Return statistics about the current state of this lock. 

192 

193 .. versionadded:: 3.0 

194 """ 

195 raise NotImplementedError 

196 

197 

198class LockAdapter(Lock): 

199 _internal_lock: Lock | None = None 

200 

201 def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter: 

202 return object.__new__(cls) 

203 

204 def __init__(self, *, fast_acquire: bool = False): 

205 self._fast_acquire = fast_acquire 

206 

207 @property 

208 def _lock(self) -> Lock: 

209 if self._internal_lock is None: 

210 self._internal_lock = get_async_backend().create_lock( 

211 fast_acquire=self._fast_acquire 

212 ) 

213 

214 return self._internal_lock 

215 

216 async def __aenter__(self) -> None: 

217 await self._lock.acquire() 

218 

219 async def __aexit__( 

220 self, 

221 exc_type: type[BaseException] | None, 

222 exc_val: BaseException | None, 

223 exc_tb: TracebackType | None, 

224 ) -> None: 

225 if self._internal_lock is not None: 

226 self._internal_lock.release() 

227 

228 async def acquire(self) -> None: 

229 """Acquire the lock.""" 

230 await self._lock.acquire() 

231 

232 def acquire_nowait(self) -> None: 

233 """ 

234 Acquire the lock, without blocking. 

235 

236 :raises ~anyio.WouldBlock: if the operation would block 

237 

238 """ 

239 self._lock.acquire_nowait() 

240 

241 def release(self) -> None: 

242 """Release the lock.""" 

243 self._lock.release() 

244 

245 def locked(self) -> bool: 

246 """Return True if the lock is currently held.""" 

247 return self._lock.locked() 

248 

249 def statistics(self) -> LockStatistics: 

250 """ 

251 Return statistics about the current state of this lock. 

252 

253 .. versionadded:: 3.0 

254 

255 """ 

256 if self._internal_lock is None: 

257 return LockStatistics(False, None, 0) 

258 

259 return self._internal_lock.statistics() 

260 

261 

262class Condition: 

263 _owner_task: TaskInfo | None = None 

264 

265 def __init__(self, lock: Lock | None = None): 

266 self._lock = lock or Lock() 

267 self._waiters: deque[Event] = deque() 

268 

269 async def __aenter__(self) -> None: 

270 await self.acquire() 

271 

272 async def __aexit__( 

273 self, 

274 exc_type: type[BaseException] | None, 

275 exc_val: BaseException | None, 

276 exc_tb: TracebackType | None, 

277 ) -> None: 

278 self.release() 

279 

280 def _check_acquired(self) -> None: 

281 if self._owner_task != get_current_task(): 

282 raise RuntimeError("The current task is not holding the underlying lock") 

283 

284 async def acquire(self) -> None: 

285 """Acquire the underlying lock.""" 

286 await self._lock.acquire() 

287 self._owner_task = get_current_task() 

288 

289 def acquire_nowait(self) -> None: 

290 """ 

291 Acquire the underlying lock, without blocking. 

292 

293 :raises ~anyio.WouldBlock: if the operation would block 

294 

295 """ 

296 self._lock.acquire_nowait() 

297 self._owner_task = get_current_task() 

298 

299 def release(self) -> None: 

300 """Release the underlying lock.""" 

301 self._lock.release() 

302 

303 def locked(self) -> bool: 

304 """Return True if the lock is set.""" 

305 return self._lock.locked() 

306 

307 def notify(self, n: int = 1) -> None: 

308 """Notify exactly n listeners.""" 

309 self._check_acquired() 

310 for _ in range(n): 

311 try: 

312 event = self._waiters.popleft() 

313 except IndexError: 

314 break 

315 

316 event.set() 

317 

318 def notify_all(self) -> None: 

319 """Notify all the listeners.""" 

320 self._check_acquired() 

321 for event in self._waiters: 

322 event.set() 

323 

324 self._waiters.clear() 

325 

326 async def wait(self) -> None: 

327 """Wait for a notification.""" 

328 await checkpoint_if_cancelled() 

329 self._check_acquired() 

330 event = Event() 

331 self._waiters.append(event) 

332 self.release() 

333 try: 

334 await event.wait() 

335 except BaseException: 

336 if not event.is_set(): 

337 self._waiters.remove(event) 

338 

339 raise 

340 finally: 

341 with CancelScope(shield=True): 

342 await self.acquire() 

343 

344 async def wait_for(self, predicate: Callable[[], T]) -> T: 

345 """ 

346 Wait until a predicate becomes true. 

347 

348 :param predicate: a callable that returns a truthy value when the condition is 

349 met 

350 :return: the result of the predicate 

351 

352 .. versionadded:: 4.11.0 

353 

354 """ 

355 while not (result := predicate()): 

356 await self.wait() 

357 

358 return result 

359 

360 def statistics(self) -> ConditionStatistics: 

361 """ 

362 Return statistics about the current state of this condition. 

363 

364 .. versionadded:: 3.0 

365 """ 

366 return ConditionStatistics(len(self._waiters), self._lock.statistics()) 

367 

368 

369class Semaphore: 

370 def __new__( 

371 cls, 

372 initial_value: int, 

373 *, 

374 max_value: int | None = None, 

375 fast_acquire: bool = False, 

376 ) -> Semaphore: 

377 try: 

378 return get_async_backend().create_semaphore( 

379 initial_value, max_value=max_value, fast_acquire=fast_acquire 

380 ) 

381 except NoCurrentAsyncBackend: 

382 return SemaphoreAdapter(initial_value, max_value=max_value) 

383 

384 def __init__( 

385 self, 

386 initial_value: int, 

387 *, 

388 max_value: int | None = None, 

389 fast_acquire: bool = False, 

390 ): 

391 if not isinstance(initial_value, int): 

392 raise TypeError("initial_value must be an integer") 

393 if initial_value < 0: 

394 raise ValueError("initial_value must be >= 0") 

395 if max_value is not None: 

396 if not isinstance(max_value, int): 

397 raise TypeError("max_value must be an integer or None") 

398 if max_value < initial_value: 

399 raise ValueError( 

400 "max_value must be equal to or higher than initial_value" 

401 ) 

402 

403 self._fast_acquire = fast_acquire 

404 

405 async def __aenter__(self) -> Semaphore: 

406 await self.acquire() 

407 return self 

408 

409 async def __aexit__( 

410 self, 

411 exc_type: type[BaseException] | None, 

412 exc_val: BaseException | None, 

413 exc_tb: TracebackType | None, 

414 ) -> None: 

415 self.release() 

416 

417 async def acquire(self) -> None: 

418 """Decrement the semaphore value, blocking if necessary.""" 

419 raise NotImplementedError 

420 

421 def acquire_nowait(self) -> None: 

422 """ 

423 Acquire the underlying lock, without blocking. 

424 

425 :raises ~anyio.WouldBlock: if the operation would block 

426 

427 """ 

428 raise NotImplementedError 

429 

430 def release(self) -> None: 

431 """Increment the semaphore value.""" 

432 raise NotImplementedError 

433 

434 @property 

435 def value(self) -> int: 

436 """The current value of the semaphore.""" 

437 raise NotImplementedError 

438 

439 @property 

440 def max_value(self) -> int | None: 

441 """The maximum value of the semaphore.""" 

442 raise NotImplementedError 

443 

444 def statistics(self) -> SemaphoreStatistics: 

445 """ 

446 Return statistics about the current state of this semaphore. 

447 

448 .. versionadded:: 3.0 

449 """ 

450 raise NotImplementedError 

451 

452 

453class SemaphoreAdapter(Semaphore): 

454 _internal_semaphore: Semaphore | None = None 

455 

456 def __new__( 

457 cls, 

458 initial_value: int, 

459 *, 

460 max_value: int | None = None, 

461 fast_acquire: bool = False, 

462 ) -> SemaphoreAdapter: 

463 return object.__new__(cls) 

464 

465 def __init__( 

466 self, 

467 initial_value: int, 

468 *, 

469 max_value: int | None = None, 

470 fast_acquire: bool = False, 

471 ) -> None: 

472 super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire) 

473 self._initial_value = initial_value 

474 self._max_value = max_value 

475 

476 @property 

477 def _semaphore(self) -> Semaphore: 

478 if self._internal_semaphore is None: 

479 self._internal_semaphore = get_async_backend().create_semaphore( 

480 self._initial_value, max_value=self._max_value 

481 ) 

482 

483 return self._internal_semaphore 

484 

485 async def acquire(self) -> None: 

486 await self._semaphore.acquire() 

487 

488 def acquire_nowait(self) -> None: 

489 self._semaphore.acquire_nowait() 

490 

491 def release(self) -> None: 

492 self._semaphore.release() 

493 

494 @property 

495 def value(self) -> int: 

496 if self._internal_semaphore is None: 

497 return self._initial_value 

498 

499 return self._semaphore.value 

500 

501 @property 

502 def max_value(self) -> int | None: 

503 return self._max_value 

504 

505 def statistics(self) -> SemaphoreStatistics: 

506 if self._internal_semaphore is None: 

507 return SemaphoreStatistics(tasks_waiting=0) 

508 

509 return self._semaphore.statistics() 

510 

511 

512class CapacityLimiter: 

513 def __new__(cls, total_tokens: float) -> CapacityLimiter: 

514 try: 

515 return get_async_backend().create_capacity_limiter(total_tokens) 

516 except NoCurrentAsyncBackend: 

517 return CapacityLimiterAdapter(total_tokens) 

518 

519 async def __aenter__(self) -> None: 

520 raise NotImplementedError 

521 

522 async def __aexit__( 

523 self, 

524 exc_type: type[BaseException] | None, 

525 exc_val: BaseException | None, 

526 exc_tb: TracebackType | None, 

527 ) -> None: 

528 raise NotImplementedError 

529 

530 @property 

531 def total_tokens(self) -> float: 

532 """ 

533 The total number of tokens available for borrowing. 

534 

535 This is a read-write property. If the total number of tokens is increased, the 

536 proportionate number of tasks waiting on this limiter will be granted their 

537 tokens. 

538 

539 .. versionchanged:: 3.0 

540 The property is now writable. 

541 .. versionchanged:: 4.12 

542 The value can now be set to 0. 

543 

544 """ 

545 raise NotImplementedError 

546 

547 @total_tokens.setter 

548 def total_tokens(self, value: float) -> None: 

549 raise NotImplementedError 

550 

551 @property 

552 def borrowed_tokens(self) -> int: 

553 """The number of tokens that have currently been borrowed.""" 

554 raise NotImplementedError 

555 

556 @property 

557 def available_tokens(self) -> float: 

558 """The number of tokens currently available to be borrowed""" 

559 raise NotImplementedError 

560 

561 def acquire_nowait(self) -> None: 

562 """ 

563 Acquire a token for the current task without waiting for one to become 

564 available. 

565 

566 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

567 

568 """ 

569 raise NotImplementedError 

570 

571 def acquire_on_behalf_of_nowait(self, borrower: object) -> None: 

572 """ 

573 Acquire a token without waiting for one to become available. 

574 

575 :param borrower: the entity borrowing a token 

576 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

577 

578 """ 

579 raise NotImplementedError 

580 

581 async def acquire(self) -> None: 

582 """ 

583 Acquire a token for the current task, waiting if necessary for one to become 

584 available. 

585 

586 """ 

587 raise NotImplementedError 

588 

589 async def acquire_on_behalf_of(self, borrower: object) -> None: 

590 """ 

591 Acquire a token, waiting if necessary for one to become available. 

592 

593 :param borrower: the entity borrowing a token 

594 

595 """ 

596 raise NotImplementedError 

597 

598 def release(self) -> None: 

599 """ 

600 Release the token held by the current task. 

601 

602 :raises RuntimeError: if the current task has not borrowed a token from this 

603 limiter. 

604 

605 """ 

606 raise NotImplementedError 

607 

608 def release_on_behalf_of(self, borrower: object) -> None: 

609 """ 

610 Release the token held by the given borrower. 

611 

612 :raises RuntimeError: if the borrower has not borrowed a token from this 

613 limiter. 

614 

615 """ 

616 raise NotImplementedError 

617 

618 def statistics(self) -> CapacityLimiterStatistics: 

619 """ 

620 Return statistics about the current state of this limiter. 

621 

622 .. versionadded:: 3.0 

623 

624 """ 

625 raise NotImplementedError 

626 

627 

628class CapacityLimiterAdapter(CapacityLimiter): 

629 _internal_limiter: CapacityLimiter | None = None 

630 

631 def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter: 

632 return object.__new__(cls) 

633 

634 def __init__(self, total_tokens: float) -> None: 

635 self.total_tokens = total_tokens 

636 

637 @property 

638 def _limiter(self) -> CapacityLimiter: 

639 if self._internal_limiter is None: 

640 self._internal_limiter = get_async_backend().create_capacity_limiter( 

641 self._total_tokens 

642 ) 

643 

644 return self._internal_limiter 

645 

646 async def __aenter__(self) -> None: 

647 await self._limiter.__aenter__() 

648 

649 async def __aexit__( 

650 self, 

651 exc_type: type[BaseException] | None, 

652 exc_val: BaseException | None, 

653 exc_tb: TracebackType | None, 

654 ) -> None: 

655 return await self._limiter.__aexit__(exc_type, exc_val, exc_tb) 

656 

657 @property 

658 def total_tokens(self) -> float: 

659 if self._internal_limiter is None: 

660 return self._total_tokens 

661 

662 return self._internal_limiter.total_tokens 

663 

664 @total_tokens.setter 

665 def total_tokens(self, value: float) -> None: 

666 if not isinstance(value, int) and value is not math.inf: 

667 raise TypeError("total_tokens must be an int or math.inf") 

668 elif value < 1: 

669 raise ValueError("total_tokens must be >= 1") 

670 

671 if self._internal_limiter is None: 

672 self._total_tokens = value 

673 return 

674 

675 self._limiter.total_tokens = value 

676 

677 @property 

678 def borrowed_tokens(self) -> int: 

679 if self._internal_limiter is None: 

680 return 0 

681 

682 return self._internal_limiter.borrowed_tokens 

683 

684 @property 

685 def available_tokens(self) -> float: 

686 if self._internal_limiter is None: 

687 return self._total_tokens 

688 

689 return self._internal_limiter.available_tokens 

690 

691 def acquire_nowait(self) -> None: 

692 self._limiter.acquire_nowait() 

693 

694 def acquire_on_behalf_of_nowait(self, borrower: object) -> None: 

695 self._limiter.acquire_on_behalf_of_nowait(borrower) 

696 

697 async def acquire(self) -> None: 

698 await self._limiter.acquire() 

699 

700 async def acquire_on_behalf_of(self, borrower: object) -> None: 

701 await self._limiter.acquire_on_behalf_of(borrower) 

702 

703 def release(self) -> None: 

704 self._limiter.release() 

705 

706 def release_on_behalf_of(self, borrower: object) -> None: 

707 self._limiter.release_on_behalf_of(borrower) 

708 

709 def statistics(self) -> CapacityLimiterStatistics: 

710 if self._internal_limiter is None: 

711 return CapacityLimiterStatistics( 

712 borrowed_tokens=0, 

713 total_tokens=self.total_tokens, 

714 borrowers=(), 

715 tasks_waiting=0, 

716 ) 

717 

718 return self._internal_limiter.statistics() 

719 

720 

721class ResourceGuard: 

722 """ 

723 A context manager for ensuring that a resource is only used by a single task at a 

724 time. 

725 

726 Entering this context manager while the previous has not exited it yet will trigger 

727 :exc:`BusyResourceError`. 

728 

729 :param action: the action to guard against (visible in the :exc:`BusyResourceError` 

730 when triggered, e.g. "Another task is already {action} this resource") 

731 

732 .. versionadded:: 4.1 

733 """ 

734 

735 __slots__ = "action", "_guarded" 

736 

737 def __init__(self, action: str = "using"): 

738 self.action: str = action 

739 self._guarded = False 

740 

741 def __enter__(self) -> None: 

742 if self._guarded: 

743 raise BusyResourceError(self.action) 

744 

745 self._guarded = True 

746 

747 def __exit__( 

748 self, 

749 exc_type: type[BaseException] | None, 

750 exc_val: BaseException | None, 

751 exc_tb: TracebackType | None, 

752 ) -> None: 

753 self._guarded = False