Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_synchronization.py: 40%

241 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:38 +0000

1from __future__ import annotations 

2 

3from collections import deque 

4from dataclasses import dataclass 

5from types import TracebackType 

6 

7from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled 

8from ._eventloop import get_async_backend 

9from ._exceptions import BusyResourceError, WouldBlock 

10from ._tasks import CancelScope 

11from ._testing import TaskInfo, get_current_task 

12 

13 

@dataclass(frozen=True)
class EventStatistics:
    """
    Immutable snapshot describing an :class:`~.Event`.

    :ivar int tasks_waiting: how many tasks are currently blocked in
        :meth:`~.Event.wait`
    """

    tasks_waiting: int

21 

22 

@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Immutable snapshot describing a :class:`~.CapacityLimiter`.

    :ivar int borrowed_tokens: how many tokens tasks have borrowed right now
    :ivar float total_tokens: the total number of available tokens
    :ivar tuple borrowers: the tasks (or other objects) that currently hold tokens
        borrowed from this limiter
    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int

39 

40 

@dataclass(frozen=True)
class LockStatistics:
    """
    Immutable snapshot describing a :class:`~.Lock`.

    :ivar bool locked: whether the lock is currently held
    :ivar ~anyio.TaskInfo owner: the task holding the lock, or ``None`` when no task
        holds it
    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int

53 

54 

@dataclass(frozen=True)
class ConditionStatistics:
    """
    Immutable snapshot describing a :class:`~.Condition`.

    :ivar int tasks_waiting: how many tasks are blocked in :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: the statistics of the
        :class:`~.Lock` underneath the condition
    """

    tasks_waiting: int
    lock_statistics: LockStatistics

65 

66 

@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Immutable snapshot describing a :class:`~.Semaphore`.

    :ivar int tasks_waiting: how many tasks are blocked in
        :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int

75 

76 

class Event:
    """
    A flag that tasks can wait on.

    Construction is delegated to the active async backend through ``__new__``; the
    methods defined here only describe the interface and raise
    :exc:`NotImplementedError`.
    """

    def __new__(cls) -> Event:
        backend = get_async_backend()
        return backend.create_event()

    def set(self) -> None:
        """Raise the flag and notify every listener."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Tell whether the flag has been set (``True``) or not (``False``)."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Block until the flag is set.

        When the flag is already set at the time of the call, this returns
        immediately.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return a snapshot of statistics about this event's current state."""
        raise NotImplementedError

102 

103 

class Lock:
    """
    Mutual exclusion lock held by at most one task at a time.

    Tasks that cannot take the lock immediately are queued and handed the lock in
    the order in which they started waiting.
    """

    # Class-level default; an instance attribute shadows it while the lock is held.
    _owner_task: TaskInfo | None = None

    def __init__(self) -> None:
        # Queue of (waiting task, event used to wake that task) pairs.
        self._waiters: deque[tuple[TaskInfo, Event]] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Take the lock, waiting for it to become free if necessary."""
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            current = get_current_task()
            wakeup = Event()
            entry = (current, wakeup)
            self._waiters.append(entry)
            try:
                await wakeup.wait()
            except BaseException:
                if wakeup.is_set():
                    # The wait failed after the lock was handed over; pass it on
                    # rather than keeping it.
                    if self._owner_task == current:
                        self.release()
                else:
                    # Still queued: withdraw from the waiting line.
                    self._waiters.remove(entry)

                raise

            assert self._owner_task == current
        else:
            # Uncontended path: still yield once, giving the lock back if that fails.
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Take the lock only if that can be done without waiting.

        :raises RuntimeError: if the current task already holds this lock
        :raises ~anyio.WouldBlock: if the operation would block

        """
        caller = get_current_task()
        holder = self._owner_task
        if holder == caller:
            raise RuntimeError("Attempted to acquire an already held Lock")

        if holder is not None:
            raise WouldBlock

        self._owner_task = caller

    def release(self) -> None:
        """
        Give up the lock, handing it to the longest-waiting task if there is one.

        :raises RuntimeError: if the current task does not hold this lock

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding this lock")

        if not self._waiters:
            # Nobody is queued: drop the instance attribute so the class-level
            # ``None`` default applies again.
            del self._owner_task
            return

        next_owner, wakeup = self._waiters.popleft()
        self._owner_task = next_owner
        wakeup.set()

    def locked(self) -> bool:
        """Return ``True`` while some task holds the lock."""
        return self._owner_task is not None

    def statistics(self) -> LockStatistics:
        """
        Return a snapshot of statistics about this lock's current state.

        .. versionadded:: 3.0
        """
        return LockStatistics(
            locked=self.locked(),
            owner=self._owner_task,
            tasks_waiting=len(self._waiters),
        )

187 

188 

class Condition:
    """
    Condition variable built on a :class:`Lock`.

    Tasks call :meth:`wait` while holding the underlying lock; other tasks holding
    the lock wake them with :meth:`notify` or :meth:`notify_all`.

    :param lock: the lock to use; a new :class:`Lock` is created when omitted
    """

    # Task that last acquired the lock through this condition; ``None`` when the
    # condition is not held.
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = lock or Lock()
        # One event per task currently blocked in wait().
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """Raise :exc:`RuntimeError` unless the current task holds the condition."""
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()
        # Reached only when the lock accepted the release. Forget the owner so a
        # later notify()/notify_all() from the same task, made without holding the
        # lock, is rejected by _check_acquired().
        self._owner_task = None

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify up to ``n`` listeners (fewer if not that many tasks are waiting).

        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not holding the underlying lock

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """
        Wait for a notification.

        The underlying lock is released while waiting and re-acquired before this
        method returns or raises.
        """
        await checkpoint()
        event = Event()
        # Release before registering as a waiter: if release() raises because the
        # caller does not hold the lock, no stale event is left in the queue to
        # absorb a later notify(). There is no await between the two statements,
        # so no other task can run in between.
        self.release()
        self._waiters.append(event)
        try:
            await event.wait()
        except BaseException:
            if not event.is_set():
                # Not notified: withdraw from the queue before propagating.
                self._waiters.remove(event)

            raise
        finally:
            # Always leave wait() holding the lock again, even when cancelled.
            with CancelScope(shield=True):
                await self.acquire()

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())

277 

278 

class Semaphore:
    """
    Counting semaphore with an optional upper bound.

    :param initial_value: the starting value; must be a non-negative integer
    :param max_value: if given, the highest value the semaphore may reach; must be
        an integer not lower than ``initial_value``
    """

    def __init__(self, initial_value: int, *, max_value: int | None = None):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")

        bounded = max_value is not None
        if bounded and not isinstance(max_value, int):
            raise TypeError("max_value must be an integer or None")
        if bounded and max_value < initial_value:
            raise ValueError(
                "max_value must be equal to or higher than initial_value"
            )

        self._value = initial_value
        self._max_value = max_value
        # One event per task blocked in acquire().
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Take one unit from the semaphore, waiting until one is available."""
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            wakeup = Event()
            self._waiters.append(wakeup)
            try:
                await wakeup.wait()
            except BaseException:
                if wakeup.is_set():
                    # A unit was already handed to this task; pass it on.
                    self.release()
                else:
                    # Still queued: withdraw from the waiting line.
                    self._waiters.remove(wakeup)

                raise
        else:
            # Uncontended path: still yield once, returning the unit if that fails.
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        remaining = self._value
        if remaining == 0:
            raise WouldBlock

        self._value = remaining - 1

    def release(self) -> None:
        """
        Return one unit to the semaphore.

        If tasks are waiting, the longest-waiting one is woken instead of the value
        being incremented.

        :raises ValueError: if the value is already at ``max_value``

        """
        ceiling = self._max_value
        if ceiling is not None and self._value == ceiling:
            raise ValueError("semaphore released too many times")

        if not self._waiters:
            self._value += 1
            return

        self._waiters.popleft().set()

    @property
    def value(self) -> int:
        """The semaphore's current value."""
        return self._value

    @property
    def max_value(self) -> int | None:
        """The semaphore's maximum value (``None`` if unbounded)."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """
        Return a snapshot of statistics about this semaphore's current state.

        .. versionadded:: 3.0
        """
        return SemaphoreStatistics(tasks_waiting=len(self._waiters))

372 

373 

class CapacityLimiter:
    """
    Limiter from which tasks (or other borrowers) borrow tokens.

    Construction is delegated to the active async backend through ``__new__``; the
    members defined here only describe the interface and raise
    :exc:`NotImplementedError`.
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        backend = get_async_backend()
        return backend.create_capacity_limiter(total_tokens)

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        How many tokens can be borrowed in total.

        The property can be both read and written. Raising the total grants tokens
        to the proportionate number of tasks waiting on this limiter.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """How many tokens are borrowed at the moment."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """How many tokens can still be borrowed at the moment."""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Borrow a token for the current task, failing instead of waiting when none
        is available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Borrow a token for ``borrower``, failing instead of waiting when none is
        available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Borrow a token for the current task, waiting for one to become available
        if necessary.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Borrow a token for ``borrower``, waiting for one to become available if
        necessary.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Give back the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Give back the token held by ``borrower``.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return a snapshot of statistics about this limiter's current state.

        .. versionadded:: 3.0

        """
        raise NotImplementedError

483 

484 

class ResourceGuard:
    """
    Non-reentrant context manager that rejects overlapping use.

    Entering while the guard is already entered raises
    :exc:`BusyResourceError` with ``action`` as its argument.

    :param action: passed to :exc:`BusyResourceError` when the guard is already
        entered
    """

    __slots__ = ("action", "_guarded")

    def __init__(self, action: str):
        self._guarded = False
        self.action = action

    def __enter__(self) -> None:
        if not self._guarded:
            self._guarded = True
            return None

        raise BusyResourceError(self.action)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        # Clear the flag; returning None lets any exception propagate.
        self._guarded = False
        return None