Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/shortcuts/progress_bar/base.py: 34%
172 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2Progress bar implementation on top of prompt_toolkit.
4::
6 with ProgressBar(...) as pb:
7 for item in pb(data):
8 ...
9"""
10from __future__ import annotations
12import contextvars
13import datetime
14import functools
15import os
16import signal
17import threading
18import traceback
19from typing import (
20 Callable,
21 Generic,
22 Iterable,
23 Iterator,
24 Sequence,
25 Sized,
26 TextIO,
27 TypeVar,
28 cast,
29)
31from prompt_toolkit.application import Application
32from prompt_toolkit.application.current import get_app_session
33from prompt_toolkit.filters import Condition, is_done, renderer_height_is_known
34from prompt_toolkit.formatted_text import (
35 AnyFormattedText,
36 StyleAndTextTuples,
37 to_formatted_text,
38)
39from prompt_toolkit.input import Input
40from prompt_toolkit.key_binding import KeyBindings
41from prompt_toolkit.key_binding.key_processor import KeyPressEvent
42from prompt_toolkit.layout import (
43 ConditionalContainer,
44 FormattedTextControl,
45 HSplit,
46 Layout,
47 VSplit,
48 Window,
49)
50from prompt_toolkit.layout.controls import UIContent, UIControl
51from prompt_toolkit.layout.dimension import AnyDimension, D
52from prompt_toolkit.output import ColorDepth, Output
53from prompt_toolkit.styles import BaseStyle
54from prompt_toolkit.utils import in_main_thread
56from .formatters import Formatter, create_default_formatters
58__all__ = ["ProgressBar"]
60E = KeyPressEvent
62_SIGWINCH = getattr(signal, "SIGWINCH", None)
65def create_key_bindings(cancel_callback: Callable[[], None] | None) -> KeyBindings:
66 """
67 Key bindings handled by the progress bar.
68 (The main thread is not supposed to handle any key bindings.)
69 """
70 kb = KeyBindings()
72 @kb.add("c-l")
73 def _clear(event: E) -> None:
74 event.app.renderer.clear()
76 if cancel_callback is not None:
78 @kb.add("c-c")
79 def _interrupt(event: E) -> None:
80 "Kill the 'body' of the progress bar, but only if we run from the main thread."
81 assert cancel_callback is not None
82 cancel_callback()
84 return kb
87_T = TypeVar("_T")
90class ProgressBar:
91 """
92 Progress bar context manager.
94 Usage ::
96 with ProgressBar(...) as pb:
97 for item in pb(data):
98 ...
100 :param title: Text to be displayed above the progress bars. This can be a
101 callable or formatted text as well.
102 :param formatters: List of :class:`.Formatter` instances.
103 :param bottom_toolbar: Text to be displayed in the bottom toolbar. This
104 can be a callable or formatted text.
105 :param style: :class:`prompt_toolkit.styles.BaseStyle` instance.
106 :param key_bindings: :class:`.KeyBindings` instance.
107 :param cancel_callback: Callback function that's called when control-c is
108 pressed by the user. This can be used for instance to start "proper"
109 cancellation if the wrapped code supports it.
110 :param file: The file object used for rendering, by default `sys.stderr` is used.
112 :param color_depth: `prompt_toolkit` `ColorDepth` instance.
113 :param output: :class:`~prompt_toolkit.output.Output` instance.
114 :param input: :class:`~prompt_toolkit.input.Input` instance.
115 """
117 def __init__(
118 self,
119 title: AnyFormattedText = None,
120 formatters: Sequence[Formatter] | None = None,
121 bottom_toolbar: AnyFormattedText = None,
122 style: BaseStyle | None = None,
123 key_bindings: KeyBindings | None = None,
124 cancel_callback: Callable[[], None] | None = None,
125 file: TextIO | None = None,
126 color_depth: ColorDepth | None = None,
127 output: Output | None = None,
128 input: Input | None = None,
129 ) -> None:
130 self.title = title
131 self.formatters = formatters or create_default_formatters()
132 self.bottom_toolbar = bottom_toolbar
133 self.counters: list[ProgressBarCounter[object]] = []
134 self.style = style
135 self.key_bindings = key_bindings
136 self.cancel_callback = cancel_callback
138 # If no `cancel_callback` was given, and we're creating the progress
139 # bar from the main thread. Cancel by sending a `KeyboardInterrupt` to
140 # the main thread.
141 if self.cancel_callback is None and in_main_thread():
143 def keyboard_interrupt_to_main_thread() -> None:
144 os.kill(os.getpid(), signal.SIGINT)
146 self.cancel_callback = keyboard_interrupt_to_main_thread
148 # Note that we use __stderr__ as default error output, because that
149 # works best with `patch_stdout`.
150 self.color_depth = color_depth
151 self.output = output or get_app_session().output
152 self.input = input or get_app_session().input
154 self._thread: threading.Thread | None = None
156 self._has_sigwinch = False
157 self._app_started = threading.Event()
159 def __enter__(self) -> ProgressBar:
160 # Create UI Application.
161 title_toolbar = ConditionalContainer(
162 Window(
163 FormattedTextControl(lambda: self.title),
164 height=1,
165 style="class:progressbar,title",
166 ),
167 filter=Condition(lambda: self.title is not None),
168 )
170 bottom_toolbar = ConditionalContainer(
171 Window(
172 FormattedTextControl(
173 lambda: self.bottom_toolbar, style="class:bottom-toolbar.text"
174 ),
175 style="class:bottom-toolbar",
176 height=1,
177 ),
178 filter=~is_done
179 & renderer_height_is_known
180 & Condition(lambda: self.bottom_toolbar is not None),
181 )
183 def width_for_formatter(formatter: Formatter) -> AnyDimension:
184 # Needs to be passed as callable (partial) to the 'width'
185 # parameter, because we want to call it on every resize.
186 return formatter.get_width(progress_bar=self)
188 progress_controls = [
189 Window(
190 content=_ProgressControl(self, f, self.cancel_callback),
191 width=functools.partial(width_for_formatter, f),
192 )
193 for f in self.formatters
194 ]
196 self.app: Application[None] = Application(
197 min_redraw_interval=0.05,
198 layout=Layout(
199 HSplit(
200 [
201 title_toolbar,
202 VSplit(
203 progress_controls,
204 height=lambda: D(
205 preferred=len(self.counters), max=len(self.counters)
206 ),
207 ),
208 Window(),
209 bottom_toolbar,
210 ]
211 )
212 ),
213 style=self.style,
214 key_bindings=self.key_bindings,
215 refresh_interval=0.3,
216 color_depth=self.color_depth,
217 output=self.output,
218 input=self.input,
219 )
221 # Run application in different thread.
222 def run() -> None:
223 try:
224 self.app.run(pre_run=self._app_started.set)
225 except BaseException as e:
226 traceback.print_exc()
227 print(e)
229 ctx: contextvars.Context = contextvars.copy_context()
231 self._thread = threading.Thread(target=ctx.run, args=(run,))
232 self._thread.start()
234 return self
236 def __exit__(self, *a: object) -> None:
237 # Wait for the app to be started. Make sure we don't quit earlier,
238 # otherwise `self.app.exit` won't terminate the app because
239 # `self.app.future` has not yet been set.
240 self._app_started.wait()
242 # Quit UI application.
243 if self.app.is_running and self.app.loop is not None:
244 self.app.loop.call_soon_threadsafe(self.app.exit)
246 if self._thread is not None:
247 self._thread.join()
249 def __call__(
250 self,
251 data: Iterable[_T] | None = None,
252 label: AnyFormattedText = "",
253 remove_when_done: bool = False,
254 total: int | None = None,
255 ) -> ProgressBarCounter[_T]:
256 """
257 Start a new counter.
259 :param label: Title text or description for this progress. (This can be
260 formatted text as well).
261 :param remove_when_done: When `True`, hide this progress bar.
262 :param total: Specify the maximum value if it can't be calculated by
263 calling ``len``.
264 """
265 counter = ProgressBarCounter(
266 self, data, label=label, remove_when_done=remove_when_done, total=total
267 )
268 self.counters.append(counter)
269 return counter
271 def invalidate(self) -> None:
272 self.app.invalidate()
275class _ProgressControl(UIControl):
276 """
277 User control for the progress bar.
278 """
280 def __init__(
281 self,
282 progress_bar: ProgressBar,
283 formatter: Formatter,
284 cancel_callback: Callable[[], None] | None,
285 ) -> None:
286 self.progress_bar = progress_bar
287 self.formatter = formatter
288 self._key_bindings = create_key_bindings(cancel_callback)
290 def create_content(self, width: int, height: int) -> UIContent:
291 items: list[StyleAndTextTuples] = []
293 for pr in self.progress_bar.counters:
294 try:
295 text = self.formatter.format(self.progress_bar, pr, width)
296 except BaseException:
297 traceback.print_exc()
298 text = "ERROR"
300 items.append(to_formatted_text(text))
302 def get_line(i: int) -> StyleAndTextTuples:
303 return items[i]
305 return UIContent(get_line=get_line, line_count=len(items), show_cursor=False)
307 def is_focusable(self) -> bool:
308 return True # Make sure that the key bindings work.
310 def get_key_bindings(self) -> KeyBindings:
311 return self._key_bindings
314_CounterItem = TypeVar("_CounterItem", covariant=True)
317class ProgressBarCounter(Generic[_CounterItem]):
318 """
319 An individual counter (A progress bar can have multiple counters).
320 """
322 def __init__(
323 self,
324 progress_bar: ProgressBar,
325 data: Iterable[_CounterItem] | None = None,
326 label: AnyFormattedText = "",
327 remove_when_done: bool = False,
328 total: int | None = None,
329 ) -> None:
330 self.start_time = datetime.datetime.now()
331 self.stop_time: datetime.datetime | None = None
332 self.progress_bar = progress_bar
333 self.data = data
334 self.items_completed = 0
335 self.label = label
336 self.remove_when_done = remove_when_done
337 self._done = False
338 self.total: int | None
340 if total is None:
341 try:
342 self.total = len(cast(Sized, data))
343 except TypeError:
344 self.total = None # We don't know the total length.
345 else:
346 self.total = total
348 def __iter__(self) -> Iterator[_CounterItem]:
349 if self.data is not None:
350 try:
351 for item in self.data:
352 yield item
353 self.item_completed()
355 # Only done if we iterate to the very end.
356 self.done = True
357 finally:
358 # Ensure counter has stopped even if we did not iterate to the
359 # end (e.g. break or exceptions).
360 self.stopped = True
361 else:
362 raise NotImplementedError("No data defined to iterate over.")
364 def item_completed(self) -> None:
365 """
366 Start handling the next item.
368 (Can be called manually in case we don't have a collection to loop through.)
369 """
370 self.items_completed += 1
371 self.progress_bar.invalidate()
373 @property
374 def done(self) -> bool:
375 """Whether a counter has been completed.
377 Done counter have been stopped (see stopped) and removed depending on
378 remove_when_done value.
380 Contrast this with stopped. A stopped counter may be terminated before
381 100% completion. A done counter has reached its 100% completion.
382 """
383 return self._done
385 @done.setter
386 def done(self, value: bool) -> None:
387 self._done = value
388 self.stopped = value
390 if value and self.remove_when_done:
391 self.progress_bar.counters.remove(self)
393 @property
394 def stopped(self) -> bool:
395 """Whether a counter has been stopped.
397 Stopped counters no longer have increasing time_elapsed. This distinction is
398 also used to prevent the Bar formatter with unknown totals from continuing to run.
400 A stopped counter (but not done) can be used to signal that a given counter has
401 encountered an error but allows other counters to continue
402 (e.g. download X of Y failed). Given how only done counters are removed
403 (see remove_when_done) this can help aggregate failures from a large number of
404 successes.
406 Contrast this with done. A done counter has reached its 100% completion.
407 A stopped counter may be terminated before 100% completion.
408 """
409 return self.stop_time is not None
411 @stopped.setter
412 def stopped(self, value: bool) -> None:
413 if value:
414 # This counter has not already been stopped.
415 if not self.stop_time:
416 self.stop_time = datetime.datetime.now()
417 else:
418 # Clearing any previously set stop_time.
419 self.stop_time = None
421 @property
422 def percentage(self) -> float:
423 if self.total is None:
424 return 0
425 else:
426 return self.items_completed * 100 / max(self.total, 1)
428 @property
429 def time_elapsed(self) -> datetime.timedelta:
430 """
431 Return how much time has been elapsed since the start.
432 """
433 if self.stop_time is None:
434 return datetime.datetime.now() - self.start_time
435 else:
436 return self.stop_time - self.start_time
438 @property
439 def time_left(self) -> datetime.timedelta | None:
440 """
441 Timedelta representing the time left.
442 """
443 if self.total is None or not self.percentage:
444 return None
445 elif self.done or self.stopped:
446 return datetime.timedelta(0)
447 else:
448 return self.time_elapsed * (100 - self.percentage) / self.percentage