1from __future__ import annotations
2
3import contextlib
4import itertools
5import logging
6import sys
7import time
8from collections.abc import Generator
9from typing import IO, Final
10
11from pip._vendor.rich.console import (
12 Console,
13 ConsoleOptions,
14 RenderableType,
15 RenderResult,
16)
17from pip._vendor.rich.live import Live
18from pip._vendor.rich.measure import Measurement
19from pip._vendor.rich.text import Text
20
21from pip._internal.utils.compat import WINDOWS
22from pip._internal.utils.logging import get_console, get_indentation
23
24logger = logging.getLogger(__name__)
25
26SPINNER_CHARS: Final = r"-\|/"
27SPINS_PER_SECOND: Final = 8
28
29
30class SpinnerInterface:
31 def spin(self) -> None:
32 raise NotImplementedError()
33
34 def finish(self, final_status: str) -> None:
35 raise NotImplementedError()
36
37
38class InteractiveSpinner(SpinnerInterface):
39 def __init__(
40 self,
41 message: str,
42 file: IO[str] | None = None,
43 spin_chars: str = SPINNER_CHARS,
44 # Empirically, 8 updates/second looks nice
45 min_update_interval_seconds: float = 1 / SPINS_PER_SECOND,
46 ):
47 self._message = message
48 if file is None:
49 file = sys.stdout
50 self._file = file
51 self._rate_limiter = RateLimiter(min_update_interval_seconds)
52 self._finished = False
53
54 self._spin_cycle = itertools.cycle(spin_chars)
55
56 self._file.write(" " * get_indentation() + self._message + " ... ")
57 self._width = 0
58
59 def _write(self, status: str) -> None:
60 assert not self._finished
61 # Erase what we wrote before by backspacing to the beginning, writing
62 # spaces to overwrite the old text, and then backspacing again
63 backup = "\b" * self._width
64 self._file.write(backup + " " * self._width + backup)
65 # Now we have a blank slate to add our status
66 self._file.write(status)
67 self._width = len(status)
68 self._file.flush()
69 self._rate_limiter.reset()
70
71 def spin(self) -> None:
72 if self._finished:
73 return
74 if not self._rate_limiter.ready():
75 return
76 self._write(next(self._spin_cycle))
77
78 def finish(self, final_status: str) -> None:
79 if self._finished:
80 return
81 self._write(final_status)
82 self._file.write("\n")
83 self._file.flush()
84 self._finished = True
85
86
87# Used for dumb terminals, non-interactive installs (no tty), etc.
88# We still print updates occasionally (once every 60 seconds by default) to
89# act as a keep-alive for systems like Travis-CI that take lack-of-output as
90# an indication that a task has frozen.
91class NonInteractiveSpinner(SpinnerInterface):
92 def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
93 self._message = message
94 self._finished = False
95 self._rate_limiter = RateLimiter(min_update_interval_seconds)
96 self._update("started")
97
98 def _update(self, status: str) -> None:
99 assert not self._finished
100 self._rate_limiter.reset()
101 logger.info("%s: %s", self._message, status)
102
103 def spin(self) -> None:
104 if self._finished:
105 return
106 if not self._rate_limiter.ready():
107 return
108 self._update("still running...")
109
110 def finish(self, final_status: str) -> None:
111 if self._finished:
112 return
113 self._update(f"finished with status '{final_status}'")
114 self._finished = True
115
116
117class RateLimiter:
118 def __init__(self, min_update_interval_seconds: float) -> None:
119 self._min_update_interval_seconds = min_update_interval_seconds
120 self._last_update: float = 0
121
122 def ready(self) -> bool:
123 now = time.time()
124 delta = now - self._last_update
125 return delta >= self._min_update_interval_seconds
126
127 def reset(self) -> None:
128 self._last_update = time.time()
129
130
131@contextlib.contextmanager
132def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
133 # Interactive spinner goes directly to sys.stdout rather than being routed
134 # through the logging system, but it acts like it has level INFO,
135 # i.e. it's only displayed if we're at level INFO or better.
136 # Non-interactive spinner goes through the logging system, so it is always
137 # in sync with logging configuration.
138 if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
139 spinner: SpinnerInterface = InteractiveSpinner(message)
140 else:
141 spinner = NonInteractiveSpinner(message)
142 try:
143 with hidden_cursor(sys.stdout):
144 yield spinner
145 except KeyboardInterrupt:
146 spinner.finish("canceled")
147 raise
148 except Exception:
149 spinner.finish("error")
150 raise
151 else:
152 spinner.finish("done")
153
154
155class _PipRichSpinner:
156 """
157 Custom rich spinner that matches the style of the legacy spinners.
158
159 (*) Updates will be handled in a background thread by a rich live panel
160 which will call render() automatically at the appropriate time.
161 """
162
163 def __init__(self, label: str) -> None:
164 self.label = label
165 self._spin_cycle = itertools.cycle(SPINNER_CHARS)
166 self._spinner_text = ""
167 self._finished = False
168 self._indent = get_indentation() * " "
169
170 def __rich_console__(
171 self, console: Console, options: ConsoleOptions
172 ) -> RenderResult:
173 yield self.render()
174
175 def __rich_measure__(
176 self, console: Console, options: ConsoleOptions
177 ) -> Measurement:
178 text = self.render()
179 return Measurement.get(console, options, text)
180
181 def render(self) -> RenderableType:
182 if not self._finished:
183 self._spinner_text = next(self._spin_cycle)
184
185 return Text.assemble(self._indent, self.label, " ... ", self._spinner_text)
186
187 def finish(self, status: str) -> None:
188 """Stop spinning and set a final status message."""
189 self._spinner_text = status
190 self._finished = True
191
192
193@contextlib.contextmanager
194def open_rich_spinner(label: str, console: Console | None = None) -> Generator[None]:
195 if not logger.isEnabledFor(logging.INFO):
196 # Don't show spinner if --quiet is given.
197 yield
198 return
199
200 console = console or get_console()
201 spinner = _PipRichSpinner(label)
202 with Live(spinner, refresh_per_second=SPINS_PER_SECOND, console=console):
203 try:
204 yield
205 except KeyboardInterrupt:
206 spinner.finish("canceled")
207 raise
208 except Exception:
209 spinner.finish("error")
210 raise
211 else:
212 spinner.finish("done")
213
214
215HIDE_CURSOR = "\x1b[?25l"
216SHOW_CURSOR = "\x1b[?25h"
217
218
219@contextlib.contextmanager
220def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
221 # The Windows terminal does not support the hide/show cursor ANSI codes,
222 # even via colorama. So don't even try.
223 if WINDOWS:
224 yield
225 # We don't want to clutter the output with control characters if we're
226 # writing to a file, or if the user is running with --quiet.
227 # See https://github.com/pypa/pip/issues/3418
228 elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
229 yield
230 else:
231 file.write(HIDE_CURSOR)
232 try:
233 yield
234 finally:
235 file.write(SHOW_CURSOR)