Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/cli/spinners.py: 31%

104 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import contextlib 

2import itertools 

3import logging 

4import sys 

5import time 

6from typing import IO, Generator, Optional 

7 

8from pip._internal.utils.compat import WINDOWS 

9from pip._internal.utils.logging import get_indentation 

10 

11logger = logging.getLogger(__name__) 

12 

13 

14class SpinnerInterface: 

15 def spin(self) -> None: 

16 raise NotImplementedError() 

17 

18 def finish(self, final_status: str) -> None: 

19 raise NotImplementedError() 

20 

21 

22class InteractiveSpinner(SpinnerInterface): 

23 def __init__( 

24 self, 

25 message: str, 

26 file: Optional[IO[str]] = None, 

27 spin_chars: str = "-\\|/", 

28 # Empirically, 8 updates/second looks nice 

29 min_update_interval_seconds: float = 0.125, 

30 ): 

31 self._message = message 

32 if file is None: 

33 file = sys.stdout 

34 self._file = file 

35 self._rate_limiter = RateLimiter(min_update_interval_seconds) 

36 self._finished = False 

37 

38 self._spin_cycle = itertools.cycle(spin_chars) 

39 

40 self._file.write(" " * get_indentation() + self._message + " ... ") 

41 self._width = 0 

42 

43 def _write(self, status: str) -> None: 

44 assert not self._finished 

45 # Erase what we wrote before by backspacing to the beginning, writing 

46 # spaces to overwrite the old text, and then backspacing again 

47 backup = "\b" * self._width 

48 self._file.write(backup + " " * self._width + backup) 

49 # Now we have a blank slate to add our status 

50 self._file.write(status) 

51 self._width = len(status) 

52 self._file.flush() 

53 self._rate_limiter.reset() 

54 

55 def spin(self) -> None: 

56 if self._finished: 

57 return 

58 if not self._rate_limiter.ready(): 

59 return 

60 self._write(next(self._spin_cycle)) 

61 

62 def finish(self, final_status: str) -> None: 

63 if self._finished: 

64 return 

65 self._write(final_status) 

66 self._file.write("\n") 

67 self._file.flush() 

68 self._finished = True 

69 

70 

71# Used for dumb terminals, non-interactive installs (no tty), etc. 

72# We still print updates occasionally (once every 60 seconds by default) to 

73# act as a keep-alive for systems like Travis-CI that take lack-of-output as 

74# an indication that a task has frozen. 

75class NonInteractiveSpinner(SpinnerInterface): 

76 def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None: 

77 self._message = message 

78 self._finished = False 

79 self._rate_limiter = RateLimiter(min_update_interval_seconds) 

80 self._update("started") 

81 

82 def _update(self, status: str) -> None: 

83 assert not self._finished 

84 self._rate_limiter.reset() 

85 logger.info("%s: %s", self._message, status) 

86 

87 def spin(self) -> None: 

88 if self._finished: 

89 return 

90 if not self._rate_limiter.ready(): 

91 return 

92 self._update("still running...") 

93 

94 def finish(self, final_status: str) -> None: 

95 if self._finished: 

96 return 

97 self._update(f"finished with status '{final_status}'") 

98 self._finished = True 

99 

100 

101class RateLimiter: 

102 def __init__(self, min_update_interval_seconds: float) -> None: 

103 self._min_update_interval_seconds = min_update_interval_seconds 

104 self._last_update: float = 0 

105 

106 def ready(self) -> bool: 

107 now = time.time() 

108 delta = now - self._last_update 

109 return delta >= self._min_update_interval_seconds 

110 

111 def reset(self) -> None: 

112 self._last_update = time.time() 

113 

114 

115@contextlib.contextmanager 

116def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]: 

117 # Interactive spinner goes directly to sys.stdout rather than being routed 

118 # through the logging system, but it acts like it has level INFO, 

119 # i.e. it's only displayed if we're at level INFO or better. 

120 # Non-interactive spinner goes through the logging system, so it is always 

121 # in sync with logging configuration. 

122 if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO: 

123 spinner: SpinnerInterface = InteractiveSpinner(message) 

124 else: 

125 spinner = NonInteractiveSpinner(message) 

126 try: 

127 with hidden_cursor(sys.stdout): 

128 yield spinner 

129 except KeyboardInterrupt: 

130 spinner.finish("canceled") 

131 raise 

132 except Exception: 

133 spinner.finish("error") 

134 raise 

135 else: 

136 spinner.finish("done") 

137 

138 

139HIDE_CURSOR = "\x1b[?25l" 

140SHOW_CURSOR = "\x1b[?25h" 

141 

142 

143@contextlib.contextmanager 

144def hidden_cursor(file: IO[str]) -> Generator[None, None, None]: 

145 # The Windows terminal does not support the hide/show cursor ANSI codes, 

146 # even via colorama. So don't even try. 

147 if WINDOWS: 

148 yield 

149 # We don't want to clutter the output with control characters if we're 

150 # writing to a file, or if the user is running with --quiet. 

151 # See https://github.com/pypa/pip/issues/3418 

152 elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO: 

153 yield 

154 else: 

155 file.write(HIDE_CURSOR) 

156 try: 

157 yield 

158 finally: 

159 file.write(SHOW_CURSOR)