Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/live.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import sys
4from threading import Event, RLock, Thread
5from types import TracebackType
6from typing import IO, TYPE_CHECKING, Any, Callable, List, Optional, TextIO, Type, cast
8from . import get_console
9from .console import Console, ConsoleRenderable, Group, RenderableType, RenderHook
10from .control import Control
11from .file_proxy import FileProxy
12from .jupyter import JupyterMixin
13from .live_render import LiveRender, VerticalOverflowMethod
14from .screen import Screen
15from .text import Text
17if TYPE_CHECKING:
18 # Can be replaced with `from typing import Self` in Python 3.11+
19 from typing_extensions import Self # pragma: no cover
class _RefreshThread(Thread):
    """Background daemon thread that periodically refreshes a live display.

    The thread keeps calling ``live.refresh()`` roughly ``refresh_per_second``
    times a second until :meth:`stop` is called.
    """

    def __init__(self, live: "Live", refresh_per_second: float) -> None:
        """Create the (not yet started) refresh thread.

        Args:
            live: Object to refresh; it must expose ``_lock`` and ``refresh()``.
            refresh_per_second: Number of refreshes per second.
        """
        super().__init__(daemon=True)
        self.live = live
        self.refresh_per_second = refresh_per_second
        # Set by stop(); doubles as the interruptible sleep between refreshes.
        self.done = Event()

    def stop(self) -> None:
        """Signal the refresh loop to finish."""
        self.done.set()

    def run(self) -> None:
        """Refresh the live display at the configured rate until stopped."""
        while True:
            interval = 1 / self.refresh_per_second
            # wait() returns True as soon as stop() has been called.
            if self.done.wait(interval):
                return
            with self.live._lock:
                # stop() may have been called while we were waiting for the
                # lock; in that case skip the refresh and let the loop exit.
                if self.done.is_set():
                    continue
                self.live.refresh()
class Live(JupyterMixin, RenderHook):
    """Renders an auto-updating live display of any given renderable.

    Args:
        renderable (RenderableType, optional): The renderable to live display. Defaults to displaying nothing.
        console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout.
        screen (bool, optional): Enable alternate screen mode. Defaults to False.
        auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()` or `update()` with refresh flag. Defaults to True
        refresh_per_second (float, optional): Number of times per second to refresh the live display. Defaults to 4.
        transient (bool, optional): Clear the renderable on exit (has no effect when screen=True). Defaults to False.
        redirect_stdout (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
        redirect_stderr (bool, optional): Enable redirection of stderr. Defaults to True.
        vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the console. Defaults to "ellipsis".
        get_renderable (Callable[[], RenderableType], optional): Optional callable to get renderable. Defaults to None.
    """

    def __init__(
        self,
        renderable: Optional[RenderableType] = None,
        *,
        console: Optional[Console] = None,
        screen: bool = False,
        auto_refresh: bool = True,
        refresh_per_second: float = 4,
        transient: bool = False,
        redirect_stdout: bool = True,
        redirect_stderr: bool = True,
        vertical_overflow: VerticalOverflowMethod = "ellipsis",
        get_renderable: Optional[Callable[[], RenderableType]] = None,
    ) -> None:
        # NOTE: kept as an assert so the exception type seen by existing
        # callers does not change; be aware it is skipped under ``python -O``.
        assert refresh_per_second > 0, "refresh_per_second must be > 0"
        self._renderable = renderable
        self.console = console if console is not None else get_console()
        self._screen = screen
        # True only once the console has actually switched to the alt screen.
        self._alt_screen = False

        self._redirect_stdout = redirect_stdout
        self._redirect_stderr = redirect_stderr
        # Original streams, saved while redirection is active.
        self._restore_stdout: Optional[IO[str]] = None
        self._restore_stderr: Optional[IO[str]] = None

        # Re-entrant: refresh() is called with the lock already held by
        # update(), start() and the refresh thread.
        self._lock = RLock()
        self.ipy_widget: Optional[Any] = None
        self.auto_refresh = auto_refresh
        self._started: bool = False
        # Screen mode always behaves as transient.
        self.transient = True if screen else transient

        self._refresh_thread: Optional[_RefreshThread] = None
        self.refresh_per_second = refresh_per_second

        self.vertical_overflow = vertical_overflow
        self._get_renderable = get_renderable
        self._live_render = LiveRender(
            self.get_renderable(), vertical_overflow=vertical_overflow
        )
        # Recomputed on every start(); True when console.set_live() declined
        # to make this instance the console's primary live display.
        self._nested = False

    @property
    def is_started(self) -> bool:
        """Check if live display has been started."""
        return self._started

    def get_renderable(self) -> RenderableType:
        """Get the renderable for this instance.

        Returns:
            RenderableType: The result of the ``get_renderable`` callable if
                one was supplied, otherwise the stored renderable. A falsy
                result is replaced by an empty string.
        """
        renderable = (
            self._get_renderable()
            if self._get_renderable is not None
            else self._renderable
        )
        return renderable or ""

    def start(self, refresh: bool = False) -> None:
        """Start live rendering display.

        Calling this on an already started display is a no-op.

        Args:
            refresh (bool, optional): Also refresh. Defaults to False.
        """
        with self._lock:
            if self._started:
                return
            self._started = True

            # Recompute the nested flag on every start. Previously it was only
            # ever set to True, so an instance that had once been nested stayed
            # "nested" after being restarted as the primary display, which made
            # refresh() delegate to itself and stop() skip its cleanup.
            self._nested = not self.console.set_live(self)
            if self._nested:
                return

            if self._screen:
                self._alt_screen = self.console.set_alt_screen(True)
            self.console.show_cursor(False)
            self._enable_redirect_io()
            self.console.push_render_hook(self)
            if refresh:
                try:
                    self.refresh()
                except Exception:
                    # If refresh fails, we want to stop the redirection of sys.stderr,
                    # so the error stacktrace is properly displayed in the terminal.
                    # (or, if the code that calls Rich captures the exception and wants to display something,
                    # let this be displayed in the terminal).
                    self.stop()
                    raise
            if self.auto_refresh:
                self._refresh_thread = _RefreshThread(self, self.refresh_per_second)
                self._refresh_thread.start()

    def stop(self) -> None:
        """Stop live rendering display.

        Calling this on a display that is not started is a no-op.
        """
        with self._lock:
            if not self._started:
                return
            self._started = False
            self.console.clear_live()
            if self._nested:
                # A nested display set nothing up in start(), so there is
                # nothing to tear down; just leave the final output behind.
                if not self.transient:
                    self.console.print(self.renderable)
                return

            # Stop the thread whenever one exists, even if ``auto_refresh``
            # was switched off after start(); otherwise it would keep running.
            if self._refresh_thread is not None:
                self._refresh_thread.stop()
                self._refresh_thread = None
            # allow it to fully render on the last even if overflow
            self.vertical_overflow = "visible"
            with self.console:
                try:
                    if not self._alt_screen and not self.console.is_jupyter:
                        self.refresh()
                finally:
                    # Always undo what start() did, even if the final refresh raised.
                    self._disable_redirect_io()
                    self.console.pop_render_hook()
                    if not self._alt_screen and self.console.is_terminal:
                        self.console.line()
                    self.console.show_cursor(True)
                    if self._alt_screen:
                        self.console.set_alt_screen(False)
                    if self.transient and not self._alt_screen:
                        self.console.control(self._live_render.restore_cursor())
                    if self.ipy_widget is not None and self.transient:
                        self.ipy_widget.close()  # pragma: no cover

    def __enter__(self) -> Self:
        # Only do an initial refresh when a renderable was given up front.
        self.start(refresh=self._renderable is not None)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        self.stop()

    def _enable_redirect_io(self) -> None:
        """Enable redirecting of stdout / stderr."""
        if self.console.is_terminal or self.console.is_jupyter:
            # Skip streams that are already proxied so they are not wrapped twice.
            if self._redirect_stdout and not isinstance(sys.stdout, FileProxy):
                self._restore_stdout = sys.stdout
                sys.stdout = cast("TextIO", FileProxy(self.console, sys.stdout))
            if self._redirect_stderr and not isinstance(sys.stderr, FileProxy):
                self._restore_stderr = sys.stderr
                sys.stderr = cast("TextIO", FileProxy(self.console, sys.stderr))

    def _disable_redirect_io(self) -> None:
        """Disable redirecting of stdout / stderr."""
        # Compare against None rather than relying on truthiness, so a saved
        # stream object that happens to be falsy is still restored.
        if self._restore_stdout is not None:
            sys.stdout = cast("TextIO", self._restore_stdout)
            self._restore_stdout = None
        if self._restore_stderr is not None:
            sys.stderr = cast("TextIO", self._restore_stderr)
            self._restore_stderr = None

    @property
    def renderable(self) -> RenderableType:
        """Get the renderable that is being displayed

        Returns:
            RenderableType: Displayed renderable.
        """
        live_stack = self.console._live_stack
        renderable: RenderableType
        if live_stack and self is live_stack[0]:
            # The first Live instance will render everything in the Live stack
            renderable = Group(*[live.get_renderable() for live in live_stack])
        else:
            renderable = self.get_renderable()
        return Screen(renderable) if self._alt_screen else renderable

    def update(self, renderable: RenderableType, *, refresh: bool = False) -> None:
        """Update the renderable that is being displayed

        Args:
            renderable (RenderableType): New renderable to use.
            refresh (bool, optional): Refresh the display. Defaults to False.
        """
        if isinstance(renderable, str):
            renderable = self.console.render_str(renderable)
        with self._lock:
            self._renderable = renderable
            if refresh:
                self.refresh()

    def refresh(self) -> None:
        """Update the display of the Live Render."""
        with self._lock:
            self._live_render.set_renderable(self.renderable)
            if self._nested:
                # A nested display does not draw itself; it asks the first
                # Live in the console's stack to redraw.
                if self.console._live_stack:
                    self.console._live_stack[0].refresh()
                return

            if self.console.is_jupyter:  # pragma: no cover
                try:
                    from IPython.display import display
                    from ipywidgets import Output
                except ImportError:
                    import warnings

                    warnings.warn('install "ipywidgets" for Jupyter support')
                else:
                    if self.ipy_widget is None:
                        self.ipy_widget = Output()
                        display(self.ipy_widget)

                    with self.ipy_widget:
                        self.ipy_widget.clear_output(wait=True)
                        self.console.print(self._live_render.renderable)
            elif self.console.is_terminal and not self.console.is_dumb_terminal:
                # Printing an empty Control goes through the console's render
                # hooks, where process_renderables() adds the live render.
                with self.console:
                    self.console.print(Control())
            elif (
                not self._started and not self.transient
            ):  # if it is finished allow files or dumb-terminals to see final result
                with self.console:
                    self.console.print(Control())

    def process_renderables(
        self, renderables: List[ConsoleRenderable]
    ) -> List[ConsoleRenderable]:
        """Process renderables to restore cursor and display progress.

        Args:
            renderables (List[ConsoleRenderable]): Renderables about to be printed.

        Returns:
            List[ConsoleRenderable]: The renderables to print, with the live
                render (and cursor control, when interactive) added where
                applicable.
        """
        self._live_render.vertical_overflow = self.vertical_overflow
        if self.console.is_interactive:
            # lock needs acquiring as user can modify live_render renderable at any time unlike in Progress.
            with self._lock:
                reset = (
                    Control.home()
                    if self._alt_screen
                    else self._live_render.position_cursor()
                )
                renderables = [reset, *renderables, self._live_render]
        elif (
            not self._started and not self.transient
        ):  # if it is finished render the final output for files or dumb_terminals
            renderables = [*renderables, self._live_render]

        return renderables
if __name__ == "__main__":  # pragma: no cover
    # Demo: a live-updating table of random "exchange rates", with assorted
    # renderables occasionally logged above it while it is running.
    import random
    import time
    from itertools import cycle
    from typing import Dict, List, Tuple

    from .align import Align
    from .console import Console
    from .live import Live as Live
    from .panel import Panel
    from .rule import Rule
    from .syntax import Syntax
    from .table import Table

    console = Console()

    syntax = Syntax(
        '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
    """Iterate and generate a tuple with a flag for last value."""
    iter_values = iter(values)
    try:
        previous_value = next(iter_values)
    except StopIteration:
        return
    for value in iter_values:
        yield False, previous_value
        previous_value = value
    yield True, previous_value''',
        "python",
        line_numbers=True,
    )

    table = Table("foo", "bar", "baz")
    table.add_row("1", "2", "3")

    progress_renderables = [
        # The comma after the first message was missing, which silently
        # concatenated these two strings into a single run-together message.
        "You can make the terminal shorter and taller to see the live table hide",
        "Text may be printed while the progress bars are rendering.",
        Panel("In fact, [i]any[/i] renderable will work"),
        "Such as [magenta]tables[/]...",
        table,
        "Pretty printed structures...",
        {"type": "example", "text": "Pretty printed"},
        "Syntax...",
        syntax,
        Rule("Give it a try!"),
    ]

    examples = cycle(progress_renderables)

    exchanges = [
        "SGD",
        "MYR",
        "EUR",
        "USD",
        "AUD",
        "JPY",
        "CNH",
        "HKD",
        "CAD",
        "INR",
        "DKK",
        "GBP",
        "RUB",
        "NZD",
        "MXN",
        "IDR",
        "TWD",
        "THB",
        "VND",
    ]
    with Live(console=console) as live_table:
        exchange_rate_dict: Dict[Tuple[str, str], float] = {}

        for index in range(100):
            select_exchange = exchanges[index % len(exchanges)]

            for exchange in exchanges:
                if exchange == select_exchange:
                    continue
                time.sleep(0.4)
                # Occasionally log one of the example renderables above the table.
                if random.randint(0, 10) < 1:
                    console.log(next(examples))
                exchange_rate_dict[(select_exchange, exchange)] = 200 / (
                    (random.random() * 320) + 1
                )
                # Cap the table size by dropping the oldest entry (dicts keep
                # insertion order, so the first key is the oldest).
                if len(exchange_rate_dict) > len(exchanges) - 1:
                    exchange_rate_dict.pop(next(iter(exchange_rate_dict)))
                table = Table(title="Exchange Rates")

                table.add_column("Source Currency")
                table.add_column("Destination Currency")
                table.add_column("Exchange Rate")

                for (source, dest), exchange_rate in exchange_rate_dict.items():
                    table.add_row(
                        source,
                        dest,
                        Text(
                            f"{exchange_rate:.4f}",
                            style="red" if exchange_rate < 1.0 else "green",
                        ),
                    )

                live_table.update(Align.center(table))