Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/shortcuts/progress_bar/base.py: 34%

172 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1""" 

2Progress bar implementation on top of prompt_toolkit. 

3 

4:: 

5 

6 with ProgressBar(...) as pb: 

7 for item in pb(data): 

8 ... 

9""" 

10from __future__ import annotations 

11 

12import contextvars 

13import datetime 

14import functools 

15import os 

16import signal 

17import threading 

18import traceback 

19from typing import ( 

20 Callable, 

21 Generic, 

22 Iterable, 

23 Iterator, 

24 List, 

25 Optional, 

26 Sequence, 

27 Sized, 

28 TextIO, 

29 TypeVar, 

30 cast, 

31) 

32 

33from prompt_toolkit.application import Application 

34from prompt_toolkit.application.current import get_app_session 

35from prompt_toolkit.filters import Condition, is_done, renderer_height_is_known 

36from prompt_toolkit.formatted_text import ( 

37 AnyFormattedText, 

38 StyleAndTextTuples, 

39 to_formatted_text, 

40) 

41from prompt_toolkit.input import Input 

42from prompt_toolkit.key_binding import KeyBindings 

43from prompt_toolkit.key_binding.key_processor import KeyPressEvent 

44from prompt_toolkit.layout import ( 

45 ConditionalContainer, 

46 FormattedTextControl, 

47 HSplit, 

48 Layout, 

49 VSplit, 

50 Window, 

51) 

52from prompt_toolkit.layout.controls import UIContent, UIControl 

53from prompt_toolkit.layout.dimension import AnyDimension, D 

54from prompt_toolkit.output import ColorDepth, Output 

55from prompt_toolkit.styles import BaseStyle 

56from prompt_toolkit.utils import in_main_thread 

57 

58from .formatters import Formatter, create_default_formatters 

59 

# Names exported by ``from ... import *``. `ProgressBarCounter` is the public
# type returned by `ProgressBar.__call__`, so it is exported next to it.
__all__ = ["ProgressBar", "ProgressBarCounter"]

# Short alias used to annotate the key binding handlers in this module.
E = KeyPressEvent

# `signal.SIGWINCH` when the `signal` module defines it, otherwise `None`.
_SIGWINCH = getattr(signal, "SIGWINCH", None)

65 

66 

def create_key_bindings(cancel_callback: Callable[[], None] | None) -> KeyBindings:
    """
    Build the key bindings handled by the progress bar.
    (The main thread is not supposed to handle any key bindings.)

    Control-L is always bound and clears the renderer. Control-C is only bound
    when a `cancel_callback` is given, in which case it calls that callback.

    :param cancel_callback: Zero-argument callable to run on control-c, or
        `None` to leave control-c unbound.
    :return: A new :class:`.KeyBindings` instance.
    """
    bindings = KeyBindings()

    def _clear(event: E) -> None:
        event.app.renderer.clear()

    bindings.add("c-l")(_clear)

    if cancel_callback is None:
        return bindings

    # Bind to a local so the handler closes over a non-optional callable.
    on_cancel = cancel_callback

    def _interrupt(event: E) -> None:
        """Run the cancel callback that was passed in."""
        on_cancel()

    bindings.add("c-c")(_interrupt)
    return bindings

87 

88 

_T = TypeVar("_T")


class ProgressBar:
    """
    Progress bar context manager.

    Usage ::

        with ProgressBar(...) as pb:
            for item in pb(data):
                ...

    :param title: Text to be displayed above the progress bars. This can be a
        callable or formatted text as well.
    :param formatters: List of :class:`.Formatter` instances.
    :param bottom_toolbar: Text to be displayed in the bottom toolbar. This
        can be a callable or formatted text.
    :param style: :class:`prompt_toolkit.styles.BaseStyle` instance.
    :param key_bindings: :class:`.KeyBindings` instance.
    :param cancel_callback: Callback function that's called when control-c is
        pressed by the user. This can be used for instance to start "proper"
        cancellation if the wrapped code supports it. When omitted and the
        progress bar is created from the main thread, control-c sends
        `SIGINT` to the current process instead.
    :param file: Accepted but not read by this class; pass `output` to
        control where the progress bar is rendered.
    :param color_depth: `prompt_toolkit` `ColorDepth` instance.
    :param output: :class:`~prompt_toolkit.output.Output` instance. Defaults
        to the output of the current app session.
    :param input: :class:`~prompt_toolkit.input.Input` instance. Defaults to
        the input of the current app session.
    """

    def __init__(
        self,
        title: AnyFormattedText = None,
        formatters: Sequence[Formatter] | None = None,
        bottom_toolbar: AnyFormattedText = None,
        style: BaseStyle | None = None,
        key_bindings: KeyBindings | None = None,
        cancel_callback: Callable[[], None] | None = None,
        file: TextIO | None = None,
        color_depth: ColorDepth | None = None,
        output: Output | None = None,
        input: Input | None = None,
    ) -> None:
        self.title = title
        self.formatters = formatters or create_default_formatters()
        self.bottom_toolbar = bottom_toolbar
        self.counters: list[ProgressBarCounter[object]] = []
        self.style = style
        self.key_bindings = key_bindings
        self.cancel_callback = cancel_callback

        # If no `cancel_callback` was given, and we're creating the progress
        # bar from the main thread. Cancel by sending a `KeyboardInterrupt` to
        # the main thread.
        if self.cancel_callback is None and in_main_thread():

            def keyboard_interrupt_to_main_thread() -> None:
                os.kill(os.getpid(), signal.SIGINT)

            self.cancel_callback = keyboard_interrupt_to_main_thread

        # Fall back to the input/output of the current app session when none
        # were passed in explicitly.
        self.color_depth = color_depth
        self.output = output or get_app_session().output
        self.input = input or get_app_session().input

        self._thread: threading.Thread | None = None

        self._has_sigwinch = False
        # Set once the UI application has started (or has failed to start);
        # `__exit__` waits on it before asking the application to quit.
        self._app_started = threading.Event()

    def __enter__(self) -> ProgressBar:
        """
        Build the UI application and start running it in a separate thread.

        :return: This progress bar.
        """
        # Create UI Application.
        title_toolbar = ConditionalContainer(
            Window(
                FormattedTextControl(lambda: self.title),
                height=1,
                style="class:progressbar,title",
            ),
            filter=Condition(lambda: self.title is not None),
        )

        bottom_toolbar = ConditionalContainer(
            Window(
                FormattedTextControl(
                    lambda: self.bottom_toolbar, style="class:bottom-toolbar.text"
                ),
                style="class:bottom-toolbar",
                height=1,
            ),
            filter=~is_done
            & renderer_height_is_known
            & Condition(lambda: self.bottom_toolbar is not None),
        )

        def width_for_formatter(formatter: Formatter) -> AnyDimension:
            # Needs to be passed as callable (partial) to the 'width'
            # parameter, because we want to call it on every resize.
            return formatter.get_width(progress_bar=self)

        # One window per formatter, laid out side by side in the `VSplit`.
        progress_controls = [
            Window(
                content=_ProgressControl(self, f, self.cancel_callback),
                width=functools.partial(width_for_formatter, f),
            )
            for f in self.formatters
        ]

        self.app: Application[None] = Application(
            min_redraw_interval=0.05,
            layout=Layout(
                HSplit(
                    [
                        title_toolbar,
                        VSplit(
                            progress_controls,
                            # One row per counter.
                            height=lambda: D(
                                preferred=len(self.counters), max=len(self.counters)
                            ),
                        ),
                        Window(),
                        bottom_toolbar,
                    ]
                )
            ),
            style=self.style,
            key_bindings=self.key_bindings,
            refresh_interval=0.3,
            color_depth=self.color_depth,
            output=self.output,
            input=self.input,
        )

        # Run application in different thread.
        def run() -> None:
            try:
                self.app.run(pre_run=self._app_started.set)
            except BaseException as e:
                traceback.print_exc()
                print(e)
            finally:
                # If the application failed before `pre_run` was called, the
                # event would never be set and `__exit__` would block forever
                # in `_app_started.wait()`. Setting it again is harmless.
                self._app_started.set()

        # Run the thread in a copy of the current context.
        ctx: contextvars.Context = contextvars.copy_context()

        self._thread = threading.Thread(target=ctx.run, args=(run,))
        self._thread.start()

        return self

    def __exit__(self, *a: object) -> None:
        """
        Ask the UI application to exit and wait for its thread to finish.
        """
        # Wait for the app to be started. Make sure we don't quit earlier,
        # otherwise `self.app.exit` won't terminate the app because
        # `self.app.future` has not yet been set.
        self._app_started.wait()

        # Quit UI application.
        if self.app.is_running and self.app.loop is not None:
            self.app.loop.call_soon_threadsafe(self.app.exit)

        if self._thread is not None:
            self._thread.join()

    def __call__(
        self,
        data: Iterable[_T] | None = None,
        label: AnyFormattedText = "",
        remove_when_done: bool = False,
        total: int | None = None,
    ) -> ProgressBarCounter[_T]:
        """
        Start a new counter.

        :param data: Iterable to wrap, or `None` when items are going to be
            reported manually through `ProgressBarCounter.item_completed`.
        :param label: Title text or description for this progress. (This can be
            formatted text as well).
        :param remove_when_done: When `True`, hide this progress bar.
        :param total: Specify the maximum value if it can't be calculated by
            calling ``len``.
        :return: The new counter, which has also been appended to
            `self.counters`.
        """
        counter = ProgressBarCounter(
            self, data, label=label, remove_when_done=remove_when_done, total=total
        )
        self.counters.append(counter)
        return counter

    def invalidate(self) -> None:
        """Call `invalidate` on the UI application."""
        self.app.invalidate()

275 

276 

class _ProgressControl(UIControl):
    """
    User control for the progress bar.

    Renders one line per counter of the progress bar, using a single
    formatter, and carries the key bindings of the progress bar.
    """

    def __init__(
        self,
        progress_bar: ProgressBar,
        formatter: Formatter,
        cancel_callback: Callable[[], None] | None,
    ) -> None:
        self._key_bindings = create_key_bindings(cancel_callback)
        self.formatter = formatter
        self.progress_bar = progress_bar

    def create_content(self, width: int, height: int) -> UIContent:
        """
        Format every counter with this control's formatter.

        A counter whose formatter raises is shown as ``ERROR`` and the
        traceback is printed, so the remaining counters keep rendering.
        """
        rows: list[StyleAndTextTuples] = []

        for counter in self.progress_bar.counters:
            try:
                rendered = self.formatter.format(self.progress_bar, counter, width)
            except BaseException:
                traceback.print_exc()
                rendered = "ERROR"

            rows.append(to_formatted_text(rendered))

        return UIContent(
            get_line=rows.__getitem__,
            line_count=len(rows),
            show_cursor=False,
        )

    def is_focusable(self) -> bool:
        # Has to be focusable, otherwise the key bindings don't work.
        return True

    def get_key_bindings(self) -> KeyBindings:
        """Return the key bindings created for this control."""
        return self._key_bindings

314 

315 

_CounterItem = TypeVar("_CounterItem", covariant=True)


class ProgressBarCounter(Generic[_CounterItem]):
    """
    An individual counter (A progress bar can have multiple counters).

    :param progress_bar: The progress bar this counter belongs to. It is
        invalidated whenever an item completes.
    :param data: Iterable to loop over, or `None` when `item_completed` is
        going to be called manually.
    :param label: Title text or description for this counter.
    :param remove_when_done: When `True`, remove this counter from
        `progress_bar.counters` as soon as it is done.
    :param total: The number of items. When omitted, ``len(data)`` is used if
        `data` supports it; otherwise the total is unknown (`None`).
    """

    def __init__(
        self,
        progress_bar: ProgressBar,
        data: Iterable[_CounterItem] | None = None,
        label: AnyFormattedText = "",
        remove_when_done: bool = False,
        total: int | None = None,
    ) -> None:
        self.start_time = datetime.datetime.now()
        self.stop_time: datetime.datetime | None = None
        self.progress_bar = progress_bar
        self.data = data
        self.items_completed = 0
        self.label = label
        self.remove_when_done = remove_when_done
        self._done = False
        self.total: int | None

        if total is None:
            try:
                self.total = len(cast(Sized, data))
            except TypeError:
                self.total = None  # We don't know the total length.
        else:
            self.total = total

    def __iter__(self) -> Iterator[_CounterItem]:
        """
        Yield the items of `data`, counting each one once the consumer asks
        for the next.

        :raises NotImplementedError: When no `data` was given.
        """
        if self.data is None:
            raise NotImplementedError("No data defined to iterate over.")

        try:
            for item in self.data:
                yield item
                self.item_completed()

            # Only done if we iterate to the very end.
            self.done = True
        finally:
            # Ensure counter has stopped even if we did not iterate to the
            # end (e.g. break or exceptions).
            self.stopped = True

    def item_completed(self) -> None:
        """
        Start handling the next item.

        (Can be called manually in case we don't have a collection to loop through.)
        """
        self.items_completed += 1
        self.progress_bar.invalidate()

    @property
    def done(self) -> bool:
        """Whether a counter has been completed.

        Done counter have been stopped (see stopped) and removed depending on
        remove_when_done value.

        Contrast this with stopped. A stopped counter may be terminated before
        100% completion. A done counter has reached its 100% completion.
        """
        return self._done

    @done.setter
    def done(self, value: bool) -> None:
        self._done = value
        self.stopped = value

        if value and self.remove_when_done:
            # The counter may already have been removed (`done` set twice, or
            # the counter iterated a second time). That must not raise.
            try:
                self.progress_bar.counters.remove(self)
            except ValueError:
                pass

    @property
    def stopped(self) -> bool:
        """Whether a counter has been stopped.

        Stopped counters no longer have increasing time_elapsed. This distinction is
        also used to prevent the Bar formatter with unknown totals from continuing to run.

        A stopped counter (but not done) can be used to signal that a given counter has
        encountered an error but allows other counters to continue
        (e.g. download X of Y failed). Given how only done counters are removed
        (see remove_when_done) this can help aggregate failures from a large number of
        successes.

        Contrast this with done. A done counter has reached its 100% completion.
        A stopped counter may be terminated before 100% completion.
        """
        return self.stop_time is not None

    @stopped.setter
    def stopped(self, value: bool) -> None:
        if value:
            # Keep the first stop time if the counter was already stopped.
            if not self.stop_time:
                self.stop_time = datetime.datetime.now()
        else:
            # Clearing any previously set stop_time.
            self.stop_time = None

    @property
    def percentage(self) -> float:
        """
        Completed items as a percentage of `total`; 0 when the total is
        unknown. A total of zero is treated as one to avoid dividing by zero.
        """
        if self.total is None:
            return 0
        else:
            return self.items_completed * 100 / max(self.total, 1)

    @property
    def time_elapsed(self) -> datetime.timedelta:
        """
        Return how much time has been elapsed since the start.
        """
        if self.stop_time is None:
            return datetime.datetime.now() - self.start_time
        else:
            return self.stop_time - self.start_time

    @property
    def time_left(self) -> datetime.timedelta | None:
        """
        Timedelta representing the time left.

        `None` when the total is unknown or nothing has completed yet; zero
        once the counter is done or stopped. Never negative.
        """
        if self.total is None or not self.percentage:
            return None
        elif self.done or self.stopped:
            return datetime.timedelta(0)
        else:
            remaining = (
                self.time_elapsed * (100 - self.percentage) / self.percentage
            )
            # More items than `total` were completed: report zero instead of
            # a negative duration.
            return max(remaining, datetime.timedelta(0))