Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/locks.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

155 statements  

1# Copyright 2015 The Tornado Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15import collections 

16import datetime 

17import types 

18 

19from tornado import gen, ioloop 

20from tornado.concurrent import Future, future_set_result_unless_cancelled 

21 

22from typing import Union, Optional, Type, Any, Awaitable 

23import typing 

24 

25if typing.TYPE_CHECKING: 

26 from typing import Deque, Set # noqa: F401 

27 

28__all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"] 

29 

30 

31class _TimeoutGarbageCollector: 

32 """Base class for objects that periodically clean up timed-out waiters. 

33 

34 Avoids memory leak in a common pattern like: 

35 

36 while True: 

37 yield condition.wait(short_timeout) 

38 print('looping....') 

39 """ 

40 

41 def __init__(self) -> None: 

42 self._waiters = collections.deque() # type: Deque[Future] 

43 self._timeouts = 0 

44 

45 def _garbage_collect(self) -> None: 

46 # Occasionally clear timed-out waiters. 

47 self._timeouts += 1 

48 if self._timeouts > 100: 

49 self._timeouts = 0 

50 self._waiters = collections.deque(w for w in self._waiters if not w.done()) 

51 

52 

53class Condition(_TimeoutGarbageCollector): 

54 """A condition allows one or more coroutines to wait until notified. 

55 

56 Like a standard `threading.Condition`, but does not need an underlying lock 

57 that is acquired and released. 

58 

59 With a `Condition`, coroutines can wait to be notified by other coroutines: 

60 

61 .. testcode:: 

62 

63 import asyncio 

64 from tornado import gen 

65 from tornado.locks import Condition 

66 

67 condition = Condition() 

68 

69 async def waiter(): 

70 print("I'll wait right here") 

71 await condition.wait() 

72 print("I'm done waiting") 

73 

74 async def notifier(): 

75 print("About to notify") 

76 condition.notify() 

77 print("Done notifying") 

78 

79 async def runner(): 

80 # Wait for waiter() and notifier() in parallel 

81 await gen.multi([waiter(), notifier()]) 

82 

83 asyncio.run(runner()) 

84 

85 .. testoutput:: 

86 

87 I'll wait right here 

88 About to notify 

89 Done notifying 

90 I'm done waiting 

91 

92 `wait` takes an optional ``timeout`` argument, which is either an absolute 

93 timestamp:: 

94 

95 io_loop = IOLoop.current() 

96 

97 # Wait up to 1 second for a notification. 

98 await condition.wait(timeout=io_loop.time() + 1) 

99 

100 ...or a `datetime.timedelta` for a timeout relative to the current time:: 

101 

102 # Wait up to 1 second. 

103 await condition.wait(timeout=datetime.timedelta(seconds=1)) 

104 

105 The method returns False if there's no notification before the deadline. 

106 

107 .. versionchanged:: 5.0 

108 Previously, waiters could be notified synchronously from within 

109 `notify`. Now, the notification will always be received on the 

110 next iteration of the `.IOLoop`. 

111 """ 

112 

113 def __repr__(self) -> str: 

114 result = f"<{self.__class__.__name__}" 

115 if self._waiters: 

116 result += " waiters[%s]" % len(self._waiters) 

117 return result + ">" 

118 

119 def wait( 

120 self, timeout: Optional[Union[float, datetime.timedelta]] = None 

121 ) -> Awaitable[bool]: 

122 """Wait for `.notify`. 

123 

124 Returns a `.Future` that resolves ``True`` if the condition is notified, 

125 or ``False`` after a timeout. 

126 """ 

127 waiter = Future() # type: Future[bool] 

128 self._waiters.append(waiter) 

129 if timeout: 

130 

131 def on_timeout() -> None: 

132 if not waiter.done(): 

133 future_set_result_unless_cancelled(waiter, False) 

134 self._garbage_collect() 

135 

136 io_loop = ioloop.IOLoop.current() 

137 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 

138 waiter.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle)) 

139 return waiter 

140 

141 def notify(self, n: int = 1) -> None: 

142 """Wake ``n`` waiters.""" 

143 waiters = [] # Waiters we plan to run right now. 

144 while n and self._waiters: 

145 waiter = self._waiters.popleft() 

146 if not waiter.done(): # Might have timed out. 

147 n -= 1 

148 waiters.append(waiter) 

149 

150 for waiter in waiters: 

151 future_set_result_unless_cancelled(waiter, True) 

152 

153 def notify_all(self) -> None: 

154 """Wake all waiters.""" 

155 self.notify(len(self._waiters)) 

156 

157 

158class Event: 

159 """An event blocks coroutines until its internal flag is set to True. 

160 

161 Similar to `threading.Event`. 

162 

163 A coroutine can wait for an event to be set. Once it is set, calls to 

164 ``yield event.wait()`` will not block unless the event has been cleared: 

165 

166 .. testcode:: 

167 

168 import asyncio 

169 from tornado import gen 

170 from tornado.locks import Event 

171 

172 event = Event() 

173 

174 async def waiter(): 

175 print("Waiting for event") 

176 await event.wait() 

177 print("Not waiting this time") 

178 await event.wait() 

179 print("Done") 

180 

181 async def setter(): 

182 print("About to set the event") 

183 event.set() 

184 

185 async def runner(): 

186 await gen.multi([waiter(), setter()]) 

187 

188 asyncio.run(runner()) 

189 

190 .. testoutput:: 

191 

192 Waiting for event 

193 About to set the event 

194 Not waiting this time 

195 Done 

196 """ 

197 

198 def __init__(self) -> None: 

199 self._value = False 

200 self._waiters = set() # type: Set[Future[None]] 

201 

202 def __repr__(self) -> str: 

203 return "<{} {}>".format( 

204 self.__class__.__name__, 

205 "set" if self.is_set() else "clear", 

206 ) 

207 

208 def is_set(self) -> bool: 

209 """Return ``True`` if the internal flag is true.""" 

210 return self._value 

211 

212 def set(self) -> None: 

213 """Set the internal flag to ``True``. All waiters are awakened. 

214 

215 Calling `.wait` once the flag is set will not block. 

216 """ 

217 if not self._value: 

218 self._value = True 

219 

220 for fut in self._waiters: 

221 if not fut.done(): 

222 fut.set_result(None) 

223 

224 def clear(self) -> None: 

225 """Reset the internal flag to ``False``. 

226 

227 Calls to `.wait` will block until `.set` is called. 

228 """ 

229 self._value = False 

230 

231 def wait( 

232 self, timeout: Optional[Union[float, datetime.timedelta]] = None 

233 ) -> Awaitable[None]: 

234 """Block until the internal flag is true. 

235 

236 Returns an awaitable, which raises `tornado.util.TimeoutError` after a 

237 timeout. 

238 """ 

239 fut = Future() # type: Future[None] 

240 if self._value: 

241 fut.set_result(None) 

242 return fut 

243 self._waiters.add(fut) 

244 fut.add_done_callback(lambda fut: self._waiters.remove(fut)) 

245 if timeout is None: 

246 return fut 

247 else: 

248 timeout_fut = gen.with_timeout(timeout, fut) 

249 # This is a slightly clumsy workaround for the fact that 

250 # gen.with_timeout doesn't cancel its futures. Cancelling 

251 # fut will remove it from the waiters list. 

252 timeout_fut.add_done_callback( 

253 lambda tf: fut.cancel() if not fut.done() else None 

254 ) 

255 return timeout_fut 

256 

257 

258class _ReleasingContextManager: 

259 """Releases a Lock or Semaphore at the end of a "with" statement. 

260 

261 with (yield semaphore.acquire()): 

262 pass 

263 

264 # Now semaphore.release() has been called. 

265 """ 

266 

267 def __init__(self, obj: Any) -> None: 

268 self._obj = obj 

269 

270 def __enter__(self) -> None: 

271 pass 

272 

273 def __exit__( 

274 self, 

275 exc_type: "Optional[Type[BaseException]]", 

276 exc_val: Optional[BaseException], 

277 exc_tb: Optional[types.TracebackType], 

278 ) -> None: 

279 self._obj.release() 

280 

281 

282class Semaphore(_TimeoutGarbageCollector): 

283 """A lock that can be acquired a fixed number of times before blocking. 

284 

285 A Semaphore manages a counter representing the number of `.release` calls 

286 minus the number of `.acquire` calls, plus an initial value. The `.acquire` 

287 method blocks if necessary until it can return without making the counter 

288 negative. 

289 

290 Semaphores limit access to a shared resource. To allow access for two 

291 workers at a time: 

292 

293 .. testsetup:: semaphore 

294 

295 from collections import deque 

296 

297 from tornado import gen 

298 from tornado.ioloop import IOLoop 

299 from tornado.concurrent import Future 

300 

301 inited = False 

302 

303 async def simulator(futures): 

304 for f in futures: 

305 # simulate the asynchronous passage of time 

306 await gen.sleep(0) 

307 await gen.sleep(0) 

308 f.set_result(None) 

309 

310 def use_some_resource(): 

311 global inited 

312 global futures_q 

313 if not inited: 

314 inited = True 

315 # Ensure reliable doctest output: resolve Futures one at a time. 

316 futures_q = deque([Future() for _ in range(3)]) 

317 IOLoop.current().add_callback(simulator, list(futures_q)) 

318 

319 return futures_q.popleft() 

320 

321 .. testcode:: semaphore 

322 

323 import asyncio 

324 from tornado import gen 

325 from tornado.locks import Semaphore 

326 

327 sem = Semaphore(2) 

328 

329 async def worker(worker_id): 

330 await sem.acquire() 

331 try: 

332 print("Worker %d is working" % worker_id) 

333 await use_some_resource() 

334 finally: 

335 print("Worker %d is done" % worker_id) 

336 sem.release() 

337 

338 async def runner(): 

339 # Join all workers. 

340 await gen.multi([worker(i) for i in range(3)]) 

341 

342 asyncio.run(runner()) 

343 

344 .. testoutput:: semaphore 

345 

346 Worker 0 is working 

347 Worker 1 is working 

348 Worker 0 is done 

349 Worker 2 is working 

350 Worker 1 is done 

351 Worker 2 is done 

352 

353 Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until 

354 the semaphore has been released once, by worker 0. 

355 

356 The semaphore can be used as an async context manager:: 

357 

358 async def worker(worker_id): 

359 async with sem: 

360 print("Worker %d is working" % worker_id) 

361 await use_some_resource() 

362 

363 # Now the semaphore has been released. 

364 print("Worker %d is done" % worker_id) 

365 

366 For compatibility with older versions of Python, `.acquire` is a 

367 context manager, so ``worker`` could also be written as:: 

368 

369 @gen.coroutine 

370 def worker(worker_id): 

371 with (yield sem.acquire()): 

372 print("Worker %d is working" % worker_id) 

373 yield use_some_resource() 

374 

375 # Now the semaphore has been released. 

376 print("Worker %d is done" % worker_id) 

377 

378 .. versionchanged:: 4.3 

379 Added ``async with`` support in Python 3.5. 

380 

381 """ 

382 

383 def __init__(self, value: int = 1) -> None: 

384 super().__init__() 

385 if value < 0: 

386 raise ValueError("semaphore initial value must be >= 0") 

387 

388 self._value = value 

389 

390 def __repr__(self) -> str: 

391 res = super().__repr__() 

392 extra = "locked" if self._value == 0 else f"unlocked,value:{self._value}" 

393 if self._waiters: 

394 extra = f"{extra},waiters:{len(self._waiters)}" 

395 return f"<{res[1:-1]} [{extra}]>" 

396 

397 def release(self) -> None: 

398 """Increment the counter and wake one waiter.""" 

399 self._value += 1 

400 while self._waiters: 

401 waiter = self._waiters.popleft() 

402 if not waiter.done(): 

403 self._value -= 1 

404 

405 # If the waiter is a coroutine paused at 

406 # 

407 # with (yield semaphore.acquire()): 

408 # 

409 # then the context manager's __exit__ calls release() at the end 

410 # of the "with" block. 

411 waiter.set_result(_ReleasingContextManager(self)) 

412 break 

413 

414 def acquire( 

415 self, timeout: Optional[Union[float, datetime.timedelta]] = None 

416 ) -> Awaitable[_ReleasingContextManager]: 

417 """Decrement the counter. Returns an awaitable. 

418 

419 Block if the counter is zero and wait for a `.release`. The awaitable 

420 raises `.TimeoutError` after the deadline. 

421 """ 

422 waiter = Future() # type: Future[_ReleasingContextManager] 

423 if self._value > 0: 

424 self._value -= 1 

425 waiter.set_result(_ReleasingContextManager(self)) 

426 else: 

427 self._waiters.append(waiter) 

428 if timeout: 

429 

430 def on_timeout() -> None: 

431 if not waiter.done(): 

432 waiter.set_exception(gen.TimeoutError()) 

433 self._garbage_collect() 

434 

435 io_loop = ioloop.IOLoop.current() 

436 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 

437 waiter.add_done_callback( 

438 lambda _: io_loop.remove_timeout(timeout_handle) 

439 ) 

440 return waiter 

441 

442 def __enter__(self) -> None: 

443 raise RuntimeError("Use 'async with' instead of 'with' for Semaphore") 

444 

445 def __exit__( 

446 self, 

447 typ: "Optional[Type[BaseException]]", 

448 value: Optional[BaseException], 

449 traceback: Optional[types.TracebackType], 

450 ) -> None: 

451 self.__enter__() 

452 

453 async def __aenter__(self) -> None: 

454 await self.acquire() 

455 

456 async def __aexit__( 

457 self, 

458 typ: "Optional[Type[BaseException]]", 

459 value: Optional[BaseException], 

460 tb: Optional[types.TracebackType], 

461 ) -> None: 

462 self.release() 

463 

464 

465class BoundedSemaphore(Semaphore): 

466 """A semaphore that prevents release() being called too many times. 

467 

468 If `.release` would increment the semaphore's value past the initial 

469 value, it raises `ValueError`. Semaphores are mostly used to guard 

470 resources with limited capacity, so a semaphore released too many times 

471 is a sign of a bug. 

472 """ 

473 

474 def __init__(self, value: int = 1) -> None: 

475 super().__init__(value=value) 

476 self._initial_value = value 

477 

478 def release(self) -> None: 

479 """Increment the counter and wake one waiter.""" 

480 if self._value >= self._initial_value: 

481 raise ValueError("Semaphore released too many times") 

482 super().release() 

483 

484 

485class Lock: 

486 """A lock for coroutines. 

487 

488 A Lock begins unlocked, and `acquire` locks it immediately. While it is 

489 locked, a coroutine that yields `acquire` waits until another coroutine 

490 calls `release`. 

491 

492 Releasing an unlocked lock raises `RuntimeError`. 

493 

494 A Lock can be used as an async context manager with the ``async 

495 with`` statement: 

496 

497 >>> from tornado import locks 

498 >>> lock = locks.Lock() 

499 >>> 

500 >>> async def f(): 

501 ... async with lock: 

502 ... # Do something holding the lock. 

503 ... pass 

504 ... 

505 ... # Now the lock is released. 

506 

507 For compatibility with older versions of Python, the `.acquire` 

508 method asynchronously returns a regular context manager: 

509 

510 >>> async def f2(): 

511 ... with (yield lock.acquire()): 

512 ... # Do something holding the lock. 

513 ... pass 

514 ... 

515 ... # Now the lock is released. 

516 

517 .. versionchanged:: 4.3 

518 Added ``async with`` support in Python 3.5. 

519 

520 """ 

521 

522 def __init__(self) -> None: 

523 self._block = BoundedSemaphore(value=1) 

524 

525 def __repr__(self) -> str: 

526 return f"<{self.__class__.__name__} _block={self._block}>" 

527 

528 def acquire( 

529 self, timeout: Optional[Union[float, datetime.timedelta]] = None 

530 ) -> Awaitable[_ReleasingContextManager]: 

531 """Attempt to lock. Returns an awaitable. 

532 

533 Returns an awaitable, which raises `tornado.util.TimeoutError` after a 

534 timeout. 

535 """ 

536 return self._block.acquire(timeout) 

537 

538 def release(self) -> None: 

539 """Unlock. 

540 

541 The first coroutine in line waiting for `acquire` gets the lock. 

542 

543 If not locked, raise a `RuntimeError`. 

544 """ 

545 try: 

546 self._block.release() 

547 except ValueError: 

548 raise RuntimeError("release unlocked lock") 

549 

550 def __enter__(self) -> None: 

551 raise RuntimeError("Use `async with` instead of `with` for Lock") 

552 

553 def __exit__( 

554 self, 

555 typ: "Optional[Type[BaseException]]", 

556 value: Optional[BaseException], 

557 tb: Optional[types.TracebackType], 

558 ) -> None: 

559 self.__enter__() 

560 

561 async def __aenter__(self) -> None: 

562 await self.acquire() 

563 

564 async def __aexit__( 

565 self, 

566 typ: "Optional[Type[BaseException]]", 

567 value: Optional[BaseException], 

568 tb: Optional[types.TracebackType], 

569 ) -> None: 

570 self.release()