Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

334 statements  

1from __future__ import annotations 

2 

3import math 

4from collections import deque 

5from dataclasses import dataclass 

6from types import TracebackType 

7 

8from sniffio import AsyncLibraryNotFoundError 

9 

10from ..lowlevel import checkpoint 

11from ._eventloop import get_async_backend 

12from ._exceptions import BusyResourceError 

13from ._tasks import CancelScope 

14from ._testing import TaskInfo, get_current_task 

15 

16 

@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable snapshot of the state of an :class:`Event`.

    :ivar int tasks_waiting: how many tasks are currently blocked in
        :meth:`~.Event.wait`
    """

    tasks_waiting: int

24 

25 

@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable snapshot of the state of a :class:`CapacityLimiter`.

    :ivar int borrowed_tokens: how many tokens tasks have borrowed at the moment
    :ivar float total_tokens: the limiter's total token count
    :ivar tuple borrowers: the tasks (or other objects) that currently hold a token
        borrowed from this limiter
    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int

42 

43 

@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable snapshot of the state of a :class:`Lock`.

    :ivar bool locked: whether the lock is held at the moment
    :ivar ~anyio.TaskInfo owner: the task holding the lock, or ``None`` when no task
        holds it
    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int

56 

57 

@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable snapshot of the state of a :class:`Condition`.

    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: the statistics reported by the
        underlying :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics

68 

69 

@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable snapshot of the state of a :class:`Semaphore`.

    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.Semaphore.acquire`

    """

    tasks_waiting: int

78 

79 

class Event:
    """
    Interface of an event flag that tasks can wait on.

    Instantiating this class returns the event created by the current async backend.
    If looking up the backend raises :exc:`~sniffio.AsyncLibraryNotFoundError`, an
    :class:`EventAdapter` is returned instead.
    """

    def __new__(cls) -> Event:
        try:
            backend_event = get_async_backend().create_event()
        except AsyncLibraryNotFoundError:
            return EventAdapter()
        else:
            return backend_event

    def set(self) -> None:
        """Raise the flag and wake up every listener."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Tell whether the flag has been raised (``True``) or not (``False``)."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Block until the flag has been set.

        Returns immediately when the flag was already set at the time of the call.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return a snapshot of this event's current state."""
        raise NotImplementedError

108 

109 

class EventAdapter(Event):
    """
    Stand-in :class:`Event` that creates the backend event lazily.

    The backend event is only created the first time :attr:`_event` is read; until
    then :meth:`is_set` and :meth:`statistics` answer without touching the backend.
    """

    # Backend event, created on first use; ``None`` until then.
    _internal_event: Event | None = None

    def __new__(cls) -> EventAdapter:
        # Skip Event.__new__, which would attempt the backend lookup again.
        return object.__new__(cls)

    @property
    def _event(self) -> Event:
        """Return the backend event, creating it on first access."""
        event = self._internal_event
        if event is None:
            event = self._internal_event = get_async_backend().create_event()

        return event

    def set(self) -> None:
        self._event.set()

    def is_set(self) -> bool:
        # An event that was never materialised cannot have been set.
        if self._internal_event is None:
            return False

        return self._internal_event.is_set()

    async def wait(self) -> None:
        await self._event.wait()

    def statistics(self) -> EventStatistics:
        event = self._internal_event
        return EventStatistics(tasks_waiting=0) if event is None else event.statistics()

137 

138 

class Lock:
    """
    Interface of an asynchronous lock, usable as an async context manager.

    Instantiating this class returns the lock created by the current async backend.
    If looking up the backend raises :exc:`~sniffio.AsyncLibraryNotFoundError`, a
    :class:`LockAdapter` is returned instead. ``fast_acquire`` is forwarded unchanged
    to whichever of the two gets created.
    """

    def __new__(cls, *, fast_acquire: bool = False) -> Lock:
        try:
            backend_lock = get_async_backend().create_lock(fast_acquire=fast_acquire)
        except AsyncLibraryNotFoundError:
            return LockAdapter(fast_acquire=fast_acquire)
        else:
            return backend_lock

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # The lock is released regardless of whether the body raised.
        self.release()

    async def acquire(self) -> None:
        """Take the lock."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Take the lock without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Give the lock up."""
        raise NotImplementedError

    def locked(self) -> bool:
        """Tell whether the lock is currently held."""
        raise NotImplementedError

    def statistics(self) -> LockStatistics:
        """
        Return a snapshot of this lock's current state.

        .. versionadded:: 3.0
        """
        raise NotImplementedError

185 

186 

class LockAdapter(Lock):
    """
    Stand-in :class:`Lock` that creates the backend lock lazily.

    The backend lock is only created the first time :attr:`_lock` is read. Queries
    that can be answered without it (:meth:`locked`, :meth:`statistics`) do not
    trigger its creation.
    """

    # Backend lock, created on first use; ``None`` until then.
    _internal_lock: Lock | None = None

    def __new__(cls, *, fast_acquire: bool = False) -> LockAdapter:
        # Skip Lock.__new__, which would attempt the backend lookup again.
        return object.__new__(cls)

    def __init__(self, *, fast_acquire: bool = False):
        # Remembered so it can be handed to the backend when the lock is created.
        self._fast_acquire = fast_acquire

    @property
    def _lock(self) -> Lock:
        """Return the backend lock, creating it on first access."""
        if self._internal_lock is None:
            self._internal_lock = get_async_backend().create_lock(
                fast_acquire=self._fast_acquire
            )

        return self._internal_lock

    async def __aenter__(self) -> None:
        await self._lock.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Nothing to release if the backend lock was never created.
        if self._internal_lock is not None:
            self._internal_lock.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await self._lock.acquire()

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """
        Return True if the lock is currently held.

        A lock whose backend counterpart has not been created yet has never been
        acquired through this adapter, so ``False`` is returned without creating it
        (mirroring what :meth:`statistics` reports in that state).
        """
        if self._internal_lock is None:
            return False

        return self._internal_lock.locked()

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0

        """
        if self._internal_lock is None:
            return LockStatistics(False, None, 0)

        return self._internal_lock.statistics()

249 

250 

class Condition:
    """
    Condition variable built from a :class:`Lock` and a queue of :class:`Event`
    objects, one per waiting task.

    :param lock: the lock to use; a new :class:`Lock` is created when omitted
    """

    # Task that acquired the lock through this condition; ``None`` when not held.
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = lock or Lock()
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """
        Verify that the calling task acquired the lock through this condition.

        :raises RuntimeError: if the recorded owner is not the current task
        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()
        # Reached only if the release succeeded. Forget the owner so that a task
        # which no longer holds the lock fails _check_acquired() instead of being
        # allowed to notify waiters based on a stale record.
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify up to ``n`` listeners, in the order in which they started waiting.

        Fewer than ``n`` listeners are notified if fewer are waiting.

        :raises RuntimeError: if the current task is not holding the underlying lock
        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # Ran out of waiters before reaching n.
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not holding the underlying lock
        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and is always re-acquired
        before this method returns or propagates an exception.
        """
        await checkpoint()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            # If the event was set, notify()/notify_all() already removed it from
            # the queue; otherwise it must be removed here.
            if not event.is_set():
                self._waiters.remove(event)

            raise
        finally:
            # Shielded so that the lock is re-acquired even when the surrounding
            # scope has been cancelled.
            with CancelScope(shield=True):
                await self.acquire()

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())

339 

340 

class Semaphore:
    """
    Interface of an asynchronous semaphore, usable as an async context manager.

    Instantiating this class returns the semaphore created by the current async
    backend. If looking up the backend raises
    :exc:`~sniffio.AsyncLibraryNotFoundError`, a :class:`SemaphoreAdapter` is returned
    instead. All constructor arguments are forwarded to whichever of the two gets
    created.

    :param initial_value: the starting value of the semaphore (must be >= 0)
    :param max_value: upper bound for the value, or ``None``
    :param fast_acquire: forwarded to the backend; stored as ``_fast_acquire``
    :raises TypeError: if ``initial_value`` is not an integer, or ``max_value`` is
        neither an integer nor ``None``
    :raises ValueError: if ``initial_value`` is negative or ``max_value`` is lower
        than ``initial_value``
    """

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> Semaphore:
        try:
            return get_async_backend().create_semaphore(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )
        except AsyncLibraryNotFoundError:
            # Forward fast_acquire as well, the same way Lock.__new__ does for its
            # adapter, so the fallback receives exactly what the caller asked for.
            return SemaphoreAdapter(
                initial_value, max_value=max_value, fast_acquire=fast_acquire
            )

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._fast_acquire = fast_acquire

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        raise NotImplementedError

    def release(self) -> None:
        """Increment the semaphore value."""
        raise NotImplementedError

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        raise NotImplementedError

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        raise NotImplementedError

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        raise NotImplementedError

423 

424 

class SemaphoreAdapter(Semaphore):
    """
    Stand-in :class:`Semaphore` that creates the backend semaphore lazily.

    The constructor arguments are validated by :meth:`Semaphore.__init__` and
    remembered; the backend semaphore is only created from them the first time
    :attr:`_semaphore` is read.
    """

    # Backend semaphore, created on first use; ``None`` until then.
    _internal_semaphore: Semaphore | None = None

    def __new__(
        cls,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> SemaphoreAdapter:
        # Skip Semaphore.__new__, which would attempt the backend lookup again.
        return object.__new__(cls)

    def __init__(
        self,
        initial_value: int,
        *,
        max_value: int | None = None,
        fast_acquire: bool = False,
    ) -> None:
        # The base initializer validates the arguments and stores _fast_acquire.
        super().__init__(initial_value, max_value=max_value, fast_acquire=fast_acquire)
        self._initial_value = initial_value
        self._max_value = max_value

    @property
    def _semaphore(self) -> Semaphore:
        """Return the backend semaphore, creating it on first access."""
        if self._internal_semaphore is None:
            # Pass fast_acquire along too; otherwise the flag accepted by the
            # constructor would never reach the backend.
            self._internal_semaphore = get_async_backend().create_semaphore(
                self._initial_value,
                max_value=self._max_value,
                fast_acquire=self._fast_acquire,
            )

        return self._internal_semaphore

    async def acquire(self) -> None:
        await self._semaphore.acquire()

    def acquire_nowait(self) -> None:
        self._semaphore.acquire_nowait()

    def release(self) -> None:
        self._semaphore.release()

    @property
    def value(self) -> int:
        # Before the backend semaphore exists, the value is still the initial one.
        if self._internal_semaphore is None:
            return self._initial_value

        return self._semaphore.value

    @property
    def max_value(self) -> int | None:
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        # No backend semaphore means no task can be waiting on it yet.
        if self._internal_semaphore is None:
            return SemaphoreStatistics(tasks_waiting=0)

        return self._semaphore.statistics()

482 

483 

class CapacityLimiter:
    """
    Interface of a token-based capacity limiter, usable as an async context manager.

    Instantiating this class returns the limiter created by the current async
    backend. If looking up the backend raises
    :exc:`~sniffio.AsyncLibraryNotFoundError`, a :class:`CapacityLimiterAdapter` is
    returned instead.
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        try:
            backend_limiter = get_async_backend().create_capacity_limiter(total_tokens)
        except AsyncLibraryNotFoundError:
            return CapacityLimiterAdapter(total_tokens)
        else:
            return backend_limiter

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        How many tokens exist in total for borrowing.

        The property can be both read and written. Raising the total grants tokens
        to the proportionate number of tasks waiting on this limiter.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """How many tokens are borrowed at the moment."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """How many tokens can still be borrowed at the moment."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Borrow a token for the current task, failing instead of waiting when none is
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Borrow a token for ``borrower``, failing instead of waiting when none is
        available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Borrow a token for the current task, waiting for one to become available if
        necessary.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Borrow a token for ``borrower``, waiting for one to become available if
        necessary.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Give back the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Give back the token held by ``borrower``.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return a snapshot of this limiter's current state.

        .. versionadded:: 3.0

        """
        raise NotImplementedError

596 

597 

class CapacityLimiterAdapter(CapacityLimiter):
    """
    Stand-in :class:`CapacityLimiter` that creates the backend limiter lazily.

    Until the backend limiter exists, the token total is kept locally and the
    read-only queries report an unused limiter. The backend limiter is created the
    first time :attr:`_limiter` is read.
    """

    # Backend limiter, created on first use; ``None`` until then.
    _internal_limiter: CapacityLimiter | None = None

    def __new__(cls, total_tokens: float) -> CapacityLimiterAdapter:
        # Skip CapacityLimiter.__new__, which would attempt the backend lookup again.
        return object.__new__(cls)

    def __init__(self, total_tokens: float) -> None:
        # Goes through the property setter, which validates the value.
        self.total_tokens = total_tokens

    @property
    def _limiter(self) -> CapacityLimiter:
        """Return the backend limiter, creating it on first access."""
        limiter = self._internal_limiter
        if limiter is None:
            limiter = get_async_backend().create_capacity_limiter(self._total_tokens)
            self._internal_limiter = limiter

        return limiter

    async def __aenter__(self) -> None:
        await self._limiter.__aenter__()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        return await self._limiter.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def total_tokens(self) -> float:
        limiter = self._internal_limiter
        return self._total_tokens if limiter is None else limiter.total_tokens

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        if not (isinstance(value, int) or value is math.inf):
            raise TypeError("total_tokens must be an int or math.inf")
        if value < 1:
            raise ValueError("total_tokens must be >= 1")

        # Once the backend limiter exists it owns the total; before that the value
        # is only remembered locally.
        if self._internal_limiter is not None:
            self._limiter.total_tokens = value
        else:
            self._total_tokens = value

    @property
    def borrowed_tokens(self) -> int:
        limiter = self._internal_limiter
        return 0 if limiter is None else limiter.borrowed_tokens

    @property
    def available_tokens(self) -> float:
        limiter = self._internal_limiter
        return self._total_tokens if limiter is None else limiter.available_tokens

    def acquire_nowait(self) -> None:
        self._limiter.acquire_nowait()

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        self._limiter.acquire_on_behalf_of_nowait(borrower)

    async def acquire(self) -> None:
        await self._limiter.acquire()

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        await self._limiter.acquire_on_behalf_of(borrower)

    def release(self) -> None:
        self._limiter.release()

    def release_on_behalf_of(self, borrower: object) -> None:
        self._limiter.release_on_behalf_of(borrower)

    def statistics(self) -> CapacityLimiterStatistics:
        limiter = self._internal_limiter
        if limiter is not None:
            return limiter.statistics()

        # No backend limiter yet: nothing is borrowed and nobody is waiting.
        return CapacityLimiterStatistics(
            borrowed_tokens=0,
            total_tokens=self.total_tokens,
            borrowers=(),
            tasks_waiting=0,
        )

689 

690 

class ResourceGuard:
    """
    Context manager that allows only one user of a resource at a time.

    Attempting to enter it while a previous entry has not been exited yet raises
    :exc:`BusyResourceError`.

    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
        when triggered, e.g. "Another task is already {action} this resource")

    .. versionadded:: 4.1
    """

    __slots__ = "action", "_guarded"

    def __init__(self, action: str = "using"):
        self.action: str = action
        # True between __enter__ and __exit__.
        self._guarded = False

    def __enter__(self) -> None:
        if not self._guarded:
            self._guarded = True
            return

        raise BusyResourceError(self.action)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        self._guarded = False
        # Returning None lets any exception from the guarded body propagate.
        return None