Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/live.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

161 statements  

1from __future__ import annotations 

2 

3import sys 

4from threading import Event, RLock, Thread 

5from types import TracebackType 

6from typing import IO, TYPE_CHECKING, Any, Callable, List, Optional, TextIO, Type, cast 

7 

8from . import get_console 

9from .console import Console, ConsoleRenderable, Group, RenderableType, RenderHook 

10from .control import Control 

11from .file_proxy import FileProxy 

12from .jupyter import JupyterMixin 

13from .live_render import LiveRender, VerticalOverflowMethod 

14from .screen import Screen 

15from .text import Text 

16 

17if TYPE_CHECKING: 

18 # Can be replaced with `from typing import Self` in Python 3.11+ 

19 from typing_extensions import Self # pragma: no cover 

20 

21 

class _RefreshThread(Thread):
    """Daemon thread that periodically asks a ``Live`` to redraw itself.

    Args:
        live (Live): Display whose ``refresh()`` is invoked on every tick.
        refresh_per_second (float): Ticks per second; each wait lasts ``1 / refresh_per_second`` seconds.
    """

    def __init__(self, live: "Live", refresh_per_second: float) -> None:
        super().__init__(daemon=True)
        self.done = Event()
        self.refresh_per_second = refresh_per_second
        self.live = live

    def stop(self) -> None:
        """Set the ``done`` event so the loop in ``run()`` exits."""
        self.done.set()

    def run(self) -> None:
        """Call ``live.refresh()`` once per interval until ``done`` is set."""
        while True:
            # Event.wait() returns True as soon as the flag is set, ending the loop.
            if self.done.wait(1 / self.refresh_per_second):
                break
            with self.live._lock:
                # Re-check once the lock is held: the flag may have been set in the meantime.
                if self.done.is_set():
                    continue
                self.live.refresh()

39 

40 

class Live(JupyterMixin, RenderHook):
    """Renders an auto-updating live display of any given renderable.

    Args:
        renderable (RenderableType, optional): The renderable to live display. Defaults to displaying nothing.
        console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout.
        screen (bool, optional): Enable alternate screen mode. Defaults to False.
        auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()` or `update()` with refresh flag. Defaults to True
        refresh_per_second (float, optional): Number of times per second to refresh the live display. Defaults to 4.
        transient (bool, optional): Clear the renderable on exit (has no effect when screen=True). Defaults to False.
        redirect_stdout (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
        redirect_stderr (bool, optional): Enable redirection of stderr. Defaults to True.
        vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the console. Defaults to "ellipsis".
        get_renderable (Callable[[], RenderableType], optional): Optional callable to get renderable. Defaults to None.
    """

    def __init__(
        self,
        renderable: Optional[RenderableType] = None,
        *,
        console: Optional[Console] = None,
        screen: bool = False,
        auto_refresh: bool = True,
        refresh_per_second: float = 4,
        transient: bool = False,
        redirect_stdout: bool = True,
        redirect_stderr: bool = True,
        vertical_overflow: VerticalOverflowMethod = "ellipsis",
        get_renderable: Optional[Callable[[], RenderableType]] = None,
    ) -> None:
        assert refresh_per_second > 0, "refresh_per_second must be > 0"
        self._renderable = renderable
        self.console = console if console is not None else get_console()
        self._screen = screen
        # Set by start() from the result of console.set_alt_screen(True).
        self._alt_screen = False

        self._redirect_stdout = redirect_stdout
        self._redirect_stderr = redirect_stderr
        # Original streams, saved while a FileProxy is installed in their place.
        self._restore_stdout: Optional[IO[str]] = None
        self._restore_stderr: Optional[IO[str]] = None

        # Re-entrant: start() may call refresh() and stop() while holding it.
        self._lock = RLock()
        self.ipy_widget: Optional[Any] = None
        self.auto_refresh = auto_refresh
        self._started: bool = False
        # Screen mode forces transient behaviour.
        self.transient = True if screen else transient

        self._refresh_thread: Optional[_RefreshThread] = None
        self.refresh_per_second = refresh_per_second

        self.vertical_overflow = vertical_overflow
        self._get_renderable = get_renderable
        self._live_render = LiveRender(
            self.get_renderable(), vertical_overflow=vertical_overflow
        )
        # Set by start() when console.set_live() returns a falsy value.
        self._nested = False

    @property
    def is_started(self) -> bool:
        """Check if live display has been started."""
        return self._started

    def get_renderable(self) -> RenderableType:
        """Get the renderable for this instance.

        Returns:
            RenderableType: The result of the ``get_renderable`` callable when one
                was supplied, otherwise the stored renderable. A falsy result is
                replaced by an empty string.
        """
        renderable = (
            self._get_renderable()
            if self._get_renderable is not None
            else self._renderable
        )
        return renderable or ""

    def start(self, refresh: bool = False) -> None:
        """Start live rendering display.

        Does nothing if the display has already been started. If
        ``console.set_live(self)`` returns a falsy value, the instance is marked
        as nested and no further setup is performed.

        Args:
            refresh (bool, optional): Also refresh. Defaults to False.
        """
        with self._lock:
            if self._started:
                return
            self._started = True

            if not self.console.set_live(self):
                self._nested = True
                return

            if self._screen:
                self._alt_screen = self.console.set_alt_screen(True)
            self.console.show_cursor(False)
            self._enable_redirect_io()
            self.console.push_render_hook(self)
            if refresh:
                try:
                    self.refresh()
                except Exception:
                    # If refresh fails, we want to stop the redirection of sys.stderr,
                    # so the error stacktrace is properly displayed in the terminal.
                    # (or, if the code that calls Rich captures the exception and wants to display something,
                    # let this be displayed in the terminal).
                    self.stop()
                    raise
            if self.auto_refresh:
                self._refresh_thread = _RefreshThread(self, self.refresh_per_second)
                self._refresh_thread.start()

    def stop(self) -> None:
        """Stop live rendering display.

        Does nothing if the display is not started.
        """
        with self._lock:
            if not self._started:
                return
            self._started = False
            self.console.clear_live()
            if self._nested:
                # A nested instance did no setup in start(), so there is nothing to undo.
                if not self.transient:
                    self.console.print(self.renderable)
                return

            if self.auto_refresh and self._refresh_thread is not None:
                self._refresh_thread.stop()
                self._refresh_thread = None
            # allow it to fully render on the last even if overflow
            self.vertical_overflow = "visible"
            with self.console:
                try:
                    if not self._alt_screen and not self.console.is_jupyter:
                        self.refresh()
                finally:
                    # Undo everything start() set up, even if the final refresh raised.
                    self._disable_redirect_io()
                    self.console.pop_render_hook()
                    if (
                        not self._alt_screen
                        and self.console.is_terminal
                        and self._live_render.last_render_height
                    ):
                        self.console.line()
                    self.console.show_cursor(True)
                    if self._alt_screen:
                        self.console.set_alt_screen(False)
                    if self.transient and not self._alt_screen:
                        self.console.control(self._live_render.restore_cursor())
                    if self.ipy_widget is not None and self.transient:
                        self.ipy_widget.close()  # pragma: no cover

    def __enter__(self) -> Self:
        """Start the display, refreshing immediately if a renderable was given.

        Returns:
            Self: This instance.
        """
        self.start(refresh=self._renderable is not None)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Stop the display; returns None, so exceptions are not suppressed."""
        self.stop()

    def _enable_redirect_io(self) -> None:
        """Enable redirecting of stdout / stderr."""
        if self.console.is_terminal or self.console.is_jupyter:
            # Skip streams that are already a FileProxy so they are not wrapped twice.
            if self._redirect_stdout and not isinstance(sys.stdout, FileProxy):
                self._restore_stdout = sys.stdout
                sys.stdout = cast("TextIO", FileProxy(self.console, sys.stdout))
            if self._redirect_stderr and not isinstance(sys.stderr, FileProxy):
                self._restore_stderr = sys.stderr
                sys.stderr = cast("TextIO", FileProxy(self.console, sys.stderr))

    def _disable_redirect_io(self) -> None:
        """Disable redirecting of stdout / stderr."""
        # Compare against None rather than relying on truthiness: a saved stream
        # object that happens to evaluate as falsy must still be restored.
        if self._restore_stdout is not None:
            sys.stdout = cast("TextIO", self._restore_stdout)
            self._restore_stdout = None
        if self._restore_stderr is not None:
            sys.stderr = cast("TextIO", self._restore_stderr)
            self._restore_stderr = None

    @property
    def renderable(self) -> RenderableType:
        """Get the renderable that is being displayed

        Returns:
            RenderableType: Displayed renderable.
        """
        live_stack = self.console._live_stack
        renderable: RenderableType
        if live_stack and self is live_stack[0]:
            # The first Live instance will render everything in the Live stack
            renderable = Group(*[live.get_renderable() for live in live_stack])
        else:
            renderable = self.get_renderable()
        return Screen(renderable) if self._alt_screen else renderable

    def update(self, renderable: RenderableType, *, refresh: bool = False) -> None:
        """Update the renderable that is being displayed

        Args:
            renderable (RenderableType): New renderable to use.
            refresh (bool, optional): Refresh the display. Defaults to False.
        """
        if isinstance(renderable, str):
            renderable = self.console.render_str(renderable)
        with self._lock:
            self._renderable = renderable
            if refresh:
                self.refresh()

    def refresh(self) -> None:
        """Update the display of the Live Render."""
        with self._lock:
            self._live_render.set_renderable(self.renderable)
            if self._nested:
                # A nested instance delegates drawing to the first Live in the console's stack.
                if self.console._live_stack:
                    self.console._live_stack[0].refresh()
                return

            if self.console.is_jupyter:  # pragma: no cover
                try:
                    from IPython.display import display
                    from ipywidgets import Output
                except ImportError:
                    import warnings

                    warnings.warn('install "ipywidgets" for Jupyter support')
                else:
                    if self.ipy_widget is None:
                        self.ipy_widget = Output()
                        display(self.ipy_widget)

                    with self.ipy_widget:
                        self.ipy_widget.clear_output(wait=True)
                        self.console.print(self._live_render.renderable)
            elif self.console.is_terminal and not self.console.is_dumb_terminal:
                with self.console:
                    self.console.print(Control())
            elif (
                not self._started and not self.transient
            ):  # if it is finished allow files or dumb-terminals to see final result
                with self.console:
                    self.console.print(Control())

    def process_renderables(
        self, renderables: List[ConsoleRenderable]
    ) -> List[ConsoleRenderable]:
        """Process renderables to restore cursor and display progress."""
        self._live_render.vertical_overflow = self.vertical_overflow
        if self.console.is_interactive:
            # lock needs acquiring as user can modify live_render renderable at any time unlike in Progress.
            with self._lock:
                reset = (
                    Control.home()
                    if self._alt_screen
                    else self._live_render.position_cursor()
                )
                renderables = [reset, *renderables, self._live_render]
        elif (
            not self._started and not self.transient
        ):  # if it is finished render the final output for files or dumb_terminals
            renderables = [*renderables, self._live_render]

        return renderables

298 

299 

if __name__ == "__main__":  # pragma: no cover
    # Demo: a live table of random exchange rates, with occasional log output
    # printed while the live display is active.
    import random
    import time
    from itertools import cycle
    from typing import Dict, List, Tuple

    from .align import Align
    from .console import Console
    from .live import Live as Live
    from .panel import Panel
    from .rule import Rule
    from .syntax import Syntax
    from .table import Table

    console = Console()

    syntax = Syntax(
        '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
    """Iterate and generate a tuple with a flag for last value."""
    iter_values = iter(values)
    try:
        previous_value = next(iter_values)
    except StopIteration:
        return
    for value in iter_values:
        yield False, previous_value
        previous_value = value
    yield True, previous_value''',
        "python",
        line_numbers=True,
    )

    table = Table("foo", "bar", "baz")
    table.add_row("1", "2", "3")

    # Sample renderables that are logged at random while the live table updates.
    # Each string is its own entry: without the comma after the first one,
    # adjacent literals would be concatenated into a single run-on message.
    progress_renderables = [
        "You can make the terminal shorter and taller to see the live table hide",
        "Text may be printed while the progress bars are rendering.",
        Panel("In fact, [i]any[/i] renderable will work"),
        "Such as [magenta]tables[/]...",
        table,
        "Pretty printed structures...",
        {"type": "example", "text": "Pretty printed"},
        "Syntax...",
        syntax,
        Rule("Give it a try!"),
    ]

    examples = cycle(progress_renderables)

    exchanges = [
        "SGD",
        "MYR",
        "EUR",
        "USD",
        "AUD",
        "JPY",
        "CNH",
        "HKD",
        "CAD",
        "INR",
        "DKK",
        "GBP",
        "RUB",
        "NZD",
        "MXN",
        "IDR",
        "TWD",
        "THB",
        "VND",
    ]
    with Live(console=console) as live_table:
        exchange_rate_dict: Dict[Tuple[str, str], float] = {}

        for index in range(100):
            select_exchange = exchanges[index % len(exchanges)]

            for exchange in exchanges:
                if exchange == select_exchange:
                    continue
                time.sleep(0.4)
                if random.randint(0, 10) < 1:
                    console.log(next(examples))
                exchange_rate_dict[(select_exchange, exchange)] = 200 / (
                    (random.random() * 320) + 1
                )
                if len(exchange_rate_dict) > len(exchanges) - 1:
                    # Dicts keep insertion order, so the first key is the oldest entry.
                    exchange_rate_dict.pop(next(iter(exchange_rate_dict)))

                # Build a fresh table each tick; a separate name keeps the sample
                # `table` above from being rebound.
                rates_table = Table(title="Exchange Rates")

                rates_table.add_column("Source Currency")
                rates_table.add_column("Destination Currency")
                rates_table.add_column("Exchange Rate")

                for (source, dest), exchange_rate in exchange_rate_dict.items():
                    rates_table.add_row(
                        source,
                        dest,
                        Text(
                            f"{exchange_rate:.4f}",
                            style="red" if exchange_rate < 1.0 else "green",
                        ),
                    )

                live_table.update(Align.center(rates_table))