Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tqdm/_monitor.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

47 statements  

1import atexit 

2from threading import Event, Thread, current_thread 

3from time import time 

4from warnings import warn 

5 

6__all__ = ["TMonitor", "TqdmSynchronisationWarning"] 

7 

8 

9class TqdmSynchronisationWarning(RuntimeWarning): 

10 """ 

11 tqdm multi-thread/-process errors which may cause incorrect nesting 

12 but otherwise no adverse effects 

13 """ 

14 

15 

16class TMonitor(Thread): 

17 """ 

18 Monitoring thread for tqdm bars. 

19 Monitors if tqdm bars are taking too much time to display 

20 and readjusts miniters automatically if necessary. 

21 

22 Parameters 

23 ---------- 

24 tqdm_cls : class 

25 tqdm class to use (can be core tqdm or a submodule). 

26 sleep_interval : float 

27 Time to sleep between monitoring checks. 

28 """ 

29 _test = {} # internal vars for unit testing 

30 

31 def __init__(self, tqdm_cls, sleep_interval): 

32 Thread.__init__(self, name="tqdm_monitor") 

33 self.daemon = True # kill thread when main killed (KeyboardInterrupt) 

34 self.woken = 0 # last time woken up, to sync with monitor 

35 self.tqdm_cls = tqdm_cls 

36 self.sleep_interval = sleep_interval 

37 self._time = self._test.get("time", time) 

38 self.was_killed = self._test.get("Event", Event)() 

39 atexit.register(self._atexit_signal) 

40 self.start() 

41 

42 def _atexit_signal(self): 

43 """ 

44 Non-joining shutdown signal. 

45 Avoids deadlocks at interpreter exit from other threads, dead forks, etc. 

46 This daemon thread is auto-reaped on shutdown without needing a join. 

47 """ 

48 self.was_killed.set() 

49 

50 def exit(self): 

51 self.was_killed.set() 

52 if self is not current_thread(): 

53 self.join() 

54 return self.report() 

55 

56 def get_instances(self): 

57 # returns a copy of started `tqdm_cls` instances 

58 return [i for i in self.tqdm_cls._instances.copy() 

59 # Avoid race by checking that the instance started 

60 if hasattr(i, 'start_t')] 

61 

62 def run(self): 

63 cur_t = self._time() 

64 while True: 

65 # After processing and before sleeping, notify that we woke 

66 # Need to be done just before sleeping 

67 self.woken = cur_t 

68 # Sleep some time... 

69 self.was_killed.wait(self.sleep_interval) 

70 # Quit if killed 

71 if self.was_killed.is_set(): 

72 return 

73 # Then monitor! 

74 # Acquire lock (to access _instances) 

75 with self.tqdm_cls.get_lock(): 

76 cur_t = self._time() 

77 # Check tqdm instances are waiting too long to print 

78 instances = self.get_instances() 

79 for instance in instances: 

80 # Check event in loop to reduce blocking time on exit 

81 if self.was_killed.is_set(): 

82 return 

83 # Only if mininterval > 1 (else iterations are just slow) 

84 # and last refresh exceeded maxinterval 

85 if ( 

86 instance.miniters > 1 

87 and (cur_t - instance.last_print_t) >= instance.maxinterval 

88 ): 

89 # force bypassing miniters on next iteration 

90 # (dynamic_miniters adjusts mininterval automatically) 

91 instance.miniters = 1 

92 # Refresh now! (works only for manual tqdm) 

93 instance.refresh(nolock=True) 

94 # Remove accidental long-lived strong reference 

95 del instance 

96 if instances != self.get_instances(): # pragma: nocover 

97 warn("Set changed size during iteration" + 

98 " (see https://github.com/tqdm/tqdm/issues/481)", 

99 TqdmSynchronisationWarning, stacklevel=2) 

100 # Remove accidental long-lived strong references 

101 del instances 

102 

103 def report(self): 

104 return not self.was_killed.is_set()