Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/cli/spinners.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

148 statements  

1from __future__ import annotations 

2 

3import contextlib 

4import itertools 

5import logging 

6import sys 

7import time 

8from collections.abc import Generator 

9from typing import IO, Final 

10 

11from pip._vendor.rich.console import ( 

12 Console, 

13 ConsoleOptions, 

14 RenderableType, 

15 RenderResult, 

16) 

17from pip._vendor.rich.live import Live 

18from pip._vendor.rich.measure import Measurement 

19from pip._vendor.rich.text import Text 

20 

21from pip._internal.utils.compat import WINDOWS 

22from pip._internal.utils.logging import get_console, get_indentation 

23 

logger = logging.getLogger(__name__)

# Characters cycled through, in order, to draw the spinner animation.
SPINNER_CHARS: Final = r"-\|/"
# Spinner refresh rate; also the default update rate of the interactive spinner.
SPINS_PER_SECOND: Final = 8

28 

29 

class SpinnerInterface:
    """Contract shared by the spinner implementations in this module.

    Both methods must be overridden; the base versions only raise.
    """

    def spin(self) -> None:
        """Advance the spinner by one step."""
        raise NotImplementedError

    def finish(self, final_status: str) -> None:
        """Stop the spinner and report ``final_status``."""
        raise NotImplementedError

36 

37 

class InteractiveSpinner(SpinnerInterface):
    """Spinner that animates in place by writing directly to a text stream.

    The message is written once on construction; each status (a spinner
    character or the final status) then overwrites the previous one using
    backspaces.
    """

    def __init__(
        self,
        message: str,
        file: IO[str] | None = None,
        spin_chars: str = SPINNER_CHARS,
        # Empirically, 8 updates/second looks nice
        min_update_interval_seconds: float = 1 / SPINS_PER_SECOND,
    ):
        self._message = message
        self._file = sys.stdout if file is None else file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._finished = False
        self._spin_cycle = itertools.cycle(spin_chars)
        # Number of characters of status text currently on screen.
        self._width = 0

        prefix = " " * get_indentation()
        self._file.write(prefix + self._message + " ... ")

    def _write(self, status: str) -> None:
        """Replace the previously written status with ``status``."""
        assert not self._finished
        # Wipe the old status: step back over it, blank it out with spaces,
        # then step back again so the cursor sits where the status starts.
        rewind = "\b" * self._width
        blanks = " " * self._width
        self._file.write(rewind + blanks + rewind)
        # The slate is clean; put the new status in place.
        self._file.write(status)
        self._width = len(status)
        self._file.flush()
        self._rate_limiter.reset()

    def spin(self) -> None:
        """Show the next spinner character, unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._write(next(self._spin_cycle))

    def finish(self, final_status: str) -> None:
        """Write ``final_status``, end the line, and ignore any later calls."""
        if self._finished:
            return
        self._write(final_status)
        self._file.write("\n")
        self._file.flush()
        self._finished = True

85 

86 

class NonInteractiveSpinner(SpinnerInterface):
    """Spinner that reports progress through the logging system.

    Used for dumb terminals, non-interactive installs (no tty), etc.
    Updates are still emitted occasionally (once every 60 seconds by
    default) to act as a keep-alive for systems like Travis-CI that take
    lack-of-output as an indication that a task has frozen.
    """

    def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
        self._message = message
        self._finished = False
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._update("started")

    def _update(self, status: str) -> None:
        """Log ``status`` at INFO level and restart the rate limiter."""
        assert not self._finished
        self._rate_limiter.reset()
        logger.info("%s: %s", self._message, status)

    def spin(self) -> None:
        """Log a keep-alive line, unless finished or rate-limited."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._update("still running...")

    def finish(self, final_status: str) -> None:
        """Log the final status once; later calls are ignored."""
        if self._finished:
            return
        self._update(f"finished with status '{final_status}'")
        self._finished = True

115 

116 

class RateLimiter:
    """Gate that opens once a minimum interval has elapsed since the last reset.

    Elapsed time is measured with ``time.monotonic()`` so that adjustments
    to the system wall clock (NTP sync, manual changes) can neither stall
    updates nor let them through early.
    """

    def __init__(self, min_update_interval_seconds: float) -> None:
        self._min_update_interval_seconds = min_update_interval_seconds
        # Negative infinity means "never updated". The monotonic clock has an
        # undefined reference point, so 0 is not a safe "long ago" value.
        self._last_update: float = float("-inf")

    def ready(self) -> bool:
        """Return True once the minimum interval has elapsed since reset()."""
        elapsed = time.monotonic() - self._last_update
        return elapsed >= self._min_update_interval_seconds

    def reset(self) -> None:
        """Record the current moment as the time of the latest update."""
        self._last_update = time.monotonic()

129 

130 

@contextlib.contextmanager
def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
    """Yield a spinner for ``message`` and finish it when the block exits.

    The spinner is finished with "done" on normal exit, "canceled" on
    KeyboardInterrupt, and "error" on any other Exception; the exception is
    re-raised in the latter two cases.
    """
    # The interactive spinner writes straight to sys.stdout instead of going
    # through the logging system, yet it behaves as if it had level INFO:
    # it is shown only when the effective level is INFO or more verbose.
    # The non-interactive spinner is routed through logging, so it always
    # follows the logging configuration.
    interactive = sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO
    spinner: SpinnerInterface = (
        InteractiveSpinner(message) if interactive else NonInteractiveSpinner(message)
    )
    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")

153 

154 

class _PipRichSpinner:
    """
    Rich renderable that imitates the look of the legacy spinners.

    (*) Updates will be handled in a background thread by a rich live panel
        which will call render() automatically at the appropriate time.
    """

    def __init__(self, label: str) -> None:
        self.label = label
        self._indent = " " * get_indentation()
        self._spin_cycle = itertools.cycle(SPINNER_CHARS)
        # Last spinner character shown, or the final status once finished.
        self._spinner_text = ""
        self._finished = False

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        yield self.render()

    def __rich_measure__(
        self, console: Console, options: ConsoleOptions
    ) -> Measurement:
        # Note: measuring goes through render(), which advances the spinner
        # while it is still running.
        return Measurement.get(console, options, self.render())

    def render(self) -> RenderableType:
        """Build the spinner line, advancing one character unless finished."""
        if not self._finished:
            self._spinner_text = next(self._spin_cycle)

        return Text.assemble(self._indent, self.label, " ... ", self._spinner_text)

    def finish(self, status: str) -> None:
        """Stop spinning and set a final status message."""
        self._spinner_text = status
        self._finished = True

191 

192 

@contextlib.contextmanager
def open_rich_spinner(label: str, console: Console | None = None) -> Generator[None]:
    """Show a rich live spinner labelled ``label`` while the block runs.

    Nothing is displayed when INFO logging is disabled. Otherwise the
    spinner ends with "done", "canceled" (KeyboardInterrupt) or "error"
    (any other Exception); exceptions are re-raised.
    """
    if logger.isEnabledFor(logging.INFO):
        live_console = console or get_console()
        spinner = _PipRichSpinner(label)
        with Live(
            spinner, refresh_per_second=SPINS_PER_SECOND, console=live_console
        ):
            try:
                yield
            except KeyboardInterrupt:
                spinner.finish("canceled")
                raise
            except Exception:
                spinner.finish("error")
                raise
            else:
                spinner.finish("done")
    else:
        # Don't show spinner if --quiet is given.
        yield

213 

214 

# Terminal escape sequences used to hide the cursor and to show it again.
HIDE_CURSOR = "\x1b[?25l"
SHOW_CURSOR = "\x1b[?25h"

217 

218 

@contextlib.contextmanager
def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
    """Hide the terminal cursor on ``file`` for the duration of the block.

    The cursor is restored on exit, even if the block raises. Nothing is
    written on Windows, when ``file`` is not a tty, or when the effective
    log level is above INFO.
    """
    # The Windows terminal does not support the hide/show cursor ANSI codes,
    # even via colorama, so they are never attempted there.
    # Control characters are also kept out of the output when writing to a
    # file or when the user is running with --quiet.
    # See https://github.com/pypa/pip/issues/3418
    use_codes = (
        not WINDOWS
        and file.isatty()
        and logger.getEffectiveLevel() <= logging.INFO
    )
    if not use_codes:
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        file.write(SHOW_CURSOR)