Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/shortcuts/progress_bar/base.py: 34%

172 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Progress bar implementation on top of prompt_toolkit. 

3 

4:: 

5 

6 with ProgressBar(...) as pb: 

7 for item in pb(data): 

8 ... 

9""" 

10from __future__ import annotations 

11 

12import contextvars 

13import datetime 

14import functools 

15import os 

16import signal 

17import threading 

18import traceback 

19from typing import ( 

20 Callable, 

21 Generic, 

22 Iterable, 

23 Iterator, 

24 Sequence, 

25 Sized, 

26 TextIO, 

27 TypeVar, 

28 cast, 

29) 

30 

31from prompt_toolkit.application import Application 

32from prompt_toolkit.application.current import get_app_session 

33from prompt_toolkit.filters import Condition, is_done, renderer_height_is_known 

34from prompt_toolkit.formatted_text import ( 

35 AnyFormattedText, 

36 StyleAndTextTuples, 

37 to_formatted_text, 

38) 

39from prompt_toolkit.input import Input 

40from prompt_toolkit.key_binding import KeyBindings 

41from prompt_toolkit.key_binding.key_processor import KeyPressEvent 

42from prompt_toolkit.layout import ( 

43 ConditionalContainer, 

44 FormattedTextControl, 

45 HSplit, 

46 Layout, 

47 VSplit, 

48 Window, 

49) 

50from prompt_toolkit.layout.controls import UIContent, UIControl 

51from prompt_toolkit.layout.dimension import AnyDimension, D 

52from prompt_toolkit.output import ColorDepth, Output 

53from prompt_toolkit.styles import BaseStyle 

54from prompt_toolkit.utils import in_main_thread 

55 

56from .formatters import Formatter, create_default_formatters 

57 

58__all__ = ["ProgressBar"] 

59 

60E = KeyPressEvent 

61 

62_SIGWINCH = getattr(signal, "SIGWINCH", None) 

63 

64 

65def create_key_bindings(cancel_callback: Callable[[], None] | None) -> KeyBindings: 

66 """ 

67 Key bindings handled by the progress bar. 

68 (The main thread is not supposed to handle any key bindings.) 

69 """ 

70 kb = KeyBindings() 

71 

72 @kb.add("c-l") 

73 def _clear(event: E) -> None: 

74 event.app.renderer.clear() 

75 

76 if cancel_callback is not None: 

77 

78 @kb.add("c-c") 

79 def _interrupt(event: E) -> None: 

80 "Kill the 'body' of the progress bar, but only if we run from the main thread." 

81 assert cancel_callback is not None 

82 cancel_callback() 

83 

84 return kb 

85 

86 

87_T = TypeVar("_T") 

88 

89 

90class ProgressBar: 

91 """ 

92 Progress bar context manager. 

93 

94 Usage :: 

95 

96 with ProgressBar(...) as pb: 

97 for item in pb(data): 

98 ... 

99 

100 :param title: Text to be displayed above the progress bars. This can be a 

101 callable or formatted text as well. 

102 :param formatters: List of :class:`.Formatter` instances. 

103 :param bottom_toolbar: Text to be displayed in the bottom toolbar. This 

104 can be a callable or formatted text. 

105 :param style: :class:`prompt_toolkit.styles.BaseStyle` instance. 

106 :param key_bindings: :class:`.KeyBindings` instance. 

107 :param cancel_callback: Callback function that's called when control-c is 

108 pressed by the user. This can be used for instance to start "proper" 

109 cancellation if the wrapped code supports it. 

110 :param file: The file object used for rendering, by default `sys.stderr` is used. 

111 

112 :param color_depth: `prompt_toolkit` `ColorDepth` instance. 

113 :param output: :class:`~prompt_toolkit.output.Output` instance. 

114 :param input: :class:`~prompt_toolkit.input.Input` instance. 

115 """ 

116 

117 def __init__( 

118 self, 

119 title: AnyFormattedText = None, 

120 formatters: Sequence[Formatter] | None = None, 

121 bottom_toolbar: AnyFormattedText = None, 

122 style: BaseStyle | None = None, 

123 key_bindings: KeyBindings | None = None, 

124 cancel_callback: Callable[[], None] | None = None, 

125 file: TextIO | None = None, 

126 color_depth: ColorDepth | None = None, 

127 output: Output | None = None, 

128 input: Input | None = None, 

129 ) -> None: 

130 self.title = title 

131 self.formatters = formatters or create_default_formatters() 

132 self.bottom_toolbar = bottom_toolbar 

133 self.counters: list[ProgressBarCounter[object]] = [] 

134 self.style = style 

135 self.key_bindings = key_bindings 

136 self.cancel_callback = cancel_callback 

137 

138 # If no `cancel_callback` was given, and we're creating the progress 

139 # bar from the main thread. Cancel by sending a `KeyboardInterrupt` to 

140 # the main thread. 

141 if self.cancel_callback is None and in_main_thread(): 

142 

143 def keyboard_interrupt_to_main_thread() -> None: 

144 os.kill(os.getpid(), signal.SIGINT) 

145 

146 self.cancel_callback = keyboard_interrupt_to_main_thread 

147 

148 # Note that we use __stderr__ as default error output, because that 

149 # works best with `patch_stdout`. 

150 self.color_depth = color_depth 

151 self.output = output or get_app_session().output 

152 self.input = input or get_app_session().input 

153 

154 self._thread: threading.Thread | None = None 

155 

156 self._has_sigwinch = False 

157 self._app_started = threading.Event() 

158 

159 def __enter__(self) -> ProgressBar: 

160 # Create UI Application. 

161 title_toolbar = ConditionalContainer( 

162 Window( 

163 FormattedTextControl(lambda: self.title), 

164 height=1, 

165 style="class:progressbar,title", 

166 ), 

167 filter=Condition(lambda: self.title is not None), 

168 ) 

169 

170 bottom_toolbar = ConditionalContainer( 

171 Window( 

172 FormattedTextControl( 

173 lambda: self.bottom_toolbar, style="class:bottom-toolbar.text" 

174 ), 

175 style="class:bottom-toolbar", 

176 height=1, 

177 ), 

178 filter=~is_done 

179 & renderer_height_is_known 

180 & Condition(lambda: self.bottom_toolbar is not None), 

181 ) 

182 

183 def width_for_formatter(formatter: Formatter) -> AnyDimension: 

184 # Needs to be passed as callable (partial) to the 'width' 

185 # parameter, because we want to call it on every resize. 

186 return formatter.get_width(progress_bar=self) 

187 

188 progress_controls = [ 

189 Window( 

190 content=_ProgressControl(self, f, self.cancel_callback), 

191 width=functools.partial(width_for_formatter, f), 

192 ) 

193 for f in self.formatters 

194 ] 

195 

196 self.app: Application[None] = Application( 

197 min_redraw_interval=0.05, 

198 layout=Layout( 

199 HSplit( 

200 [ 

201 title_toolbar, 

202 VSplit( 

203 progress_controls, 

204 height=lambda: D( 

205 preferred=len(self.counters), max=len(self.counters) 

206 ), 

207 ), 

208 Window(), 

209 bottom_toolbar, 

210 ] 

211 ) 

212 ), 

213 style=self.style, 

214 key_bindings=self.key_bindings, 

215 refresh_interval=0.3, 

216 color_depth=self.color_depth, 

217 output=self.output, 

218 input=self.input, 

219 ) 

220 

221 # Run application in different thread. 

222 def run() -> None: 

223 try: 

224 self.app.run(pre_run=self._app_started.set) 

225 except BaseException as e: 

226 traceback.print_exc() 

227 print(e) 

228 

229 ctx: contextvars.Context = contextvars.copy_context() 

230 

231 self._thread = threading.Thread(target=ctx.run, args=(run,)) 

232 self._thread.start() 

233 

234 return self 

235 

236 def __exit__(self, *a: object) -> None: 

237 # Wait for the app to be started. Make sure we don't quit earlier, 

238 # otherwise `self.app.exit` won't terminate the app because 

239 # `self.app.future` has not yet been set. 

240 self._app_started.wait() 

241 

242 # Quit UI application. 

243 if self.app.is_running and self.app.loop is not None: 

244 self.app.loop.call_soon_threadsafe(self.app.exit) 

245 

246 if self._thread is not None: 

247 self._thread.join() 

248 

249 def __call__( 

250 self, 

251 data: Iterable[_T] | None = None, 

252 label: AnyFormattedText = "", 

253 remove_when_done: bool = False, 

254 total: int | None = None, 

255 ) -> ProgressBarCounter[_T]: 

256 """ 

257 Start a new counter. 

258 

259 :param label: Title text or description for this progress. (This can be 

260 formatted text as well). 

261 :param remove_when_done: When `True`, hide this progress bar. 

262 :param total: Specify the maximum value if it can't be calculated by 

263 calling ``len``. 

264 """ 

265 counter = ProgressBarCounter( 

266 self, data, label=label, remove_when_done=remove_when_done, total=total 

267 ) 

268 self.counters.append(counter) 

269 return counter 

270 

271 def invalidate(self) -> None: 

272 self.app.invalidate() 

273 

274 

275class _ProgressControl(UIControl): 

276 """ 

277 User control for the progress bar. 

278 """ 

279 

280 def __init__( 

281 self, 

282 progress_bar: ProgressBar, 

283 formatter: Formatter, 

284 cancel_callback: Callable[[], None] | None, 

285 ) -> None: 

286 self.progress_bar = progress_bar 

287 self.formatter = formatter 

288 self._key_bindings = create_key_bindings(cancel_callback) 

289 

290 def create_content(self, width: int, height: int) -> UIContent: 

291 items: list[StyleAndTextTuples] = [] 

292 

293 for pr in self.progress_bar.counters: 

294 try: 

295 text = self.formatter.format(self.progress_bar, pr, width) 

296 except BaseException: 

297 traceback.print_exc() 

298 text = "ERROR" 

299 

300 items.append(to_formatted_text(text)) 

301 

302 def get_line(i: int) -> StyleAndTextTuples: 

303 return items[i] 

304 

305 return UIContent(get_line=get_line, line_count=len(items), show_cursor=False) 

306 

307 def is_focusable(self) -> bool: 

308 return True # Make sure that the key bindings work. 

309 

310 def get_key_bindings(self) -> KeyBindings: 

311 return self._key_bindings 

312 

313 

314_CounterItem = TypeVar("_CounterItem", covariant=True) 

315 

316 

317class ProgressBarCounter(Generic[_CounterItem]): 

318 """ 

319 An individual counter (A progress bar can have multiple counters). 

320 """ 

321 

322 def __init__( 

323 self, 

324 progress_bar: ProgressBar, 

325 data: Iterable[_CounterItem] | None = None, 

326 label: AnyFormattedText = "", 

327 remove_when_done: bool = False, 

328 total: int | None = None, 

329 ) -> None: 

330 self.start_time = datetime.datetime.now() 

331 self.stop_time: datetime.datetime | None = None 

332 self.progress_bar = progress_bar 

333 self.data = data 

334 self.items_completed = 0 

335 self.label = label 

336 self.remove_when_done = remove_when_done 

337 self._done = False 

338 self.total: int | None 

339 

340 if total is None: 

341 try: 

342 self.total = len(cast(Sized, data)) 

343 except TypeError: 

344 self.total = None # We don't know the total length. 

345 else: 

346 self.total = total 

347 

348 def __iter__(self) -> Iterator[_CounterItem]: 

349 if self.data is not None: 

350 try: 

351 for item in self.data: 

352 yield item 

353 self.item_completed() 

354 

355 # Only done if we iterate to the very end. 

356 self.done = True 

357 finally: 

358 # Ensure counter has stopped even if we did not iterate to the 

359 # end (e.g. break or exceptions). 

360 self.stopped = True 

361 else: 

362 raise NotImplementedError("No data defined to iterate over.") 

363 

364 def item_completed(self) -> None: 

365 """ 

366 Start handling the next item. 

367 

368 (Can be called manually in case we don't have a collection to loop through.) 

369 """ 

370 self.items_completed += 1 

371 self.progress_bar.invalidate() 

372 

373 @property 

374 def done(self) -> bool: 

375 """Whether a counter has been completed. 

376 

377 Done counter have been stopped (see stopped) and removed depending on 

378 remove_when_done value. 

379 

380 Contrast this with stopped. A stopped counter may be terminated before 

381 100% completion. A done counter has reached its 100% completion. 

382 """ 

383 return self._done 

384 

385 @done.setter 

386 def done(self, value: bool) -> None: 

387 self._done = value 

388 self.stopped = value 

389 

390 if value and self.remove_when_done: 

391 self.progress_bar.counters.remove(self) 

392 

393 @property 

394 def stopped(self) -> bool: 

395 """Whether a counter has been stopped. 

396 

397 Stopped counters no longer have increasing time_elapsed. This distinction is 

398 also used to prevent the Bar formatter with unknown totals from continuing to run. 

399 

400 A stopped counter (but not done) can be used to signal that a given counter has 

401 encountered an error but allows other counters to continue 

402 (e.g. download X of Y failed). Given how only done counters are removed 

403 (see remove_when_done) this can help aggregate failures from a large number of 

404 successes. 

405 

406 Contrast this with done. A done counter has reached its 100% completion. 

407 A stopped counter may be terminated before 100% completion. 

408 """ 

409 return self.stop_time is not None 

410 

411 @stopped.setter 

412 def stopped(self, value: bool) -> None: 

413 if value: 

414 # This counter has not already been stopped. 

415 if not self.stop_time: 

416 self.stop_time = datetime.datetime.now() 

417 else: 

418 # Clearing any previously set stop_time. 

419 self.stop_time = None 

420 

421 @property 

422 def percentage(self) -> float: 

423 if self.total is None: 

424 return 0 

425 else: 

426 return self.items_completed * 100 / max(self.total, 1) 

427 

428 @property 

429 def time_elapsed(self) -> datetime.timedelta: 

430 """ 

431 Return how much time has been elapsed since the start. 

432 """ 

433 if self.stop_time is None: 

434 return datetime.datetime.now() - self.start_time 

435 else: 

436 return self.stop_time - self.start_time 

437 

438 @property 

439 def time_left(self) -> datetime.timedelta | None: 

440 """ 

441 Timedelta representing the time left. 

442 """ 

443 if self.total is None or not self.percentage: 

444 return None 

445 elif self.done or self.stopped: 

446 return datetime.timedelta(0) 

447 else: 

448 return self.time_elapsed * (100 - self.percentage) / self.percentage