Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tqdm/_monitor.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import atexit
2from threading import Event, Thread, current_thread
3from time import time
4from warnings import warn
6__all__ = ["TMonitor", "TqdmSynchronisationWarning"]
9class TqdmSynchronisationWarning(RuntimeWarning):
10 """tqdm multi-thread/-process errors which may cause incorrect nesting
11 but otherwise no adverse effects"""
12 pass
15class TMonitor(Thread):
16 """
17 Monitoring thread for tqdm bars.
18 Monitors if tqdm bars are taking too much time to display
19 and readjusts miniters automatically if necessary.
21 Parameters
22 ----------
23 tqdm_cls : class
24 tqdm class to use (can be core tqdm or a submodule).
25 sleep_interval : float
26 Time to sleep between monitoring checks.
27 """
28 _test = {} # internal vars for unit testing
30 def __init__(self, tqdm_cls, sleep_interval):
31 Thread.__init__(self)
32 self.daemon = True # kill thread when main killed (KeyboardInterrupt)
33 self.woken = 0 # last time woken up, to sync with monitor
34 self.tqdm_cls = tqdm_cls
35 self.sleep_interval = sleep_interval
36 self._time = self._test.get("time", time)
37 self.was_killed = self._test.get("Event", Event)()
38 atexit.register(self.exit)
39 self.start()
41 def exit(self):
42 self.was_killed.set()
43 if self is not current_thread():
44 self.join()
45 return self.report()
47 def get_instances(self):
48 # returns a copy of started `tqdm_cls` instances
49 return [i for i in self.tqdm_cls._instances.copy()
50 # Avoid race by checking that the instance started
51 if hasattr(i, 'start_t')]
53 def run(self):
54 cur_t = self._time()
55 while True:
56 # After processing and before sleeping, notify that we woke
57 # Need to be done just before sleeping
58 self.woken = cur_t
59 # Sleep some time...
60 self.was_killed.wait(self.sleep_interval)
61 # Quit if killed
62 if self.was_killed.is_set():
63 return
64 # Then monitor!
65 # Acquire lock (to access _instances)
66 with self.tqdm_cls.get_lock():
67 cur_t = self._time()
68 # Check tqdm instances are waiting too long to print
69 instances = self.get_instances()
70 for instance in instances:
71 # Check event in loop to reduce blocking time on exit
72 if self.was_killed.is_set():
73 return
74 # Only if mininterval > 1 (else iterations are just slow)
75 # and last refresh exceeded maxinterval
76 if (
77 instance.miniters > 1
78 and (cur_t - instance.last_print_t) >= instance.maxinterval
79 ):
80 # force bypassing miniters on next iteration
81 # (dynamic_miniters adjusts mininterval automatically)
82 instance.miniters = 1
83 # Refresh now! (works only for manual tqdm)
84 instance.refresh(nolock=True)
85 # Remove accidental long-lived strong reference
86 del instance
87 if instances != self.get_instances(): # pragma: nocover
88 warn("Set changed size during iteration" +
89 " (see https://github.com/tqdm/tqdm/issues/481)",
90 TqdmSynchronisationWarning, stacklevel=2)
91 # Remove accidental long-lived strong references
92 del instances
94 def report(self):
95 return not self.was_killed.is_set()