Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py: 39%

264 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1from __future__ import annotations 

2 

3from collections import deque 

4from dataclasses import dataclass 

5from types import TracebackType 

6from warnings import warn 

7 

8from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled 

9from ._compat import DeprecatedAwaitable 

10from ._eventloop import get_asynclib 

11from ._exceptions import BusyResourceError, WouldBlock 

12from ._tasks import CancelScope 

13from ._testing import TaskInfo, get_current_task 

14 

15 

16@dataclass(frozen=True) 

17class EventStatistics: 

18 """ 

19 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait` 

20 """ 

21 

22 tasks_waiting: int 

23 

24 

25@dataclass(frozen=True) 

26class CapacityLimiterStatistics: 

27 """ 

28 :ivar int borrowed_tokens: number of tokens currently borrowed by tasks 

29 :ivar float total_tokens: total number of available tokens 

30 :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from this 

31 limiter 

32 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.CapacityLimiter.acquire` or 

33 :meth:`~.CapacityLimiter.acquire_on_behalf_of` 

34 """ 

35 

36 borrowed_tokens: int 

37 total_tokens: float 

38 borrowers: tuple[object, ...] 

39 tasks_waiting: int 

40 

41 

42@dataclass(frozen=True) 

43class LockStatistics: 

44 """ 

45 :ivar bool locked: flag indicating if this lock is locked or not 

46 :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the lock is not 

47 held by any task) 

48 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire` 

49 """ 

50 

51 locked: bool 

52 owner: TaskInfo | None 

53 tasks_waiting: int 

54 

55 

56@dataclass(frozen=True) 

57class ConditionStatistics: 

58 """ 

59 :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait` 

60 :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying :class:`~.Lock` 

61 """ 

62 

63 tasks_waiting: int 

64 lock_statistics: LockStatistics 

65 

66 

67@dataclass(frozen=True) 

68class SemaphoreStatistics: 

69 """ 

70 :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire` 

71 

72 """ 

73 

74 tasks_waiting: int 

75 

76 

77class Event: 

78 def __new__(cls) -> Event: 

79 return get_asynclib().Event() 

80 

81 def set(self) -> DeprecatedAwaitable: 

82 """Set the flag, notifying all listeners.""" 

83 raise NotImplementedError 

84 

85 def is_set(self) -> bool: 

86 """Return ``True`` if the flag is set, ``False`` if not.""" 

87 raise NotImplementedError 

88 

89 async def wait(self) -> None: 

90 """ 

91 Wait until the flag has been set. 

92 

93 If the flag has already been set when this method is called, it returns immediately. 

94 

95 """ 

96 raise NotImplementedError 

97 

98 def statistics(self) -> EventStatistics: 

99 """Return statistics about the current state of this event.""" 

100 raise NotImplementedError 

101 

102 

103class Lock: 

104 _owner_task: TaskInfo | None = None 

105 

106 def __init__(self) -> None: 

107 self._waiters: deque[tuple[TaskInfo, Event]] = deque() 

108 

109 async def __aenter__(self) -> None: 

110 await self.acquire() 

111 

112 async def __aexit__( 

113 self, 

114 exc_type: type[BaseException] | None, 

115 exc_val: BaseException | None, 

116 exc_tb: TracebackType | None, 

117 ) -> None: 

118 self.release() 

119 

120 async def acquire(self) -> None: 

121 """Acquire the lock.""" 

122 await checkpoint_if_cancelled() 

123 try: 

124 self.acquire_nowait() 

125 except WouldBlock: 

126 task = get_current_task() 

127 event = Event() 

128 token = task, event 

129 self._waiters.append(token) 

130 try: 

131 await event.wait() 

132 except BaseException: 

133 if not event.is_set(): 

134 self._waiters.remove(token) 

135 elif self._owner_task == task: 

136 self.release() 

137 

138 raise 

139 

140 assert self._owner_task == task 

141 else: 

142 try: 

143 await cancel_shielded_checkpoint() 

144 except BaseException: 

145 self.release() 

146 raise 

147 

148 def acquire_nowait(self) -> None: 

149 """ 

150 Acquire the lock, without blocking. 

151 

152 :raises ~WouldBlock: if the operation would block 

153 

154 """ 

155 task = get_current_task() 

156 if self._owner_task == task: 

157 raise RuntimeError("Attempted to acquire an already held Lock") 

158 

159 if self._owner_task is not None: 

160 raise WouldBlock 

161 

162 self._owner_task = task 

163 

164 def release(self) -> DeprecatedAwaitable: 

165 """Release the lock.""" 

166 if self._owner_task != get_current_task(): 

167 raise RuntimeError("The current task is not holding this lock") 

168 

169 if self._waiters: 

170 self._owner_task, event = self._waiters.popleft() 

171 event.set() 

172 else: 

173 del self._owner_task 

174 

175 return DeprecatedAwaitable(self.release) 

176 

177 def locked(self) -> bool: 

178 """Return True if the lock is currently held.""" 

179 return self._owner_task is not None 

180 

181 def statistics(self) -> LockStatistics: 

182 """ 

183 Return statistics about the current state of this lock. 

184 

185 .. versionadded:: 3.0 

186 """ 

187 return LockStatistics(self.locked(), self._owner_task, len(self._waiters)) 

188 

189 

190class Condition: 

191 _owner_task: TaskInfo | None = None 

192 

193 def __init__(self, lock: Lock | None = None): 

194 self._lock = lock or Lock() 

195 self._waiters: deque[Event] = deque() 

196 

197 async def __aenter__(self) -> None: 

198 await self.acquire() 

199 

200 async def __aexit__( 

201 self, 

202 exc_type: type[BaseException] | None, 

203 exc_val: BaseException | None, 

204 exc_tb: TracebackType | None, 

205 ) -> None: 

206 self.release() 

207 

208 def _check_acquired(self) -> None: 

209 if self._owner_task != get_current_task(): 

210 raise RuntimeError("The current task is not holding the underlying lock") 

211 

212 async def acquire(self) -> None: 

213 """Acquire the underlying lock.""" 

214 await self._lock.acquire() 

215 self._owner_task = get_current_task() 

216 

217 def acquire_nowait(self) -> None: 

218 """ 

219 Acquire the underlying lock, without blocking. 

220 

221 :raises ~WouldBlock: if the operation would block 

222 

223 """ 

224 self._lock.acquire_nowait() 

225 self._owner_task = get_current_task() 

226 

227 def release(self) -> DeprecatedAwaitable: 

228 """Release the underlying lock.""" 

229 self._lock.release() 

230 return DeprecatedAwaitable(self.release) 

231 

232 def locked(self) -> bool: 

233 """Return True if the lock is set.""" 

234 return self._lock.locked() 

235 

236 def notify(self, n: int = 1) -> None: 

237 """Notify exactly n listeners.""" 

238 self._check_acquired() 

239 for _ in range(n): 

240 try: 

241 event = self._waiters.popleft() 

242 except IndexError: 

243 break 

244 

245 event.set() 

246 

247 def notify_all(self) -> None: 

248 """Notify all the listeners.""" 

249 self._check_acquired() 

250 for event in self._waiters: 

251 event.set() 

252 

253 self._waiters.clear() 

254 

255 async def wait(self) -> None: 

256 """Wait for a notification.""" 

257 await checkpoint() 

258 event = Event() 

259 self._waiters.append(event) 

260 self.release() 

261 try: 

262 await event.wait() 

263 except BaseException: 

264 if not event.is_set(): 

265 self._waiters.remove(event) 

266 

267 raise 

268 finally: 

269 with CancelScope(shield=True): 

270 await self.acquire() 

271 

272 def statistics(self) -> ConditionStatistics: 

273 """ 

274 Return statistics about the current state of this condition. 

275 

276 .. versionadded:: 3.0 

277 """ 

278 return ConditionStatistics(len(self._waiters), self._lock.statistics()) 

279 

280 

281class Semaphore: 

282 def __init__(self, initial_value: int, *, max_value: int | None = None): 

283 if not isinstance(initial_value, int): 

284 raise TypeError("initial_value must be an integer") 

285 if initial_value < 0: 

286 raise ValueError("initial_value must be >= 0") 

287 if max_value is not None: 

288 if not isinstance(max_value, int): 

289 raise TypeError("max_value must be an integer or None") 

290 if max_value < initial_value: 

291 raise ValueError( 

292 "max_value must be equal to or higher than initial_value" 

293 ) 

294 

295 self._value = initial_value 

296 self._max_value = max_value 

297 self._waiters: deque[Event] = deque() 

298 

299 async def __aenter__(self) -> Semaphore: 

300 await self.acquire() 

301 return self 

302 

303 async def __aexit__( 

304 self, 

305 exc_type: type[BaseException] | None, 

306 exc_val: BaseException | None, 

307 exc_tb: TracebackType | None, 

308 ) -> None: 

309 self.release() 

310 

311 async def acquire(self) -> None: 

312 """Decrement the semaphore value, blocking if necessary.""" 

313 await checkpoint_if_cancelled() 

314 try: 

315 self.acquire_nowait() 

316 except WouldBlock: 

317 event = Event() 

318 self._waiters.append(event) 

319 try: 

320 await event.wait() 

321 except BaseException: 

322 if not event.is_set(): 

323 self._waiters.remove(event) 

324 else: 

325 self.release() 

326 

327 raise 

328 else: 

329 try: 

330 await cancel_shielded_checkpoint() 

331 except BaseException: 

332 self.release() 

333 raise 

334 

335 def acquire_nowait(self) -> None: 

336 """ 

337 Acquire the underlying lock, without blocking. 

338 

339 :raises ~WouldBlock: if the operation would block 

340 

341 """ 

342 if self._value == 0: 

343 raise WouldBlock 

344 

345 self._value -= 1 

346 

347 def release(self) -> DeprecatedAwaitable: 

348 """Increment the semaphore value.""" 

349 if self._max_value is not None and self._value == self._max_value: 

350 raise ValueError("semaphore released too many times") 

351 

352 if self._waiters: 

353 self._waiters.popleft().set() 

354 else: 

355 self._value += 1 

356 

357 return DeprecatedAwaitable(self.release) 

358 

359 @property 

360 def value(self) -> int: 

361 """The current value of the semaphore.""" 

362 return self._value 

363 

364 @property 

365 def max_value(self) -> int | None: 

366 """The maximum value of the semaphore.""" 

367 return self._max_value 

368 

369 def statistics(self) -> SemaphoreStatistics: 

370 """ 

371 Return statistics about the current state of this semaphore. 

372 

373 .. versionadded:: 3.0 

374 """ 

375 return SemaphoreStatistics(len(self._waiters)) 

376 

377 

378class CapacityLimiter: 

379 def __new__(cls, total_tokens: float) -> CapacityLimiter: 

380 return get_asynclib().CapacityLimiter(total_tokens) 

381 

382 async def __aenter__(self) -> None: 

383 raise NotImplementedError 

384 

385 async def __aexit__( 

386 self, 

387 exc_type: type[BaseException] | None, 

388 exc_val: BaseException | None, 

389 exc_tb: TracebackType | None, 

390 ) -> bool | None: 

391 raise NotImplementedError 

392 

393 @property 

394 def total_tokens(self) -> float: 

395 """ 

396 The total number of tokens available for borrowing. 

397 

398 This is a read-write property. If the total number of tokens is increased, the 

399 proportionate number of tasks waiting on this limiter will be granted their tokens. 

400 

401 .. versionchanged:: 3.0 

402 The property is now writable. 

403 

404 """ 

405 raise NotImplementedError 

406 

407 @total_tokens.setter 

408 def total_tokens(self, value: float) -> None: 

409 raise NotImplementedError 

410 

411 async def set_total_tokens(self, value: float) -> None: 

412 warn( 

413 "CapacityLimiter.set_total_tokens has been deprecated. Set the value of the" 

414 '"total_tokens" attribute directly.', 

415 DeprecationWarning, 

416 ) 

417 self.total_tokens = value 

418 

419 @property 

420 def borrowed_tokens(self) -> int: 

421 """The number of tokens that have currently been borrowed.""" 

422 raise NotImplementedError 

423 

424 @property 

425 def available_tokens(self) -> float: 

426 """The number of tokens currently available to be borrowed""" 

427 raise NotImplementedError 

428 

429 def acquire_nowait(self) -> DeprecatedAwaitable: 

430 """ 

431 Acquire a token for the current task without waiting for one to become available. 

432 

433 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

434 

435 """ 

436 raise NotImplementedError 

437 

438 def acquire_on_behalf_of_nowait(self, borrower: object) -> DeprecatedAwaitable: 

439 """ 

440 Acquire a token without waiting for one to become available. 

441 

442 :param borrower: the entity borrowing a token 

443 :raises ~anyio.WouldBlock: if there are no tokens available for borrowing 

444 

445 """ 

446 raise NotImplementedError 

447 

448 async def acquire(self) -> None: 

449 """ 

450 Acquire a token for the current task, waiting if necessary for one to become available. 

451 

452 """ 

453 raise NotImplementedError 

454 

455 async def acquire_on_behalf_of(self, borrower: object) -> None: 

456 """ 

457 Acquire a token, waiting if necessary for one to become available. 

458 

459 :param borrower: the entity borrowing a token 

460 

461 """ 

462 raise NotImplementedError 

463 

464 def release(self) -> None: 

465 """ 

466 Release the token held by the current task. 

467 :raises RuntimeError: if the current task has not borrowed a token from this limiter. 

468 

469 """ 

470 raise NotImplementedError 

471 

472 def release_on_behalf_of(self, borrower: object) -> None: 

473 """ 

474 Release the token held by the given borrower. 

475 

476 :raises RuntimeError: if the borrower has not borrowed a token from this limiter. 

477 

478 """ 

479 raise NotImplementedError 

480 

481 def statistics(self) -> CapacityLimiterStatistics: 

482 """ 

483 Return statistics about the current state of this limiter. 

484 

485 .. versionadded:: 3.0 

486 

487 """ 

488 raise NotImplementedError 

489 

490 

491def create_lock() -> Lock: 

492 """ 

493 Create an asynchronous lock. 

494 

495 :return: a lock object 

496 

497 .. deprecated:: 3.0 

498 Use :class:`~Lock` directly. 

499 

500 """ 

501 warn("create_lock() is deprecated -- use Lock() directly", DeprecationWarning) 

502 return Lock() 

503 

504 

505def create_condition(lock: Lock | None = None) -> Condition: 

506 """ 

507 Create an asynchronous condition. 

508 

509 :param lock: the lock to base the condition object on 

510 :return: a condition object 

511 

512 .. deprecated:: 3.0 

513 Use :class:`~Condition` directly. 

514 

515 """ 

516 warn( 

517 "create_condition() is deprecated -- use Condition() directly", 

518 DeprecationWarning, 

519 ) 

520 return Condition(lock=lock) 

521 

522 

523def create_event() -> Event: 

524 """ 

525 Create an asynchronous event object. 

526 

527 :return: an event object 

528 

529 .. deprecated:: 3.0 

530 Use :class:`~Event` directly. 

531 

532 """ 

533 warn("create_event() is deprecated -- use Event() directly", DeprecationWarning) 

534 return get_asynclib().Event() 

535 

536 

537def create_semaphore(value: int, *, max_value: int | None = None) -> Semaphore: 

538 """ 

539 Create an asynchronous semaphore. 

540 

541 :param value: the semaphore's initial value 

542 :param max_value: if set, makes this a "bounded" semaphore that raises :exc:`ValueError` if the 

543 semaphore's value would exceed this number 

544 :return: a semaphore object 

545 

546 .. deprecated:: 3.0 

547 Use :class:`~Semaphore` directly. 

548 

549 """ 

550 warn( 

551 "create_semaphore() is deprecated -- use Semaphore() directly", 

552 DeprecationWarning, 

553 ) 

554 return Semaphore(value, max_value=max_value) 

555 

556 

557def create_capacity_limiter(total_tokens: float) -> CapacityLimiter: 

558 """ 

559 Create a capacity limiter. 

560 

561 :param total_tokens: the total number of tokens available for borrowing (can be an integer or 

562 :data:`math.inf`) 

563 :return: a capacity limiter object 

564 

565 .. deprecated:: 3.0 

566 Use :class:`~CapacityLimiter` directly. 

567 

568 """ 

569 warn( 

570 "create_capacity_limiter() is deprecated -- use CapacityLimiter() directly", 

571 DeprecationWarning, 

572 ) 

573 return get_asynclib().CapacityLimiter(total_tokens) 

574 

575 

576class ResourceGuard: 

577 __slots__ = "action", "_guarded" 

578 

579 def __init__(self, action: str): 

580 self.action = action 

581 self._guarded = False 

582 

583 def __enter__(self) -> None: 

584 if self._guarded: 

585 raise BusyResourceError(self.action) 

586 

587 self._guarded = True 

588 

589 def __exit__( 

590 self, 

591 exc_type: type[BaseException] | None, 

592 exc_val: BaseException | None, 

593 exc_tb: TracebackType | None, 

594 ) -> bool | None: 

595 self._guarded = False 

596 return None