Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/locks.py: 23%

228 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1"""Synchronization primitives.""" 

2 

3__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore') 

4 

5import collections 

6import types 

7import warnings 

8 

9from . import events 

10from . import futures 

11from . import exceptions 

12from .import coroutines 

13 

14 

15class _ContextManager: 

16 """Context manager. 

17 

18 This enables the following idiom for acquiring and releasing a 

19 lock around a block: 

20 

21 with (yield from lock): 

22 <block> 

23 

24 while failing loudly when accidentally using: 

25 

26 with lock: 

27 <block> 

28 

29 Deprecated, use 'async with' statement: 

30 async with lock: 

31 <block> 

32 """ 

33 

34 def __init__(self, lock): 

35 self._lock = lock 

36 

37 def __enter__(self): 

38 # We have no use for the "as ..." clause in the with 

39 # statement for locks. 

40 return None 

41 

42 def __exit__(self, *args): 

43 try: 

44 self._lock.release() 

45 finally: 

46 self._lock = None # Crudely prevent reuse. 

47 

48 

49class _ContextManagerMixin: 

50 def __enter__(self): 

51 raise RuntimeError( 

52 '"yield from" should be used as context manager expression') 

53 

54 def __exit__(self, *args): 

55 # This must exist because __enter__ exists, even though that 

56 # always raises; that's how the with-statement works. 

57 pass 

58 

59 @types.coroutine 

60 def __iter__(self): 

61 # This is not a coroutine. It is meant to enable the idiom: 

62 # 

63 # with (yield from lock): 

64 # <block> 

65 # 

66 # as an alternative to: 

67 # 

68 # yield from lock.acquire() 

69 # try: 

70 # <block> 

71 # finally: 

72 # lock.release() 

73 # Deprecated, use 'async with' statement: 

74 # async with lock: 

75 # <block> 

76 warnings.warn("'with (yield from lock)' is deprecated " 

77 "use 'async with lock' instead", 

78 DeprecationWarning, stacklevel=2) 

79 yield from self.acquire() 

80 return _ContextManager(self) 

81 

82 # The flag is needed for legacy asyncio.iscoroutine() 

83 __iter__._is_coroutine = coroutines._is_coroutine 

84 

85 async def __acquire_ctx(self): 

86 await self.acquire() 

87 return _ContextManager(self) 

88 

89 def __await__(self): 

90 warnings.warn("'with await lock' is deprecated " 

91 "use 'async with lock' instead", 

92 DeprecationWarning, stacklevel=2) 

93 # To make "with await lock" work. 

94 return self.__acquire_ctx().__await__() 

95 

96 async def __aenter__(self): 

97 await self.acquire() 

98 # We have no use for the "as ..." clause in the with 

99 # statement for locks. 

100 return None 

101 

102 async def __aexit__(self, exc_type, exc, tb): 

103 self.release() 

104 

105 

106class Lock(_ContextManagerMixin): 

107 """Primitive lock objects. 

108 

109 A primitive lock is a synchronization primitive that is not owned 

110 by a particular coroutine when locked. A primitive lock is in one 

111 of two states, 'locked' or 'unlocked'. 

112 

113 It is created in the unlocked state. It has two basic methods, 

114 acquire() and release(). When the state is unlocked, acquire() 

115 changes the state to locked and returns immediately. When the 

116 state is locked, acquire() blocks until a call to release() in 

117 another coroutine changes it to unlocked, then the acquire() call 

118 resets it to locked and returns. The release() method should only 

119 be called in the locked state; it changes the state to unlocked 

120 and returns immediately. If an attempt is made to release an 

121 unlocked lock, a RuntimeError will be raised. 

122 

123 When more than one coroutine is blocked in acquire() waiting for 

124 the state to turn to unlocked, only one coroutine proceeds when a 

125 release() call resets the state to unlocked; first coroutine which 

126 is blocked in acquire() is being processed. 

127 

128 acquire() is a coroutine and should be called with 'await'. 

129 

130 Locks also support the asynchronous context management protocol. 

131 'async with lock' statement should be used. 

132 

133 Usage: 

134 

135 lock = Lock() 

136 ... 

137 await lock.acquire() 

138 try: 

139 ... 

140 finally: 

141 lock.release() 

142 

143 Context manager usage: 

144 

145 lock = Lock() 

146 ... 

147 async with lock: 

148 ... 

149 

150 Lock objects can be tested for locking state: 

151 

152 if not lock.locked(): 

153 await lock.acquire() 

154 else: 

155 # lock is acquired 

156 ... 

157 

158 """ 

159 

160 def __init__(self, *, loop=None): 

161 self._waiters = None 

162 self._locked = False 

163 if loop is None: 

164 self._loop = events.get_event_loop() 

165 else: 

166 self._loop = loop 

167 warnings.warn("The loop argument is deprecated since Python 3.8, " 

168 "and scheduled for removal in Python 3.10.", 

169 DeprecationWarning, stacklevel=2) 

170 

171 def __repr__(self): 

172 res = super().__repr__() 

173 extra = 'locked' if self._locked else 'unlocked' 

174 if self._waiters: 

175 extra = f'{extra}, waiters:{len(self._waiters)}' 

176 return f'<{res[1:-1]} [{extra}]>' 

177 

178 def locked(self): 

179 """Return True if lock is acquired.""" 

180 return self._locked 

181 

182 async def acquire(self): 

183 """Acquire a lock. 

184 

185 This method blocks until the lock is unlocked, then sets it to 

186 locked and returns True. 

187 """ 

188 if (not self._locked and (self._waiters is None or 

189 all(w.cancelled() for w in self._waiters))): 

190 self._locked = True 

191 return True 

192 

193 if self._waiters is None: 

194 self._waiters = collections.deque() 

195 fut = self._loop.create_future() 

196 self._waiters.append(fut) 

197 

198 # Finally block should be called before the CancelledError 

199 # handling as we don't want CancelledError to call 

200 # _wake_up_first() and attempt to wake up itself. 

201 try: 

202 try: 

203 await fut 

204 finally: 

205 self._waiters.remove(fut) 

206 except exceptions.CancelledError: 

207 if not self._locked: 

208 self._wake_up_first() 

209 raise 

210 

211 self._locked = True 

212 return True 

213 

214 def release(self): 

215 """Release a lock. 

216 

217 When the lock is locked, reset it to unlocked, and return. 

218 If any other coroutines are blocked waiting for the lock to become 

219 unlocked, allow exactly one of them to proceed. 

220 

221 When invoked on an unlocked lock, a RuntimeError is raised. 

222 

223 There is no return value. 

224 """ 

225 if self._locked: 

226 self._locked = False 

227 self._wake_up_first() 

228 else: 

229 raise RuntimeError('Lock is not acquired.') 

230 

231 def _wake_up_first(self): 

232 """Wake up the first waiter if it isn't done.""" 

233 if not self._waiters: 

234 return 

235 try: 

236 fut = next(iter(self._waiters)) 

237 except StopIteration: 

238 return 

239 

240 # .done() necessarily means that a waiter will wake up later on and 

241 # either take the lock, or, if it was cancelled and lock wasn't 

242 # taken already, will hit this again and wake up a new waiter. 

243 if not fut.done(): 

244 fut.set_result(True) 

245 

246 

247class Event: 

248 """Asynchronous equivalent to threading.Event. 

249 

250 Class implementing event objects. An event manages a flag that can be set 

251 to true with the set() method and reset to false with the clear() method. 

252 The wait() method blocks until the flag is true. The flag is initially 

253 false. 

254 """ 

255 

256 def __init__(self, *, loop=None): 

257 self._waiters = collections.deque() 

258 self._value = False 

259 if loop is None: 

260 self._loop = events.get_event_loop() 

261 else: 

262 self._loop = loop 

263 warnings.warn("The loop argument is deprecated since Python 3.8, " 

264 "and scheduled for removal in Python 3.10.", 

265 DeprecationWarning, stacklevel=2) 

266 

267 def __repr__(self): 

268 res = super().__repr__() 

269 extra = 'set' if self._value else 'unset' 

270 if self._waiters: 

271 extra = f'{extra}, waiters:{len(self._waiters)}' 

272 return f'<{res[1:-1]} [{extra}]>' 

273 

274 def is_set(self): 

275 """Return True if and only if the internal flag is true.""" 

276 return self._value 

277 

278 def set(self): 

279 """Set the internal flag to true. All coroutines waiting for it to 

280 become true are awakened. Coroutine that call wait() once the flag is 

281 true will not block at all. 

282 """ 

283 if not self._value: 

284 self._value = True 

285 

286 for fut in self._waiters: 

287 if not fut.done(): 

288 fut.set_result(True) 

289 

290 def clear(self): 

291 """Reset the internal flag to false. Subsequently, coroutines calling 

292 wait() will block until set() is called to set the internal flag 

293 to true again.""" 

294 self._value = False 

295 

296 async def wait(self): 

297 """Block until the internal flag is true. 

298 

299 If the internal flag is true on entry, return True 

300 immediately. Otherwise, block until another coroutine calls 

301 set() to set the flag to true, then return True. 

302 """ 

303 if self._value: 

304 return True 

305 

306 fut = self._loop.create_future() 

307 self._waiters.append(fut) 

308 try: 

309 await fut 

310 return True 

311 finally: 

312 self._waiters.remove(fut) 

313 

314 

315class Condition(_ContextManagerMixin): 

316 """Asynchronous equivalent to threading.Condition. 

317 

318 This class implements condition variable objects. A condition variable 

319 allows one or more coroutines to wait until they are notified by another 

320 coroutine. 

321 

322 A new Lock object is created and used as the underlying lock. 

323 """ 

324 

325 def __init__(self, lock=None, *, loop=None): 

326 if loop is None: 

327 self._loop = events.get_event_loop() 

328 else: 

329 self._loop = loop 

330 warnings.warn("The loop argument is deprecated since Python 3.8, " 

331 "and scheduled for removal in Python 3.10.", 

332 DeprecationWarning, stacklevel=2) 

333 

334 if lock is None: 

335 lock = Lock(loop=loop) 

336 elif lock._loop is not self._loop: 

337 raise ValueError("loop argument must agree with lock") 

338 

339 self._lock = lock 

340 # Export the lock's locked(), acquire() and release() methods. 

341 self.locked = lock.locked 

342 self.acquire = lock.acquire 

343 self.release = lock.release 

344 

345 self._waiters = collections.deque() 

346 

347 def __repr__(self): 

348 res = super().__repr__() 

349 extra = 'locked' if self.locked() else 'unlocked' 

350 if self._waiters: 

351 extra = f'{extra}, waiters:{len(self._waiters)}' 

352 return f'<{res[1:-1]} [{extra}]>' 

353 

354 async def wait(self): 

355 """Wait until notified. 

356 

357 If the calling coroutine has not acquired the lock when this 

358 method is called, a RuntimeError is raised. 

359 

360 This method releases the underlying lock, and then blocks 

361 until it is awakened by a notify() or notify_all() call for 

362 the same condition variable in another coroutine. Once 

363 awakened, it re-acquires the lock and returns True. 

364 """ 

365 if not self.locked(): 

366 raise RuntimeError('cannot wait on un-acquired lock') 

367 

368 self.release() 

369 try: 

370 fut = self._loop.create_future() 

371 self._waiters.append(fut) 

372 try: 

373 await fut 

374 return True 

375 finally: 

376 self._waiters.remove(fut) 

377 

378 finally: 

379 # Must reacquire lock even if wait is cancelled 

380 cancelled = False 

381 while True: 

382 try: 

383 await self.acquire() 

384 break 

385 except exceptions.CancelledError: 

386 cancelled = True 

387 

388 if cancelled: 

389 raise exceptions.CancelledError 

390 

391 async def wait_for(self, predicate): 

392 """Wait until a predicate becomes true. 

393 

394 The predicate should be a callable which result will be 

395 interpreted as a boolean value. The final predicate value is 

396 the return value. 

397 """ 

398 result = predicate() 

399 while not result: 

400 await self.wait() 

401 result = predicate() 

402 return result 

403 

404 def notify(self, n=1): 

405 """By default, wake up one coroutine waiting on this condition, if any. 

406 If the calling coroutine has not acquired the lock when this method 

407 is called, a RuntimeError is raised. 

408 

409 This method wakes up at most n of the coroutines waiting for the 

410 condition variable; it is a no-op if no coroutines are waiting. 

411 

412 Note: an awakened coroutine does not actually return from its 

413 wait() call until it can reacquire the lock. Since notify() does 

414 not release the lock, its caller should. 

415 """ 

416 if not self.locked(): 

417 raise RuntimeError('cannot notify on un-acquired lock') 

418 

419 idx = 0 

420 for fut in self._waiters: 

421 if idx >= n: 

422 break 

423 

424 if not fut.done(): 

425 idx += 1 

426 fut.set_result(False) 

427 

428 def notify_all(self): 

429 """Wake up all threads waiting on this condition. This method acts 

430 like notify(), but wakes up all waiting threads instead of one. If the 

431 calling thread has not acquired the lock when this method is called, 

432 a RuntimeError is raised. 

433 """ 

434 self.notify(len(self._waiters)) 

435 

436 

437class Semaphore(_ContextManagerMixin): 

438 """A Semaphore implementation. 

439 

440 A semaphore manages an internal counter which is decremented by each 

441 acquire() call and incremented by each release() call. The counter 

442 can never go below zero; when acquire() finds that it is zero, it blocks, 

443 waiting until some other thread calls release(). 

444 

445 Semaphores also support the context management protocol. 

446 

447 The optional argument gives the initial value for the internal 

448 counter; it defaults to 1. If the value given is less than 0, 

449 ValueError is raised. 

450 """ 

451 

452 def __init__(self, value=1, *, loop=None): 

453 if value < 0: 

454 raise ValueError("Semaphore initial value must be >= 0") 

455 self._value = value 

456 self._waiters = collections.deque() 

457 if loop is None: 

458 self._loop = events.get_event_loop() 

459 else: 

460 self._loop = loop 

461 warnings.warn("The loop argument is deprecated since Python 3.8, " 

462 "and scheduled for removal in Python 3.10.", 

463 DeprecationWarning, stacklevel=2) 

464 

465 def __repr__(self): 

466 res = super().__repr__() 

467 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}' 

468 if self._waiters: 

469 extra = f'{extra}, waiters:{len(self._waiters)}' 

470 return f'<{res[1:-1]} [{extra}]>' 

471 

472 def _wake_up_next(self): 

473 while self._waiters: 

474 waiter = self._waiters.popleft() 

475 if not waiter.done(): 

476 waiter.set_result(None) 

477 return 

478 

479 def locked(self): 

480 """Returns True if semaphore can not be acquired immediately.""" 

481 return self._value == 0 

482 

483 async def acquire(self): 

484 """Acquire a semaphore. 

485 

486 If the internal counter is larger than zero on entry, 

487 decrement it by one and return True immediately. If it is 

488 zero on entry, block, waiting until some other coroutine has 

489 called release() to make it larger than 0, and then return 

490 True. 

491 """ 

492 while self._value <= 0: 

493 fut = self._loop.create_future() 

494 self._waiters.append(fut) 

495 try: 

496 await fut 

497 except: 

498 # See the similar code in Queue.get. 

499 fut.cancel() 

500 if self._value > 0 and not fut.cancelled(): 

501 self._wake_up_next() 

502 raise 

503 self._value -= 1 

504 return True 

505 

506 def release(self): 

507 """Release a semaphore, incrementing the internal counter by one. 

508 When it was zero on entry and another coroutine is waiting for it to 

509 become larger than zero again, wake up that coroutine. 

510 """ 

511 self._value += 1 

512 self._wake_up_next() 

513 

514 

515class BoundedSemaphore(Semaphore): 

516 """A bounded semaphore implementation. 

517 

518 This raises ValueError in release() if it would increase the value 

519 above the initial value. 

520 """ 

521 

522 def __init__(self, value=1, *, loop=None): 

523 if loop: 

524 warnings.warn("The loop argument is deprecated since Python 3.8, " 

525 "and scheduled for removal in Python 3.10.", 

526 DeprecationWarning, stacklevel=2) 

527 

528 self._bound_value = value 

529 super().__init__(value, loop=loop) 

530 

531 def release(self): 

532 if self._value >= self._bound_value: 

533 raise ValueError('BoundedSemaphore released too many times') 

534 super().release()