Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_synchronization.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

340 statements  

1from __future__ import annotations 

2 

3import math 

4from collections import deque 

5from dataclasses import dataclass 

6from types import TracebackType 

7 

8from sniffio import AsyncLibraryNotFoundError 

9 

10from ..lowlevel import checkpoint 

11from ._eventloop import get_async_backend 

12from ._exceptions import BusyResourceError 

13from ._tasks import CancelScope 

14from ._testing import TaskInfo, get_current_task 

15 

16 

17@dataclass(frozen=True) 

18class EventStatistics: 

19 """ 

20 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait` 

21 """ 

22 

23 tasks_waiting: int 

24 

25 

26@dataclass(frozen=True) 

27class CapacityLimiterStatistics: 

28 """ 

29 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks 

30 :ivar float total_tokens: total number of available tokens 

31 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from 

32 this limiter 

33 :ivar int tasks_waiting: number of tasks waiting on 

34 :meth:`~.CapacityLimiter.acquire` or 

35 :meth:`~.CapacityLimiter.acquire_on_behalf_of` 

36 """ 

37 

38 borrowed_tokens: int 

39 total_tokens: float 

40 borrowers: tuple[object, ...] 

41 tasks_waiting: int 

42 

43 

44@dataclass(frozen=True) 

45class LockStatistics: 

46 """ 

47 :ivar bool locked: flag indicating if this lock is locked or not 

48 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the 

49 lock is not held by any task) 

50 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire` 

51 """ 

52 

53 locked: bool 

54 owner: TaskInfo | None 

55 tasks_waiting: int 

56 

57 

58@dataclass(frozen=True) 

59class ConditionStatistics: 

60 """ 

61 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait` 

62 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying 

63 :class:`~.Lock` 

64 """ 

65 

66 tasks_waiting: int 

67 lock_statistics: LockStatistics 

68 

69 

70@dataclass(frozen=True) 

71class SemaphoreStatistics: 

72 """ 

73 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire` 

74 

75 """ 

76 

77 tasks_waiting: int 

78 

79 

80class Event: 

81 def __new__(cls) -> Event: 

82 try: 

83 return get_async_backend().create_event() 

84 except AsyncLibraryNotFoundError: 

85 return EventAdapter() 

86 

87 def set(self) -> None: 

88 """Set the flag, notifying all listeners.""" 

89 raise NotImplementedError 

90 

91 def is_set(self) -> bool: 

92 """Return ``True`` if the flag is set, ``False`` if not.""" 

93 raise NotImplementedError 

94 

95 async def wait(self) -> None: 

96 """ 

97 Wait until the flag has been set. 

98 

99 If the flag has already been set when this method is called, it returns 

100 immediately. 

101 

102 """ 

103 raise NotImplementedError 

104 

105 def statistics(self) -> EventStatistics: 

106 """Return statistics about the current state of this event.""" 

107 raise NotImplementedError 

108 

109 

110class EventAdapter(Event): 

111 _internal_event: Event | None = None 

112 _is_set: bool = False 

113 

114 def __new__(cls) -> EventAdapter: 

115 return object.__new__(cls) 

116 

117 @property 

118 def _event(self) -> Event: 

119 if self._internal_event is None: 

120 self._internal_event = get_async_backend().create_event() 

121 if self._is_set: 

122 self._internal_event.set() 

123 

124 return self._internal_event 

125 

126 def set(self) -> None: 

127 if self._internal_event is None: 

128 self._is_set = True 

129 else: 

130 self._event.set() 

131 

132 def is_set(self) -> bool: 

133 if self._internal_event is None: 

134 return self._is_set 

135 

136 return self._internal_event.is_set() 

137 

138 async def wait(self) -> None: 

139 await self._event.wait() 

140 

141 def statistics(self) -> EventStatistics: 

142 if self._internal_event is None: 

143 return EventStatistics(tasks_waiting=0) 

144 

145 return self._internal_event.statistics() 

146 

147 

148class Lock: 

149 def __new__(cls, *, fast_acquire: bool = False) -> Lock: 

150 try: 

151 return get_async_backend().create_lock(fast_acquire=fast_acquire) 

152 except AsyncLibraryNotFoundError: 

153 return LockAdapter(fast_acquire=fast_acquire) 

154 

155 async def __aenter__(self) -> None: 

156 await self.acquire() 

157 

158 async def __aexit__( 

159 self, 

160 exc_type: type[BaseException] | None, 

161 exc_val: BaseException | None, 

162 exc_tb: TracebackType | None, 

163 ) -> None: 

164 self.release() 

165 

166 async def acquire(self) -> None: 

167 """Acquire the lock.""" 

168 raise NotImplementedError 

169 

170 def acquire_nowait(self) -> None: 

171 """ 

172 Acquire the lock, without blocking. 

173 

174 :raises ~anyio.WouldBlock: if the operation would block 

175 

176 """ 

177 raise NotImplementedError 

178 

179 def release(self) -> None: 

180 """Release the lock.""" 

181 raise NotImplementedError 

182 

183 def locked(self) -> bool: 

184 """Return True if the lock is currently held.""" 

185 raise NotImplementedError 

186 

187 def statistics(self) -> LockStatistics: 

188 """ 

189 Return statistics about the current state of this lock. 

190 

191 .. versionadded:: 3.0 

192 """ 

193 raise NotImplementedError 

194 

195 

196class LockAdapter(Lock): 

197 _internal_lock: Lock | None = None 

198 

199 def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter: 

200 return object.__new__(cls) 

201 

202 def __init__(self, *, fast_acquire: bool = False): 

203 self._fast_acquire = fast_acquire 

204 

205 @property 

206 def _lock(self) -> Lock: 

207 if self._internal_lock is None: 

208 self._internal_lock = get_async_backend().create_lock( 

209 fast_acquire=self._fast_acquire 

210 ) 

211 

212 return self._internal_lock 

213 

214 async def __aenter__(self) -> None: 

215 await self._lock.acquire() 

216 

217 async def __aexit__( 

218 self, 

219 exc_type: type[BaseException] | None, 

220 exc_val: BaseException | None, 

221 exc_tb: TracebackType | None, 

222 ) -> None: 

223 if self._internal_lock is not None: 

224 self._internal_lock.release() 

225 

226 async def acquire(self) -> None: 

227 """Acquire the lock.""" 

228 await self._lock.acquire() 

229 

230 def acquire_nowait(self) -> None: 

231 """ 

232 Acquire the lock, without blocking. 

233 

234 :raises ~anyio.WouldBlock: if the operation would block 

235 

236 """ 

237 self._lock.acquire_nowait() 

238 

239 def release(self) -> None: 

240 """Release the lock.""" 

241 self._lock.release() 

242 

243 def locked(self) -> bool: 

244 """Return True if the lock is currently held.""" 

245 return self._lock.locked() 

246 

247 def statistics(self) -> LockStatistics: 

248 """ 

249 Return statistics about the current state of this lock. 

250 

251 .. versionadded:: 3.0 

252 

253 """ 

254 if self._internal_lock is None: 

255 return LockStatistics(False, None, 0) 

256 

257 return self._internal_lock.statistics() 

258 

259 

260class Condition: 

261 _owner_task: TaskInfo | None = None 

262 

263 def __init__(self, lock: Lock | None = None): 

264 self._lock = lock or Lock() 

265 self._waiters: deque[Event] = deque() 

266 

267 async def __aenter__(self) -> None: 

268 await self.acquire() 

269 

270 async def __aexit__( 

271 self, 

272 exc_type: type[BaseException] | None, 

273 exc_val: BaseException | None, 

274 exc_tb: TracebackType | None, 

275 ) -> None: 

276 self.release() 

277 

278 def _check_acquired(self) -> None: 

279 if self._owner_task != get_current_task(): 

280 raise RuntimeError("The current task is not holding the underlying lock") 

281 

282 async def acquire(self) -> None: 

283 """Acquire the underlying lock.""" 

284 await self._lock.acquire() 

285 self._owner_task = get_current_task() 

286 

287 def acquire_nowait(self) -> None: 

288 """ 

289 Acquire the underlying lock, without blocking. 

290 

291 :raises ~anyio.WouldBlock: if the operation would block 

292 

293 """ 

294 self._lock.acquire_nowait() 

295 self._owner_task = get_current_task() 

296 

297 def release(self) -> None: 

298 """Release the underlying lock.""" 

299 self._lock.release() 

300 

301 def locked(self) -> bool: 

302 """Return True if the lock is set.""" 

303 return self._lock.locked() 

304 

305 def notify(self, n: int = 1) -> None: 

306 """Notify exactly n listeners.""" 

307 self._check_acquired() 

308 for _ in range(n): 

309 try: 

310 event = self._waiters.popleft() 

311 except IndexError: 

312 break 

313 

314 event.set() 

315 

316 def notify_all(self) -> None: 

317 """Notify all the listeners.""" 

318 self._check_acquired() 

319 for event in self._waiters: 

320 event.set() 

321 

322 self._waiters.clear() 

323 

324 async def wait(self) -> None: 

325 """Wait for a notification.""" 

326 await checkpoint() 

327 event = Event() 

328 self._waiters.append(event) 

329 self.release() 

330 try: 

331 await event.wait() 

332 except BaseException: 

333 if not event.is_set(): 

334 self._waiters.remove(event) 

335 

336 raise 

337 finally: 

338 with CancelScope(shield=True): 

339 await self.acquire() 

340 

341 def statistics(self) -> ConditionStatistics: 

342 """ 

343 Return statistics about the current state of this condition. 

344 

345 .. versionadded:: 3.0 

346 """ 

347 return ConditionStatistics(len(self._waiters), self._lock.statistics()) 

348 

349 

350class Semaphore: 

351 def __new__( 

352 cls, 

353 initial_value: int, 

354 *, 

355 max_value: int | None = None, 

356 fast_acquire: bool = False, 

357 ) -> Semaphore: 

358 try: 

359 return get_async_backend().create_semaphore( 

360 initial_value, max_value=max_value, fast_acquire=fast_acquire 

361 ) 

362 except AsyncLibraryNotFoundError: 

363 return SemaphoreAdapter(initial_value, max_value=max_value) 

364 

365 def __init__( 

366 self, 

367 initial_value: int, 

368 *, 

369 max_value: int | None = None, 

370 fast_acquire: bool = False, 

371 ): 

372 if not isinstance(initial_value, int): 

373 raise TypeError("initial_value must be an integer") 

374 if initial_value < 0: 

375 raise ValueError("initial_value must be >= 0") 

376 if max_value is not None: 

377 if not isinstance(max_value, int): 

378 raise TypeError("max_value must be an integer or None") 

379 if max_value < initial_value: 

380 raise ValueError( 

381 "max_value must be equal to or higher than initial_value" 

382 ) 

383 

384 self._fast_acquire = fast_acquire 

385 

386 async def __aenter__(self) -> Semaphore: 

387 await self.acquire() 

388 return self 

389 

390 async def __aexit__( 

391 self, 

392 exc_type: type[BaseException] | None, 

393 exc_val: BaseException | None, 

394 exc_tb: TracebackType | None, 

395 ) -> None: 

396 self.release() 

397 

398 async def acquire(self) -> None: 

399 """Decrement the semaphore value, blocking if necessary.""" 

400 raise NotImplementedError 

401 

402 def acquire_nowait(self) -> None: 

403 """ 

404 Acquire the underlying lock, without blocking. 

405 

406 :raises ~anyio.WouldBlock: if the operation would block 

407 

408 """ 

409 raise NotImplementedError 

410 

411 def release(self) -> None: 

412 """Increment the semaphore value.""" 

413 raise NotImplementedError 

414 

415 @property 

416 def value(self) -> int: 

417 """The current value of the semaphore.""" 

418 raise NotImplementedError 

419 

420 @property 

421 def max_value(self) -> int | None: 

422 """The maximum value of the semaphore.""" 

423 raise NotImplementedError 

424 

425 def statistics(self) -> SemaphoreStatistics: 

426 """ 

427 Return statistics about the current state of this semaphore. 

428 

429 .. versionadded:: 3.0 

430 """ 

431 raise NotImplementedError 

432 

433 

434class SemaphoreAdapter(Semaphore): 

435 _internal_semaphore: Semaphore | None = None 

436 

437 def __new__( 

438 cls, 

439 initial_value: int, 

440 *, 

441 max_value: int | None = None, 

442 fast_acquire: bool = False, 

443 ) -> SemaphoreAdapter: 

444 return object.__new__(cls) 

445 

446 def __init__( 

447 self, 

448 initial_value: int, 

449 *, 

450 max_value: int | None = None, 

451 fast_acquire: bool = False, 

452 ) -> None: 

453 super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire) 

454 self._initial_value = initial_value 

455 self._max_value = max_value 

456 

457 @property 

458 def _semaphore(self) -> Semaphore: 

459 if self._internal_semaphore is None: 

460 self._internal_semaphore = get_async_backend().create_semaphore( 

461 self._initial_value, max_value=self._max_value 

462 ) 

463 

464 return self._internal_semaphore 

465 

466 async def acquire(self) -> None: 

467 await self._semaphore.acquire() 

468 

469 def acquire_nowait(self) -> None: 

470 self._semaphore.acquire_nowait() 

471 

472 def release(self) -> None: 

473 self._semaphore.release() 

474 

475 @property 

476 def value(self) -> int: 

477 if self._internal_semaphore is None: 

478 return self._initial_value 

479 

480 return self._semaphore.value 

481 

482 @property 

483 def max_value(self) -> int | None: 

484 return self._max_value 

485 

486 def statistics(self) -> SemaphoreStatistics: 

487 if self._internal_semaphore is None: 

488 return SemaphoreStatistics(tasks_waiting=0) 

489 

490 return self._semaphore.statistics() 

491 

492 

493class CapacityLimiter: 

494 def __new__(cls, total_tokens: float) -> CapacityLimiter: 

495 try: 

496 return get_async_backend().create_capacity_limiter(total_tokens) 

497 except AsyncLibraryNotFoundError: 

498 return CapacityLimiterAdapter(total_tokens) 

499 

500 async def __aenter__(self) -> None: 

501 raise NotImplementedError 

502 

503 async def __aexit__( 

504 self, 

505 exc_type: type[BaseException] | None, 

506 exc_val: BaseException | None, 

507 exc_tb: TracebackType | None, 

508 ) -> bool | None: 

509 raise NotImplementedError 

510 

511 @property 

512 def total_tokens(self) -> float: 

513 """ 

514 The total number of tokens available for borrowing. 

515 

516 This is a read-write property. If the total number of tokens is increased, the 

517 proportionate number of tasks waiting on this limiter will be granted their 

518 tokens. 

519 

520 .. versionchanged:: 3.0 

521 The property is now writable. 

522 

523 """ 

524 raise NotImplementedError 

525 

526 @total_tokens.setter 

527 def total_tokens(self, value: float) -> None: 

528 raise NotImplementedError 

529 

530 @property 

531 def borrowed_tokens(self) -> int: 

532 """The number of tokens that have currently been borrowed.""" 

533 raise NotImplementedError 

534 

535 @property 

536 def available_tokens(self) -> float: 

537 """The number of tokens currently available to be borrowed""" 

538 raise NotImplementedError 

539 

540 def acquire_nowait(self) -> None: 

541 """ 

542 Acquire a token for the current task without waiting for one to become 

543 available. 

544 

545 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

546 

547 """ 

548 raise NotImplementedError 

549 

550 def acquire_on_behalf_of_nowait(self, borrower: object) -> None: 

551 """ 

552 Acquire a token without waiting for one to become available. 

553 

554 :param borrower: the entity borrowing a token 

555 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

556 

557 """ 

558 raise NotImplementedError 

559 

560 async def acquire(self) -> None: 

561 """ 

562 Acquire a token for the current task, waiting if necessary for one to become 

563 available. 

564 

565 """ 

566 raise NotImplementedError 

567 

568 async def acquire_on_behalf_of(self, borrower: object) -> None: 

569 """ 

570 Acquire a token, waiting if necessary for one to become available. 

571 

572 :param borrower: the entity borrowing a token 

573 

574 """ 

575 raise NotImplementedError 

576 

577 def release(self) -> None: 

578 """ 

579 Release the token held by the current task. 

580 

581 :raises RuntimeError: if the current task has not borrowed a token from this 

582 limiter. 

583 

584 """ 

585 raise NotImplementedError 

586 

587 def release_on_behalf_of(self, borrower: object) -> None: 

588 """ 

589 Release the token held by the given borrower. 

590 

591 :raises RuntimeError: if the borrower has not borrowed a token from this 

592 limiter. 

593 

594 """ 

595 raise NotImplementedError 

596 

597 def statistics(self) -> CapacityLimiterStatistics: 

598 """ 

599 Return statistics about the current state of this limiter. 

600 

601 .. versionadded:: 3.0 

602 

603 """ 

604 raise NotImplementedError 

605 

606 

607class CapacityLimiterAdapter(CapacityLimiter): 

608 _internal_limiter: CapacityLimiter | None = None 

609 

610 def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter: 

611 return object.__new__(cls) 

612 

613 def __init__(self, total_tokens: float) -> None: 

614 self.total_tokens = total_tokens 

615 

616 @property 

617 def _limiter(self) -> CapacityLimiter: 

618 if self._internal_limiter is None: 

619 self._internal_limiter = get_async_backend().create_capacity_limiter( 

620 self._total_tokens 

621 ) 

622 

623 return self._internal_limiter 

624 

625 async def __aenter__(self) -> None: 

626 await self._limiter.__aenter__() 

627 

628 async def __aexit__( 

629 self, 

630 exc_type: type[BaseException] | None, 

631 exc_val: BaseException | None, 

632 exc_tb: TracebackType | None, 

633 ) -> bool | None: 

634 return await self._limiter.__aexit__(exc_type, exc_val, exc_tb) 

635 

636 @property 

637 def total_tokens(self) -> float: 

638 if self._internal_limiter is None: 

639 return self._total_tokens 

640 

641 return self._internal_limiter.total_tokens 

642 

643 @total_tokens.setter 

644 def total_tokens(self, value: float) -> None: 

645 if not isinstance(value, int) and value is not math.inf: 

646 raise TypeError("total_tokens must be an int or math.inf") 

647 elif value < 1: 

648 raise ValueError("total_tokens must be >= 1") 

649 

650 if self._internal_limiter is None: 

651 self._total_tokens = value 

652 return 

653 

654 self._limiter.total_tokens = value 

655 

656 @property 

657 def borrowed_tokens(self) -> int: 

658 if self._internal_limiter is None: 

659 return 0 

660 

661 return self._internal_limiter.borrowed_tokens 

662 

663 @property 

664 def available_tokens(self) -> float: 

665 if self._internal_limiter is None: 

666 return self._total_tokens 

667 

668 return self._internal_limiter.available_tokens 

669 

670 def acquire_nowait(self) -> None: 

671 self._limiter.acquire_nowait() 

672 

673 def acquire_on_behalf_of_nowait(self, borrower: object) -> None: 

674 self._limiter.acquire_on_behalf_of_nowait(borrower) 

675 

676 async def acquire(self) -> None: 

677 await self._limiter.acquire() 

678 

679 async def acquire_on_behalf_of(self, borrower: object) -> None: 

680 await self._limiter.acquire_on_behalf_of(borrower) 

681 

682 def release(self) -> None: 

683 self._limiter.release() 

684 

685 def release_on_behalf_of(self, borrower: object) -> None: 

686 self._limiter.release_on_behalf_of(borrower) 

687 

688 def statistics(self) -> CapacityLimiterStatistics: 

689 if self._internal_limiter is None: 

690 return CapacityLimiterStatistics( 

691 borrowed_tokens=0, 

692 total_tokens=self.total_tokens, 

693 borrowers=(), 

694 tasks_waiting=0, 

695 ) 

696 

697 return self._internal_limiter.statistics() 

698 

699 

700class ResourceGuard: 

701 """ 

702 A context manager for ensuring that a resource is only used by a single task at a 

703 time. 

704 

705 Entering this context manager while the previous has not exited it yet will trigger 

706 :exc:`BusyResourceError`. 

707 

708 :param action: the action to guard against (visible in the :exc:`BusyResourceError` 

709 when triggered, e.g. "Another task is already {action} this resource") 

710 

711 .. versionadded:: 4.1 

712 """ 

713 

714 __slots__ = "action", "_guarded" 

715 

716 def __init__(self, action: str = "using"): 

717 self.action: str = action 

718 self._guarded = False 

719 

720 def __enter__(self) -> None: 

721 if self._guarded: 

722 raise BusyResourceError(self.action) 

723 

724 self._guarded = True 

725 

726 def __exit__( 

727 self, 

728 exc_type: type[BaseException] | None, 

729 exc_val: BaseException | None, 

730 exc_tb: TracebackType | None, 

731 ) -> None: 

732 self._guarded = False