1from __future__ import annotations
2
3import sys
4from threading import Event, RLock, Thread
5from types import TracebackType
6from typing import IO, TYPE_CHECKING, Any, Callable, List, Optional, TextIO, Type, cast
7
8from . import get_console
9from .console import Console, ConsoleRenderable, Group, RenderableType, RenderHook
10from .control import Control
11from .file_proxy import FileProxy
12from .jupyter import JupyterMixin
13from .live_render import LiveRender, VerticalOverflowMethod
14from .screen import Screen
15from .text import Text
16
17if TYPE_CHECKING:
18 # Can be replaced with `from typing import Self` in Python 3.11+
19 from typing_extensions import Self # pragma: no cover
20
21
class _RefreshThread(Thread):
    """Daemon thread that periodically refreshes a ``Live`` display.

    The thread sleeps for ``1 / refresh_per_second`` seconds between
    refreshes and exits as soon as :meth:`stop` has been called.
    """

    def __init__(self, live: "Live", refresh_per_second: float) -> None:
        super().__init__(daemon=True)
        self.live = live
        self.refresh_per_second = refresh_per_second
        # Set by stop(); doubles as the interruptible sleep in run().
        self.done = Event()

    def stop(self) -> None:
        """Signal the refresh loop to exit."""
        self.done.set()

    def run(self) -> None:
        """Refresh the live display at the configured rate until stopped."""
        while True:
            # wait() returns True once the stop event is set, False on timeout.
            if self.done.wait(1 / self.refresh_per_second):
                break
            with self.live._lock:
                # stop() may have been called while we were waiting for the
                # lock; in that case skip the refresh and let wait() end the loop.
                if self.done.is_set():
                    continue
                self.live.refresh()
39
40
class Live(JupyterMixin, RenderHook):
    """Renders an auto-updating live display of any given renderable.

    Args:
        renderable (RenderableType, optional): The renderable to live display. Defaults to displaying nothing.
        console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout.
        screen (bool, optional): Enable alternate screen mode. Defaults to False.
        auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()` or `update()` with refresh flag. Defaults to True
        refresh_per_second (float, optional): Number of times per second to refresh the live display. Defaults to 4.
        transient (bool, optional): Clear the renderable on exit (has no effect when screen=True). Defaults to False.
        redirect_stdout (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
        redirect_stderr (bool, optional): Enable redirection of stderr. Defaults to True.
        vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the console. Defaults to "ellipsis".
        get_renderable (Callable[[], RenderableType], optional): Optional callable to get renderable. Defaults to None.
    """

    def __init__(
        self,
        renderable: Optional[RenderableType] = None,
        *,
        console: Optional[Console] = None,
        screen: bool = False,
        auto_refresh: bool = True,
        refresh_per_second: float = 4,
        transient: bool = False,
        redirect_stdout: bool = True,
        redirect_stderr: bool = True,
        vertical_overflow: VerticalOverflowMethod = "ellipsis",
        get_renderable: Optional[Callable[[], RenderableType]] = None,
    ) -> None:
        # NOTE: kept as an assert so callers that catch AssertionError keep
        # working; be aware the check disappears under ``python -O``.
        assert refresh_per_second > 0, "refresh_per_second must be > 0"
        self._renderable = renderable
        self.console = console if console is not None else get_console()
        self._screen = screen
        # True only while the alternate screen was actually enabled by start().
        self._alt_screen = False

        self._redirect_stdout = redirect_stdout
        self._redirect_stderr = redirect_stderr
        # Original streams, saved while redirection is active.
        self._restore_stdout: Optional[IO[str]] = None
        self._restore_stderr: Optional[IO[str]] = None

        # Re-entrant: refresh() is called from methods that already hold it.
        self._lock = RLock()
        self.ipy_widget: Optional[Any] = None
        self.auto_refresh = auto_refresh
        self._started: bool = False
        # Screen mode always behaves as transient.
        self.transient = True if screen else transient

        self._refresh_thread: Optional[_RefreshThread] = None
        self.refresh_per_second = refresh_per_second

        self.vertical_overflow = vertical_overflow
        self._get_renderable = get_renderable
        # get_renderable() is evaluated once here to seed the LiveRender.
        self._live_render = LiveRender(
            self.get_renderable(), vertical_overflow=vertical_overflow
        )
        # True while this Live is started underneath another Live on the
        # same console (set from the result of console.set_live in start()).
        self._nested = False

    @property
    def is_started(self) -> bool:
        """Check if live display has been started."""
        return self._started

    def get_renderable(self) -> RenderableType:
        """Get this Live's own renderable.

        Uses the ``get_renderable`` callable when one was supplied, otherwise
        the renderable given at construction / via ``update()``.

        Returns:
            RenderableType: The renderable, or ``""`` when it is falsy.
        """
        renderable = (
            self._get_renderable()
            if self._get_renderable is not None
            else self._renderable
        )
        return renderable or ""

    def start(self, refresh: bool = False) -> None:
        """Start live rendering display.

        Does nothing if the display is already started.

        Args:
            refresh (bool, optional): Also refresh. Defaults to False.
        """
        with self._lock:
            if self._started:
                return
            self._started = True

            # Recompute the nested flag on every start. Previously it was only
            # ever set to True, so an instance that had once been nested kept
            # the stale flag when restarted as the top-level display.
            self._nested = not self.console.set_live(self)
            if self._nested:
                # A nested Live does no terminal setup of its own; its
                # refresh() delegates to the first Live in the console's stack.
                return

            if self._screen:
                self._alt_screen = self.console.set_alt_screen(True)
            self.console.show_cursor(False)
            self._enable_redirect_io()
            self.console.push_render_hook(self)
            if refresh:
                try:
                    self.refresh()
                except Exception:
                    # If refresh fails, we want to stop the redirection of sys.stderr,
                    # so the error stacktrace is properly displayed in the terminal.
                    # (or, if the code that calls Rich captures the exception and wants to display something,
                    # let this be displayed in the terminal).
                    self.stop()
                    raise
            if self.auto_refresh:
                self._refresh_thread = _RefreshThread(self, self.refresh_per_second)
                self._refresh_thread.start()

    def stop(self) -> None:
        """Stop live rendering display.

        Does nothing if the display is not started.
        """
        with self._lock:
            if not self._started:
                return
            self._started = False
            self.console.clear_live()
            if self._nested:
                # Nested displays never set up the terminal, so there is
                # nothing to tear down; just leave the final output behind
                # unless the display is transient.
                if not self.transient:
                    self.console.print(self.renderable)
                return

            # Stop the thread whenever one exists. The public ``auto_refresh``
            # attribute may have been changed since start(); gating on it here
            # would leave the thread running and refreshing after stop().
            if self._refresh_thread is not None:
                self._refresh_thread.stop()
                self._refresh_thread = None
            # allow it to fully render on the last even if overflow
            self.vertical_overflow = "visible"
            with self.console:
                try:
                    if not self._alt_screen and not self.console.is_jupyter:
                        self.refresh()
                finally:
                    # Teardown runs even if the final refresh raised, so the
                    # streams, render hook and cursor are always restored.
                    self._disable_redirect_io()
                    self.console.pop_render_hook()
                    if not self._alt_screen and self.console.is_terminal:
                        self.console.line()
                    self.console.show_cursor(True)
                    if self._alt_screen:
                        self.console.set_alt_screen(False)
                    if self.transient and not self._alt_screen:
                        self.console.control(self._live_render.restore_cursor())
                    if self.ipy_widget is not None and self.transient:
                        self.ipy_widget.close()  # pragma: no cover

    def __enter__(self) -> Self:
        # Only render immediately when an initial renderable was supplied.
        self.start(refresh=self._renderable is not None)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        self.stop()

    def _enable_redirect_io(self) -> None:
        """Enable redirecting of stdout / stderr."""
        if self.console.is_terminal or self.console.is_jupyter:
            # Skip streams that are already a FileProxy so they are not
            # wrapped a second time.
            if self._redirect_stdout and not isinstance(sys.stdout, FileProxy):
                self._restore_stdout = sys.stdout
                sys.stdout = cast("TextIO", FileProxy(self.console, sys.stdout))
            if self._redirect_stderr and not isinstance(sys.stderr, FileProxy):
                self._restore_stderr = sys.stderr
                sys.stderr = cast("TextIO", FileProxy(self.console, sys.stderr))

    def _disable_redirect_io(self) -> None:
        """Disable redirecting of stdout / stderr."""
        # Only streams this instance replaced are restored.
        if self._restore_stdout:
            sys.stdout = cast("TextIO", self._restore_stdout)
            self._restore_stdout = None
        if self._restore_stderr:
            sys.stderr = cast("TextIO", self._restore_stderr)
            self._restore_stderr = None

    @property
    def renderable(self) -> RenderableType:
        """Get the renderable that is being displayed

        Returns:
            RenderableType: Displayed renderable.
        """
        live_stack = self.console._live_stack
        renderable: RenderableType
        if live_stack and self is live_stack[0]:
            # The first Live instance will render everything in the Live stack
            renderable = Group(*[live.get_renderable() for live in live_stack])
        else:
            renderable = self.get_renderable()
        return Screen(renderable) if self._alt_screen else renderable

    def update(self, renderable: RenderableType, *, refresh: bool = False) -> None:
        """Update the renderable that is being displayed

        Args:
            renderable (RenderableType): New renderable to use.
            refresh (bool, optional): Refresh the display. Defaults to False.
        """
        if isinstance(renderable, str):
            renderable = self.console.render_str(renderable)
        with self._lock:
            self._renderable = renderable
            if refresh:
                self.refresh()

    def refresh(self) -> None:
        """Update the display of the Live Render."""
        with self._lock:
            self._live_render.set_renderable(self.renderable)
            if self._nested:
                # A nested Live is drawn by the first Live in the stack.
                if self.console._live_stack:
                    self.console._live_stack[0].refresh()
                return

            if self.console.is_jupyter:  # pragma: no cover
                try:
                    from IPython.display import display
                    from ipywidgets import Output
                except ImportError:
                    import warnings

                    warnings.warn('install "ipywidgets" for Jupyter support')
                else:
                    if self.ipy_widget is None:
                        self.ipy_widget = Output()
                        display(self.ipy_widget)

                    with self.ipy_widget:
                        self.ipy_widget.clear_output(wait=True)
                        self.console.print(self._live_render.renderable)
            elif self.console.is_terminal and not self.console.is_dumb_terminal:
                # NOTE(review): printing an empty Control presumably exists to
                # run the console's render hooks, i.e. process_renderables()
                # below, which adds the live render to the output.
                with self.console:
                    self.console.print(Control())
            elif (
                not self._started and not self.transient
            ):  # if it is finished allow files or dumb-terminals to see final result
                with self.console:
                    self.console.print(Control())

    def process_renderables(
        self, renderables: List[ConsoleRenderable]
    ) -> List[ConsoleRenderable]:
        """Process renderables to restore cursor and display progress.

        Args:
            renderables (List[ConsoleRenderable]): Renderables about to be printed.

        Returns:
            List[ConsoleRenderable]: The renderables, with the live render
                (and a cursor reset, when interactive) added where applicable.
        """
        self._live_render.vertical_overflow = self.vertical_overflow
        if self.console.is_interactive:
            # lock needs acquiring as user can modify live_render renderable at any time unlike in Progress.
            with self._lock:
                reset = (
                    Control.home()
                    if self._alt_screen
                    else self._live_render.position_cursor()
                )
                renderables = [reset, *renderables, self._live_render]
        elif (
            not self._started and not self.transient
        ):  # if it is finished render the final output for files or dumb_terminals
            renderables = [*renderables, self._live_render]

        return renderables
294
295
if __name__ == "__main__":  # pragma: no cover
    # Demo: show a continuously updated table of random "exchange rates" in a
    # Live display while occasionally logging other renderables above it.
    import random
    import time
    from itertools import cycle
    from typing import Dict, List, Tuple

    from .align import Align
    from .console import Console
    from .live import Live as Live
    from .panel import Panel
    from .rule import Rule
    from .syntax import Syntax
    from .table import Table

    console = Console()

    # Sample source code shown by the demo (displayed, never executed).
    syntax = Syntax(
        '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
    """Iterate and generate a tuple with a flag for last value."""
    iter_values = iter(values)
    try:
        previous_value = next(iter_values)
    except StopIteration:
        return
    for value in iter_values:
        yield False, previous_value
        previous_value = value
    yield True, previous_value''',
        "python",
        line_numbers=True,
    )

    table = Table("foo", "bar", "baz")
    table.add_row("1", "2", "3")

    # Renderables logged at random while the live table is running.
    progress_renderables = [
        # A missing comma previously fused these two messages into one string.
        "You can make the terminal shorter and taller to see the live table hide",
        "Text may be printed while the progress bars are rendering.",
        Panel("In fact, [i]any[/i] renderable will work"),
        "Such as [magenta]tables[/]...",
        table,
        "Pretty printed structures...",
        {"type": "example", "text": "Pretty printed"},
        "Syntax...",
        syntax,
        Rule("Give it a try!"),
    ]

    examples = cycle(progress_renderables)

    exchanges = [
        "SGD",
        "MYR",
        "EUR",
        "USD",
        "AUD",
        "JPY",
        "CNH",
        "HKD",
        "CAD",
        "INR",
        "DKK",
        "GBP",
        "RUB",
        "NZD",
        "MXN",
        "IDR",
        "TWD",
        "THB",
        "VND",
    ]
    with Live(console=console) as live_table:
        exchange_rate_dict: Dict[Tuple[str, str], float] = {}

        for index in range(100):
            select_exchange = exchanges[index % len(exchanges)]

            for exchange in exchanges:
                if exchange == select_exchange:
                    continue
                time.sleep(0.4)
                # Roughly one time in eleven, log an example above the table.
                if random.randint(0, 10) < 1:
                    console.log(next(examples))
                exchange_rate_dict[(select_exchange, exchange)] = 200 / (
                    (random.random() * 320) + 1
                )
                # Cap the table size by evicting the oldest-inserted entry
                # (dicts iterate in insertion order).
                if len(exchange_rate_dict) > len(exchanges) - 1:
                    del exchange_rate_dict[next(iter(exchange_rate_dict))]
                table = Table(title="Exchange Rates")

                table.add_column("Source Currency")
                table.add_column("Destination Currency")
                table.add_column("Exchange Rate")

                for (source, dest), exchange_rate in exchange_rate_dict.items():
                    table.add_row(
                        source,
                        dest,
                        Text(
                            f"{exchange_rate:.4f}",
                            style="red" if exchange_rate < 1.0 else "green",
                        ),
                    )

                live_table.update(Align.center(table))