Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/locks.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2015 The Tornado Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15import collections
16import datetime
17import types
19from tornado import gen, ioloop
20from tornado.concurrent import Future, future_set_result_unless_cancelled
22from typing import Union, Optional, Type, Any, Awaitable
23import typing
25if typing.TYPE_CHECKING:
26 from typing import Deque, Set # noqa: F401
28__all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"]
class _TimeoutGarbageCollector:
    """Base class that occasionally purges finished waiters from its queue.

    Subclasses call `_garbage_collect` each time a waiter times out. This
    prevents a memory leak in a common pattern like:

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    """

    def __init__(self) -> None:
        self._waiters = collections.deque()  # type: Deque[Future]
        # Number of timeouts recorded since the last sweep of ``_waiters``.
        self._timeouts = 0

    def _garbage_collect(self) -> None:
        """Count one timeout; once more than 100 accumulate, drop done waiters."""
        self._timeouts += 1
        if self._timeouts <= 100:
            return

        # Threshold exceeded: restart the count and keep only the waiters
        # that are still pending, preserving their order.
        self._timeouts = 0
        still_pending = [w for w in self._waiters if not w.done()]
        self._waiters = collections.deque(still_pending)
class Condition(_TimeoutGarbageCollector):
    """Lets one or more coroutines sleep until another coroutine notifies them.

    Comparable to the standard `threading.Condition`, except that there is no
    underlying lock to acquire and release.

    Coroutines wait on a `Condition` and are woken by other coroutines:

    .. testcode::

        import asyncio
        from tornado import gen
        from tornado.locks import Condition

        condition = Condition()

        async def waiter():
            print("I'll wait right here")
            await condition.wait()
            print("I'm done waiting")

        async def notifier():
            print("About to notify")
            condition.notify()
            print("Done notifying")

        async def runner():
            # Wait for waiter() and notifier() in parallel
            await gen.multi([waiter(), notifier()])

        asyncio.run(runner())

    .. testoutput::

        I'll wait right here
        About to notify
        Done notifying
        I'm done waiting

    `wait` accepts an optional ``timeout``, given either as an absolute
    timestamp::

        io_loop = IOLoop.current()

        # Wait up to 1 second for a notification.
        await condition.wait(timeout=io_loop.time() + 1)

    ...or as a `datetime.timedelta` relative to the current time::

        # Wait up to 1 second.
        await condition.wait(timeout=datetime.timedelta(seconds=1))

    If the deadline passes without a notification, the method returns False.

    .. versionchanged:: 5.0
       Previously, waiters could be notified synchronously from within
       `notify`. Now, the notification will always be received on the
       next iteration of the `.IOLoop`.
    """

    def __repr__(self) -> str:
        name = self.__class__.__name__
        if not self._waiters:
            return f"<{name}>"
        return "<%s waiters[%s]>" % (name, len(self._waiters))

    def wait(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[bool]:
        """Wait for `.notify`.

        The returned `.Future` resolves to ``True`` when the condition is
        notified, or to ``False`` once the timeout expires. A falsy
        ``timeout`` (``None``, ``0`` or a zero `datetime.timedelta`) installs
        no deadline at all.
        """
        waiter = Future()  # type: Future[bool]
        self._waiters.append(waiter)
        if not timeout:
            return waiter

        def on_timeout() -> None:
            if not waiter.done():
                future_set_result_unless_cancelled(waiter, False)
            # Every expired timeout counts towards the periodic cleanup,
            # whether or not the waiter had already finished.
            self._garbage_collect()

        io_loop = ioloop.IOLoop.current()
        timeout_handle = io_loop.add_timeout(timeout, on_timeout)

        def cancel_timeout(_: "Future[bool]") -> None:
            # The waiter finished, so its pending timeout is no longer needed.
            io_loop.remove_timeout(timeout_handle)

        waiter.add_done_callback(cancel_timeout)
        return waiter

    def notify(self, n: int = 1) -> None:
        """Wake ``n`` waiters."""
        # Collect the waiters to wake first, then resolve them afterwards.
        to_wake = []
        while n and self._waiters:
            candidate = self._waiters.popleft()
            if candidate.done():
                # Already finished (e.g. it timed out): does not count.
                continue
            n -= 1
            to_wake.append(candidate)

        for fut in to_wake:
            future_set_result_unless_cancelled(fut, True)

    def notify_all(self) -> None:
        """Wake all waiters."""
        queued = len(self._waiters)
        self.notify(queued)
class Event:
    """Blocks coroutines until an internal flag becomes True.

    Similar to `threading.Event`.

    Coroutines can wait for the event to be set. After that, calls to
    ``yield event.wait()`` do not block until the event is cleared again:

    .. testcode::

        import asyncio
        from tornado import gen
        from tornado.locks import Event

        event = Event()

        async def waiter():
            print("Waiting for event")
            await event.wait()
            print("Not waiting this time")
            await event.wait()
            print("Done")

        async def setter():
            print("About to set the event")
            event.set()

        async def runner():
            await gen.multi([waiter(), setter()])

        asyncio.run(runner())

    .. testoutput::

        Waiting for event
        About to set the event
        Not waiting this time
        Done
    """

    def __init__(self) -> None:
        self._waiters = set()  # type: Set[Future[None]]
        self._value = False

    def __repr__(self) -> str:
        state = "set" if self.is_set() else "clear"
        return f"<{self.__class__.__name__} {state}>"

    def is_set(self) -> bool:
        """Return ``True`` if the internal flag is true."""
        return self._value

    def set(self) -> None:
        """Set the internal flag to ``True``. All waiters are awakened.

        Once the flag is set, calling `.wait` does not block.
        """
        if self._value:
            # Already set: nothing to change and nobody new to wake.
            return
        self._value = True

        for fut in self._waiters:
            if fut.done():
                continue
            fut.set_result(None)

    def clear(self) -> None:
        """Reset the internal flag to ``False``.

        Subsequent calls to `.wait` block until `.set` is called.
        """
        self._value = False

    def wait(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[None]:
        """Block until the internal flag is true.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        """
        fut = Future()  # type: Future[None]
        if self._value:
            fut.set_result(None)
            return fut

        self._waiters.add(fut)
        # A finished (or cancelled) future unregisters itself.
        fut.add_done_callback(lambda fut: self._waiters.remove(fut))
        if timeout is None:
            return fut

        timeout_fut = gen.with_timeout(timeout, fut)

        def cancel_inner(_: Any) -> None:
            # This is a slightly clumsy workaround for the fact that
            # gen.with_timeout doesn't cancel its futures. Cancelling
            # fut will remove it from the waiters list.
            if not fut.done():
                fut.cancel()

        timeout_fut.add_done_callback(cancel_inner)
        return timeout_fut
class _ReleasingContextManager:
    """Context manager that releases a Lock or Semaphore when the block ends.

        with (yield semaphore.acquire()):
            pass

        # Now semaphore.release() has been called.
    """

    def __init__(self, obj: Any) -> None:
        # The object whose ``release()`` is invoked on exit.
        self._obj = obj

    def __enter__(self) -> None:
        # Entering does nothing here; only leaving the block has an effect.
        return None

    def __exit__(
        self,
        exc_type: "Optional[Type[BaseException]]",
        exc_val: Optional[BaseException],
        exc_tb: Optional[types.TracebackType],
    ) -> None:
        # Release unconditionally; returning None lets any exception propagate.
        held = self._obj
        held.release()
class Semaphore(_TimeoutGarbageCollector):
    """A lock that can be acquired a fixed number of times before blocking.

    A Semaphore keeps a counter equal to an initial value, plus the number of
    `.release` calls, minus the number of `.acquire` calls. `.acquire` blocks
    if necessary until it can return without making the counter negative.

    Semaphores limit access to a shared resource. To allow access for two
    workers at a time:

    .. testsetup:: semaphore

       from collections import deque

       from tornado import gen
       from tornado.ioloop import IOLoop
       from tornado.concurrent import Future

       inited = False

       async def simulator(futures):
           for f in futures:
               # simulate the asynchronous passage of time
               await gen.sleep(0)
               await gen.sleep(0)
               f.set_result(None)

       def use_some_resource():
           global inited
           global futures_q
           if not inited:
               inited = True
               # Ensure reliable doctest output: resolve Futures one at a time.
               futures_q = deque([Future() for _ in range(3)])
               IOLoop.current().add_callback(simulator, list(futures_q))

           return futures_q.popleft()

    .. testcode:: semaphore

        import asyncio
        from tornado import gen
        from tornado.locks import Semaphore

        sem = Semaphore(2)

        async def worker(worker_id):
            await sem.acquire()
            try:
                print("Worker %d is working" % worker_id)
                await use_some_resource()
            finally:
                print("Worker %d is done" % worker_id)
                sem.release()

        async def runner():
            # Join all workers.
            await gen.multi([worker(i) for i in range(3)])

        asyncio.run(runner())

    .. testoutput:: semaphore

        Worker 0 is working
        Worker 1 is working
        Worker 0 is done
        Worker 2 is working
        Worker 1 is done
        Worker 2 is done

    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
    the semaphore has been released once, by worker 0.

    The semaphore can be used as an async context manager::

        async def worker(worker_id):
            async with sem:
                print("Worker %d is working" % worker_id)
                await use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    For compatibility with older versions of Python, `.acquire` is a
    context manager, so ``worker`` could also be written as::

        @gen.coroutine
        def worker(worker_id):
            with (yield sem.acquire()):
                print("Worker %d is working" % worker_id)
                yield use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """

    def __init__(self, value: int = 1) -> None:
        super().__init__()
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")

        self._value = value

    def __repr__(self) -> str:
        base = super().__repr__()
        if self._value == 0:
            state = "locked"
        else:
            state = f"unlocked,value:{self._value}"
        if self._waiters:
            state = f"{state},waiters:{len(self._waiters)}"
        # Strip the outer angle brackets of the inherited repr and re-wrap it
        # together with the state description.
        return f"<{base[1:-1]} [{state}]>"

    def release(self) -> None:
        """Increment the counter and wake one waiter."""
        self._value += 1
        while self._waiters:
            waiter = self._waiters.popleft()
            if waiter.done():
                # Finished waiters are discarded without consuming the slot.
                continue

            # Hand the freshly released slot straight to this waiter.
            self._value -= 1

            # If the waiter is a coroutine paused at
            #
            #     with (yield semaphore.acquire()):
            #
            # then the context manager's __exit__ calls release() at the end
            # of the "with" block.
            waiter.set_result(_ReleasingContextManager(self))
            break

    def acquire(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[_ReleasingContextManager]:
        """Decrement the counter. Returns an awaitable.

        Block if the counter is zero and wait for a `.release`. The awaitable
        raises `.TimeoutError` after the deadline. A falsy ``timeout``
        (``None``, ``0`` or a zero `datetime.timedelta`) installs no deadline.
        """
        waiter = Future()  # type: Future[_ReleasingContextManager]
        if self._value > 0:
            # A slot is free: take it and resolve immediately.
            self._value -= 1
            waiter.set_result(_ReleasingContextManager(self))
            return waiter

        self._waiters.append(waiter)
        if not timeout:
            return waiter

        def on_timeout() -> None:
            if not waiter.done():
                waiter.set_exception(gen.TimeoutError())
            # Every expired timeout counts towards the periodic cleanup.
            self._garbage_collect()

        io_loop = ioloop.IOLoop.current()
        timeout_handle = io_loop.add_timeout(timeout, on_timeout)

        def cancel_timeout(_: "Future[_ReleasingContextManager]") -> None:
            # The waiter finished, so its pending timeout is no longer needed.
            io_loop.remove_timeout(timeout_handle)

        waiter.add_done_callback(cancel_timeout)
        return waiter

    def __enter__(self) -> None:
        raise RuntimeError("Use 'async with' instead of 'with' for Semaphore")

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        # Delegates to __enter__, which always raises RuntimeError.
        self.__enter__()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        self.release()
class BoundedSemaphore(Semaphore):
    """A semaphore that refuses to be released more often than acquired.

    If `.release` would increment the semaphore's value past the initial
    value, it raises `ValueError`. Semaphores are mostly used to guard
    resources with limited capacity, so a semaphore released too many times
    is a sign of a bug.
    """

    def __init__(self, value: int = 1) -> None:
        super().__init__(value=value)
        # Upper bound that release() enforces on the counter.
        self._initial_value = value

    def release(self) -> None:
        """Increment the counter and wake one waiter."""
        if self._value < self._initial_value:
            super().release()
            return
        raise ValueError("Semaphore released too many times")
class Lock:
    """A lock for coroutines.

    A Lock begins unlocked, and `acquire` locks it immediately. While it is
    locked, a coroutine that yields `acquire` waits until another coroutine
    calls `release`.

    Releasing an unlocked lock raises `RuntimeError`.

    A Lock can be used as an async context manager with the ``async
    with`` statement:

    >>> from tornado import locks
    >>> lock = locks.Lock()
    >>>
    >>> async def f():
    ...     async with lock:
    ...         # Do something holding the lock.
    ...         pass
    ...
    ...     # Now the lock is released.

    For compatibility with older versions of Python, the `.acquire`
    method asynchronously returns a regular context manager, which can be
    used from a decorated (``yield``-based) coroutine:

    >>> from tornado import gen
    >>>
    >>> @gen.coroutine
    ... def f2():
    ...     with (yield lock.acquire()):
    ...         # Do something holding the lock.
    ...         pass
    ...
    ...     # Now the lock is released.

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """

    def __init__(self) -> None:
        # A semaphore bounded at 1: at most one holder, and releasing it
        # while unlocked is detected as an error.
        self._block = BoundedSemaphore(value=1)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} _block={self._block}>"

    def acquire(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[_ReleasingContextManager]:
        """Attempt to lock. Returns an awaitable.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout. The ``timeout`` is passed unchanged to the underlying
        semaphore's ``acquire``.
        """
        return self._block.acquire(timeout)

    def release(self) -> None:
        """Unlock.

        The first coroutine in line waiting for `acquire` gets the lock.

        If not locked, raise a `RuntimeError`.
        """
        try:
            self._block.release()
        except ValueError:
            # The ValueError comes from the internal semaphore and is an
            # implementation detail; suppress it so the traceback shows only
            # the RuntimeError instead of a misleading "during handling of
            # the above exception, another exception occurred" chain.
            raise RuntimeError("release unlocked lock") from None

    def __enter__(self) -> None:
        raise RuntimeError("Use `async with` instead of `with` for Lock")

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        # Delegates to __enter__, which always raises RuntimeError.
        self.__enter__()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        self.release()