Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tqdm/_monitor.py: 83%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import atexit
2from threading import Event, Thread, current_thread
3from time import time
4from warnings import warn
6__all__ = ["TMonitor", "TqdmSynchronisationWarning"]
class TqdmSynchronisationWarning(RuntimeWarning):
    """
    Warning category for tqdm multi-thread/-process errors.

    Such errors may cause incorrect nesting of bars but otherwise
    have no adverse effects.
    """
class TMonitor(Thread):
    """
    Monitoring thread for tqdm bars.
    Monitors if tqdm bars are taking too much time to display
    and readjusts miniters automatically if necessary.

    Parameters
    ----------
    tqdm_cls  : class
        tqdm class to use (can be core tqdm or a submodule).
    sleep_interval  : float
        Time to sleep between monitoring checks.
    """
    _test = {}  # internal vars for unit testing

    def __init__(self, tqdm_cls, sleep_interval):
        """Configure the monitor and start the thread immediately."""
        super().__init__(name="tqdm_monitor")
        self.daemon = True  # kill thread when main killed (KeyboardInterrupt)
        self.woken = 0  # last time woken up, to sync with monitor
        self.tqdm_cls = tqdm_cls
        self.sleep_interval = sleep_interval
        # `_test` lets unit tests inject a fake clock / Event factory
        self._time = self._test.get("time", time)
        self.was_killed = self._test.get("Event", Event)()
        atexit.register(self._atexit_signal)
        self.start()

    def _atexit_signal(self):
        """
        Non-joining shutdown signal.
        Avoids deadlocks at interpreter exit from other threads, dead forks, etc.
        This daemon thread is auto-reaped on shutdown without needing a join.
        """
        self.was_killed.set()

    def exit(self):
        """
        Signal the monitor to stop and wait for it to finish.

        The join is skipped when called from the monitor thread itself
        (a thread cannot join itself).

        Returns
        -------
        out  : bool
            The value of `report()` once the stop signal has been set.
        """
        self.was_killed.set()
        # The handler registered in `__init__` is redundant once the thread
        # has been told to stop; leaving it registered would keep a strong
        # reference to this (dead) monitor until interpreter exit.
        # Unregistering something that is not registered is a silent no-op,
        # so repeated `exit()` calls are safe.
        atexit.unregister(self._atexit_signal)
        if self is not current_thread():
            self.join()
        return self.report()

    def get_instances(self):
        """Return a list of the `tqdm_cls` instances that have started."""
        # returns a copy of started `tqdm_cls` instances
        return [i for i in self.tqdm_cls._instances.copy()
                # Avoid race by checking that the instance started
                if hasattr(i, 'start_t')]

    def run(self):
        """
        Thread body: sleep `sleep_interval`, then check every started bar.

        A bar with `miniters > 1` whose last print is at least `maxinterval`
        old gets `miniters` reset to 1 and is refreshed. Returns as soon as
        `was_killed` is set.
        """
        cur_t = self._time()
        while True:
            # After processing and before sleeping, notify that we woke
            # Need to be done just before sleeping
            self.woken = cur_t
            # Sleep some time...
            self.was_killed.wait(self.sleep_interval)
            # Quit if killed
            if self.was_killed.is_set():
                return
            # Then monitor!
            # Acquire lock (to access _instances)
            with self.tqdm_cls.get_lock():
                cur_t = self._time()
                # Check tqdm instances are waiting too long to print
                instances = self.get_instances()
                for instance in instances:
                    # Check event in loop to reduce blocking time on exit
                    if self.was_killed.is_set():
                        return
                    # Only if miniters > 1 (else iterations are just slow)
                    # and last refresh exceeded maxinterval
                    if (
                        instance.miniters > 1
                        and (cur_t - instance.last_print_t) >= instance.maxinterval
                    ):
                        # force bypassing miniters on next iteration
                        # (presumably dynamic_miniters re-tunes it afterwards
                        # -- confirm against the tqdm core class)
                        instance.miniters = 1
                        # Refresh now! (works only for manual tqdm)
                        instance.refresh(nolock=True)
                    # Remove accidental long-lived strong reference
                    del instance
                if instances != self.get_instances():  # pragma: nocover
                    warn("Set changed size during iteration" +
                         " (see https://github.com/tqdm/tqdm/issues/481)",
                         TqdmSynchronisationWarning, stacklevel=2)
                # Remove accidental long-lived strong references
                del instances

    def report(self):
        """Return True while the stop signal has not been set."""
        return not self.was_killed.is_set()