Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py: 39%

264 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1from collections import deque 

2from dataclasses import dataclass 

3from types import TracebackType 

4from typing import Deque, Optional, Tuple, Type 

5from warnings import warn 

6 

7from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled 

8from ._compat import DeprecatedAwaitable 

9from ._eventloop import get_asynclib 

10from ._exceptions import BusyResourceError, WouldBlock 

11from ._tasks import CancelScope 

12from ._testing import TaskInfo, get_current_task 

13 

14 

15@dataclass(frozen=True) 

16class EventStatistics: 

17 """ 

18 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait` 

19 """ 

20 

21 tasks_waiting: int 

22 

23 

24@dataclass(frozen=True) 

25class CapacityLimiterStatistics: 

26 """ 

27 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks 

28 :ivar float total_tokens: total number of available tokens 

29 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from this 

30 limiter 

31 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.CapacityLimiter.acquire` or 

32 :meth:`~.CapacityLimiter.acquire_on_behalf_of` 

33 """ 

34 

35 borrowed_tokens: int 

36 total_tokens: float 

37 borrowers: Tuple[object, ...] 

38 tasks_waiting: int 

39 

40 

41@dataclass(frozen=True) 

42class LockStatistics: 

43 """ 

44 :ivar bool locked: flag indicating if this lock is locked or not 

45 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the lock is not 

46 held by any task) 

47 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire` 

48 """ 

49 

50 locked: bool 

51 owner: Optional[TaskInfo] 

52 tasks_waiting: int 

53 

54 

55@dataclass(frozen=True) 

56class ConditionStatistics: 

57 """ 

58 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait` 

59 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying :class:`~.Lock` 

60 """ 

61 

62 tasks_waiting: int 

63 lock_statistics: LockStatistics 

64 

65 

66@dataclass(frozen=True) 

67class SemaphoreStatistics: 

68 """ 

69 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire` 

70 

71 """ 

72 

73 tasks_waiting: int 

74 

75 

76class Event: 

77 def __new__(cls) -> "Event": 

78 return get_asynclib().Event() 

79 

80 def set(self) -> DeprecatedAwaitable: 

81 """Set the flag, notifying all listeners.""" 

82 raise NotImplementedError 

83 

84 def is_set(self) -> bool: 

85 """Return ``True`` if the flag is set, ``False`` if not.""" 

86 raise NotImplementedError 

87 

88 async def wait(self) -> None: 

89 """ 

90 Wait until the flag has been set. 

91 

92 If the flag has already been set when this method is called, it returns immediately. 

93 

94 """ 

95 raise NotImplementedError 

96 

97 def statistics(self) -> EventStatistics: 

98 """Return statistics about the current state of this event.""" 

99 raise NotImplementedError 

100 

101 

102class Lock: 

103 _owner_task: Optional[TaskInfo] = None 

104 

105 def __init__(self) -> None: 

106 self._waiters: Deque[Tuple[TaskInfo, Event]] = deque() 

107 

108 async def __aenter__(self) -> None: 

109 await self.acquire() 

110 

111 async def __aexit__( 

112 self, 

113 exc_type: Optional[Type[BaseException]], 

114 exc_val: Optional[BaseException], 

115 exc_tb: Optional[TracebackType], 

116 ) -> None: 

117 self.release() 

118 

119 async def acquire(self) -> None: 

120 """Acquire the lock.""" 

121 await checkpoint_if_cancelled() 

122 try: 

123 self.acquire_nowait() 

124 except WouldBlock: 

125 task = get_current_task() 

126 event = Event() 

127 token = task, event 

128 self._waiters.append(token) 

129 try: 

130 await event.wait() 

131 except BaseException: 

132 if not event.is_set(): 

133 self._waiters.remove(token) 

134 elif self._owner_task == task: 

135 self.release() 

136 

137 raise 

138 

139 assert self._owner_task == task 

140 else: 

141 try: 

142 await cancel_shielded_checkpoint() 

143 except BaseException: 

144 self.release() 

145 raise 

146 

147 def acquire_nowait(self) -> None: 

148 """ 

149 Acquire the lock, without blocking. 

150 

151 :raises ~WouldBlock: if the operation would block 

152 

153 """ 

154 task = get_current_task() 

155 if self._owner_task == task: 

156 raise RuntimeError("Attempted to acquire an already held Lock") 

157 

158 if self._owner_task is not None: 

159 raise WouldBlock 

160 

161 self._owner_task = task 

162 

163 def release(self) -> DeprecatedAwaitable: 

164 """Release the lock.""" 

165 if self._owner_task != get_current_task(): 

166 raise RuntimeError("The current task is not holding this lock") 

167 

168 if self._waiters: 

169 self._owner_task, event = self._waiters.popleft() 

170 event.set() 

171 else: 

172 del self._owner_task 

173 

174 return DeprecatedAwaitable(self.release) 

175 

176 def locked(self) -> bool: 

177 """Return True if the lock is currently held.""" 

178 return self._owner_task is not None 

179 

180 def statistics(self) -> LockStatistics: 

181 """ 

182 Return statistics about the current state of this lock. 

183 

184 .. versionadded:: 3.0 

185 """ 

186 return LockStatistics(self.locked(), self._owner_task, len(self._waiters)) 

187 

188 

189class Condition: 

190 _owner_task: Optional[TaskInfo] = None 

191 

192 def __init__(self, lock: Optional[Lock] = None): 

193 self._lock = lock or Lock() 

194 self._waiters: Deque[Event] = deque() 

195 

196 async def __aenter__(self) -> None: 

197 await self.acquire() 

198 

199 async def __aexit__( 

200 self, 

201 exc_type: Optional[Type[BaseException]], 

202 exc_val: Optional[BaseException], 

203 exc_tb: Optional[TracebackType], 

204 ) -> None: 

205 self.release() 

206 

207 def _check_acquired(self) -> None: 

208 if self._owner_task != get_current_task(): 

209 raise RuntimeError("The current task is not holding the underlying lock") 

210 

211 async def acquire(self) -> None: 

212 """Acquire the underlying lock.""" 

213 await self._lock.acquire() 

214 self._owner_task = get_current_task() 

215 

216 def acquire_nowait(self) -> None: 

217 """ 

218 Acquire the underlying lock, without blocking. 

219 

220 :raises ~WouldBlock: if the operation would block 

221 

222 """ 

223 self._lock.acquire_nowait() 

224 self._owner_task = get_current_task() 

225 

226 def release(self) -> DeprecatedAwaitable: 

227 """Release the underlying lock.""" 

228 self._lock.release() 

229 return DeprecatedAwaitable(self.release) 

230 

231 def locked(self) -> bool: 

232 """Return True if the lock is set.""" 

233 return self._lock.locked() 

234 

235 def notify(self, n: int = 1) -> None: 

236 """Notify exactly n listeners.""" 

237 self._check_acquired() 

238 for _ in range(n): 

239 try: 

240 event = self._waiters.popleft() 

241 except IndexError: 

242 break 

243 

244 event.set() 

245 

246 def notify_all(self) -> None: 

247 """Notify all the listeners.""" 

248 self._check_acquired() 

249 for event in self._waiters: 

250 event.set() 

251 

252 self._waiters.clear() 

253 

254 async def wait(self) -> None: 

255 """Wait for a notification.""" 

256 await checkpoint() 

257 event = Event() 

258 self._waiters.append(event) 

259 self.release() 

260 try: 

261 await event.wait() 

262 except BaseException: 

263 if not event.is_set(): 

264 self._waiters.remove(event) 

265 

266 raise 

267 finally: 

268 with CancelScope(shield=True): 

269 await self.acquire() 

270 

271 def statistics(self) -> ConditionStatistics: 

272 """ 

273 Return statistics about the current state of this condition. 

274 

275 .. versionadded:: 3.0 

276 """ 

277 return ConditionStatistics(len(self._waiters), self._lock.statistics()) 

278 

279 

280class Semaphore: 

281 def __init__(self, initial_value: int, *, max_value: Optional[int] = None): 

282 if not isinstance(initial_value, int): 

283 raise TypeError("initial_value must be an integer") 

284 if initial_value < 0: 

285 raise ValueError("initial_value must be >= 0") 

286 if max_value is not None: 

287 if not isinstance(max_value, int): 

288 raise TypeError("max_value must be an integer or None") 

289 if max_value < initial_value: 

290 raise ValueError( 

291 "max_value must be equal to or higher than initial_value" 

292 ) 

293 

294 self._value = initial_value 

295 self._max_value = max_value 

296 self._waiters: Deque[Event] = deque() 

297 

298 async def __aenter__(self) -> "Semaphore": 

299 await self.acquire() 

300 return self 

301 

302 async def __aexit__( 

303 self, 

304 exc_type: Optional[Type[BaseException]], 

305 exc_val: Optional[BaseException], 

306 exc_tb: Optional[TracebackType], 

307 ) -> None: 

308 self.release() 

309 

310 async def acquire(self) -> None: 

311 """Decrement the semaphore value, blocking if necessary.""" 

312 await checkpoint_if_cancelled() 

313 try: 

314 self.acquire_nowait() 

315 except WouldBlock: 

316 event = Event() 

317 self._waiters.append(event) 

318 try: 

319 await event.wait() 

320 except BaseException: 

321 if not event.is_set(): 

322 self._waiters.remove(event) 

323 else: 

324 self.release() 

325 

326 raise 

327 else: 

328 try: 

329 await cancel_shielded_checkpoint() 

330 except BaseException: 

331 self.release() 

332 raise 

333 

334 def acquire_nowait(self) -> None: 

335 """ 

336 Acquire the underlying lock, without blocking. 

337 

338 :raises ~WouldBlock: if the operation would block 

339 

340 """ 

341 if self._value == 0: 

342 raise WouldBlock 

343 

344 self._value -= 1 

345 

346 def release(self) -> DeprecatedAwaitable: 

347 """Increment the semaphore value.""" 

348 if self._max_value is not None and self._value == self._max_value: 

349 raise ValueError("semaphore released too many times") 

350 

351 if self._waiters: 

352 self._waiters.popleft().set() 

353 else: 

354 self._value += 1 

355 

356 return DeprecatedAwaitable(self.release) 

357 

358 @property 

359 def value(self) -> int: 

360 """The current value of the semaphore.""" 

361 return self._value 

362 

363 @property 

364 def max_value(self) -> Optional[int]: 

365 """The maximum value of the semaphore.""" 

366 return self._max_value 

367 

368 def statistics(self) -> SemaphoreStatistics: 

369 """ 

370 Return statistics about the current state of this semaphore. 

371 

372 .. versionadded:: 3.0 

373 """ 

374 return SemaphoreStatistics(len(self._waiters)) 

375 

376 

377class CapacityLimiter: 

378 def __new__(cls, total_tokens: float) -> "CapacityLimiter": 

379 return get_asynclib().CapacityLimiter(total_tokens) 

380 

381 async def __aenter__(self) -> None: 

382 raise NotImplementedError 

383 

384 async def __aexit__( 

385 self, 

386 exc_type: Optional[Type[BaseException]], 

387 exc_val: Optional[BaseException], 

388 exc_tb: Optional[TracebackType], 

389 ) -> Optional[bool]: 

390 raise NotImplementedError 

391 

392 @property 

393 def total_tokens(self) -> float: 

394 """ 

395 The total number of tokens available for borrowing. 

396 

397 This is a read-write property. If the total number of tokens is increased, the 

398 proportionate number of tasks waiting on this limiter will be granted their tokens. 

399 

400 .. versionchanged:: 3.0 

401 The property is now writable. 

402 

403 """ 

404 raise NotImplementedError 

405 

406 @total_tokens.setter 

407 def total_tokens(self, value: float) -> None: 

408 raise NotImplementedError 

409 

410 async def set_total_tokens(self, value: float) -> None: 

411 warn( 

412 "CapacityLimiter.set_total_tokens has been deprecated. Set the value of the" 

413 '"total_tokens" attribute directly.', 

414 DeprecationWarning, 

415 ) 

416 self.total_tokens = value 

417 

418 @property 

419 def borrowed_tokens(self) -> int: 

420 """The number of tokens that have currently been borrowed.""" 

421 raise NotImplementedError 

422 

423 @property 

424 def available_tokens(self) -> float: 

425 """The number of tokens currently available to be borrowed""" 

426 raise NotImplementedError 

427 

428 def acquire_nowait(self) -> DeprecatedAwaitable: 

429 """ 

430 Acquire a token for the current task without waiting for one to become available. 

431 

432 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

433 

434 """ 

435 raise NotImplementedError 

436 

437 def acquire_on_behalf_of_nowait(self, borrower: object) -> DeprecatedAwaitable: 

438 """ 

439 Acquire a token without waiting for one to become available. 

440 

441 :param borrower: the entity borrowing a token 

442 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

443 

444 """ 

445 raise NotImplementedError 

446 

447 async def acquire(self) -> None: 

448 """ 

449 Acquire a token for the current task, waiting if necessary for one to become available. 

450 

451 """ 

452 raise NotImplementedError 

453 

454 async def acquire_on_behalf_of(self, borrower: object) -> None: 

455 """ 

456 Acquire a token, waiting if necessary for one to become available. 

457 

458 :param borrower: the entity borrowing a token 

459 

460 """ 

461 raise NotImplementedError 

462 

463 def release(self) -> None: 

464 """ 

465 Release the token held by the current task. 

466 :raises RuntimeError: if the current task has not borrowed a token from this limiter. 

467 

468 """ 

469 raise NotImplementedError 

470 

471 def release_on_behalf_of(self, borrower: object) -> None: 

472 """ 

473 Release the token held by the given borrower. 

474 

475 :raises RuntimeError: if the borrower has not borrowed a token from this limiter. 

476 

477 """ 

478 raise NotImplementedError 

479 

480 def statistics(self) -> CapacityLimiterStatistics: 

481 """ 

482 Return statistics about the current state of this limiter. 

483 

484 .. versionadded:: 3.0 

485 

486 """ 

487 raise NotImplementedError 

488 

489 

490def create_lock() -> Lock: 

491 """ 

492 Create an asynchronous lock. 

493 

494 :return: a lock object 

495 

496 .. deprecated:: 3.0 

497 Use :class:`~Lock` directly. 

498 

499 """ 

500 warn("create_lock() is deprecated -- use Lock() directly", DeprecationWarning) 

501 return Lock() 

502 

503 

504def create_condition(lock: Optional[Lock] = None) -> Condition: 

505 """ 

506 Create an asynchronous condition. 

507 

508 :param lock: the lock to base the condition object on 

509 :return: a condition object 

510 

511 .. deprecated:: 3.0 

512 Use :class:`~Condition` directly. 

513 

514 """ 

515 warn( 

516 "create_condition() is deprecated -- use Condition() directly", 

517 DeprecationWarning, 

518 ) 

519 return Condition(lock=lock) 

520 

521 

522def create_event() -> Event: 

523 """ 

524 Create an asynchronous event object. 

525 

526 :return: an event object 

527 

528 .. deprecated:: 3.0 

529 Use :class:`~Event` directly. 

530 

531 """ 

532 warn("create_event() is deprecated -- use Event() directly", DeprecationWarning) 

533 return get_asynclib().Event() 

534 

535 

536def create_semaphore(value: int, *, max_value: Optional[int] = None) -> Semaphore: 

537 """ 

538 Create an asynchronous semaphore. 

539 

540 :param value: the semaphore's initial value 

541 :param max_value: if set, makes this a "bounded" semaphore that raises :exc:`ValueError` if the 

542 semaphore's value would exceed this number 

543 :return: a semaphore object 

544 

545 .. deprecated:: 3.0 

546 Use :class:`~Semaphore` directly. 

547 

548 """ 

549 warn( 

550 "create_semaphore() is deprecated -- use Semaphore() directly", 

551 DeprecationWarning, 

552 ) 

553 return Semaphore(value, max_value=max_value) 

554 

555 

556def create_capacity_limiter(total_tokens: float) -> CapacityLimiter: 

557 """ 

558 Create a capacity limiter. 

559 

560 :param total_tokens: the total number of tokens available for borrowing (can be an integer or 

561 :data:`math.inf`) 

562 :return: a capacity limiter object 

563 

564 .. deprecated:: 3.0 

565 Use :class:`~CapacityLimiter` directly. 

566 

567 """ 

568 warn( 

569 "create_capacity_limiter() is deprecated -- use CapacityLimiter() directly", 

570 DeprecationWarning, 

571 ) 

572 return get_asynclib().CapacityLimiter(total_tokens) 

573 

574 

575class ResourceGuard: 

576 __slots__ = "action", "_guarded" 

577 

578 def __init__(self, action: str): 

579 self.action = action 

580 self._guarded = False 

581 

582 def __enter__(self) -> None: 

583 if self._guarded: 

584 raise BusyResourceError(self.action) 

585 

586 self._guarded = True 

587 

588 def __exit__( 

589 self, 

590 exc_type: Optional[Type[BaseException]], 

591 exc_val: Optional[BaseException], 

592 exc_tb: Optional[TracebackType], 

593 ) -> Optional[bool]: 

594 self._guarded = False 

595 return None