Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/cli/spinners.py: 31%
104 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1import contextlib
2import itertools
3import logging
4import sys
5import time
6from typing import IO, Generator, Optional
8from pip._internal.utils.compat import WINDOWS
9from pip._internal.utils.logging import get_indentation
11logger = logging.getLogger(__name__)
14class SpinnerInterface:
15 def spin(self) -> None:
16 raise NotImplementedError()
18 def finish(self, final_status: str) -> None:
19 raise NotImplementedError()
22class InteractiveSpinner(SpinnerInterface):
23 def __init__(
24 self,
25 message: str,
26 file: Optional[IO[str]] = None,
27 spin_chars: str = "-\\|/",
28 # Empirically, 8 updates/second looks nice
29 min_update_interval_seconds: float = 0.125,
30 ):
31 self._message = message
32 if file is None:
33 file = sys.stdout
34 self._file = file
35 self._rate_limiter = RateLimiter(min_update_interval_seconds)
36 self._finished = False
38 self._spin_cycle = itertools.cycle(spin_chars)
40 self._file.write(" " * get_indentation() + self._message + " ... ")
41 self._width = 0
43 def _write(self, status: str) -> None:
44 assert not self._finished
45 # Erase what we wrote before by backspacing to the beginning, writing
46 # spaces to overwrite the old text, and then backspacing again
47 backup = "\b" * self._width
48 self._file.write(backup + " " * self._width + backup)
49 # Now we have a blank slate to add our status
50 self._file.write(status)
51 self._width = len(status)
52 self._file.flush()
53 self._rate_limiter.reset()
55 def spin(self) -> None:
56 if self._finished:
57 return
58 if not self._rate_limiter.ready():
59 return
60 self._write(next(self._spin_cycle))
62 def finish(self, final_status: str) -> None:
63 if self._finished:
64 return
65 self._write(final_status)
66 self._file.write("\n")
67 self._file.flush()
68 self._finished = True
71# Used for dumb terminals, non-interactive installs (no tty), etc.
72# We still print updates occasionally (once every 60 seconds by default) to
73# act as a keep-alive for systems like Travis-CI that take lack-of-output as
74# an indication that a task has frozen.
75class NonInteractiveSpinner(SpinnerInterface):
76 def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
77 self._message = message
78 self._finished = False
79 self._rate_limiter = RateLimiter(min_update_interval_seconds)
80 self._update("started")
82 def _update(self, status: str) -> None:
83 assert not self._finished
84 self._rate_limiter.reset()
85 logger.info("%s: %s", self._message, status)
87 def spin(self) -> None:
88 if self._finished:
89 return
90 if not self._rate_limiter.ready():
91 return
92 self._update("still running...")
94 def finish(self, final_status: str) -> None:
95 if self._finished:
96 return
97 self._update(f"finished with status '{final_status}'")
98 self._finished = True
101class RateLimiter:
102 def __init__(self, min_update_interval_seconds: float) -> None:
103 self._min_update_interval_seconds = min_update_interval_seconds
104 self._last_update: float = 0
106 def ready(self) -> bool:
107 now = time.time()
108 delta = now - self._last_update
109 return delta >= self._min_update_interval_seconds
111 def reset(self) -> None:
112 self._last_update = time.time()
115@contextlib.contextmanager
116def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
117 # Interactive spinner goes directly to sys.stdout rather than being routed
118 # through the logging system, but it acts like it has level INFO,
119 # i.e. it's only displayed if we're at level INFO or better.
120 # Non-interactive spinner goes through the logging system, so it is always
121 # in sync with logging configuration.
122 if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
123 spinner: SpinnerInterface = InteractiveSpinner(message)
124 else:
125 spinner = NonInteractiveSpinner(message)
126 try:
127 with hidden_cursor(sys.stdout):
128 yield spinner
129 except KeyboardInterrupt:
130 spinner.finish("canceled")
131 raise
132 except Exception:
133 spinner.finish("error")
134 raise
135 else:
136 spinner.finish("done")
139HIDE_CURSOR = "\x1b[?25l"
140SHOW_CURSOR = "\x1b[?25h"
143@contextlib.contextmanager
144def hidden_cursor(file: IO[str]) -> Generator[None, None, None]:
145 # The Windows terminal does not support the hide/show cursor ANSI codes,
146 # even via colorama. So don't even try.
147 if WINDOWS:
148 yield
149 # We don't want to clutter the output with control characters if we're
150 # writing to a file, or if the user is running with --quiet.
151 # See https://github.com/pypa/pip/issues/3418
152 elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
153 yield
154 else:
155 file.write(HIDE_CURSOR)
156 try:
157 yield
158 finally:
159 file.write(SHOW_CURSOR)