Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/shortcuts/progress_bar/base.py: 34%
172 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1"""
2Progress bar implementation on top of prompt_toolkit.
4::
6 with ProgressBar(...) as pb:
7 for item in pb(data):
8 ...
9"""
10from __future__ import annotations
12import contextvars
13import datetime
14import functools
15import os
16import signal
17import threading
18import traceback
19from typing import (
20 Callable,
21 Generic,
22 Iterable,
23 Iterator,
24 List,
25 Optional,
26 Sequence,
27 Sized,
28 TextIO,
29 TypeVar,
30 cast,
31)
33from prompt_toolkit.application import Application
34from prompt_toolkit.application.current import get_app_session
35from prompt_toolkit.filters import Condition, is_done, renderer_height_is_known
36from prompt_toolkit.formatted_text import (
37 AnyFormattedText,
38 StyleAndTextTuples,
39 to_formatted_text,
40)
41from prompt_toolkit.input import Input
42from prompt_toolkit.key_binding import KeyBindings
43from prompt_toolkit.key_binding.key_processor import KeyPressEvent
44from prompt_toolkit.layout import (
45 ConditionalContainer,
46 FormattedTextControl,
47 HSplit,
48 Layout,
49 VSplit,
50 Window,
51)
52from prompt_toolkit.layout.controls import UIContent, UIControl
53from prompt_toolkit.layout.dimension import AnyDimension, D
54from prompt_toolkit.output import ColorDepth, Output
55from prompt_toolkit.styles import BaseStyle
56from prompt_toolkit.utils import in_main_thread
58from .formatters import Formatter, create_default_formatters
# Public names exported by ``from <module> import *``.
__all__ = ["ProgressBar"]

# Short alias used to annotate the key binding handlers in this module.
E = KeyPressEvent

# ``signal.SIGWINCH`` when the ``signal`` module defines it, otherwise ``None``.
_SIGWINCH = getattr(signal, "SIGWINCH", None)
def create_key_bindings(cancel_callback: Callable[[], None] | None) -> KeyBindings:
    """
    Build the key bindings that the progress bar itself reacts to.
    (The main thread is not supposed to handle any key bindings.)

    :param cancel_callback: Called without arguments when control-c is
        pressed. When `None`, no control-c binding is registered at all.
    :return: A fresh :class:`.KeyBindings` instance holding the control-l
        binding and, if a callback was given, the control-c binding.
    """
    bindings = KeyBindings()

    def _clear(event: E) -> None:
        # Control-l: clear the renderer of the running application.
        event.app.renderer.clear()

    bindings.add("c-l")(_clear)

    # Without a callback there is nothing to bind to control-c.
    if cancel_callback is None:
        return bindings

    def _interrupt(event: E) -> None:
        "Kill the 'body' of the progress bar, but only if we run from the main thread."
        assert cancel_callback is not None
        cancel_callback()

    bindings.add("c-c")(_interrupt)

    return bindings
_T = TypeVar("_T")


class ProgressBar:
    """
    Progress bar context manager.

    Usage ::

        with ProgressBar(...) as pb:
            for item in pb(data):
                ...

    :param title: Text to be displayed above the progress bars. This can be a
        callable or formatted text as well.
    :param formatters: List of :class:`.Formatter` instances.
    :param bottom_toolbar: Text to be displayed in the bottom toolbar. This
        can be a callable or formatted text.
    :param style: :class:`prompt_toolkit.styles.BaseStyle` instance.
    :param key_bindings: :class:`.KeyBindings` instance.
    :param cancel_callback: Callback function that's called when control-c is
        pressed by the user. This can be used for instance to start "proper"
        cancellation if the wrapped code supports it.
    :param file: Accepted for backward compatibility only. This class never
        reads it; rendering goes to `output` (or the output of the current
        app session when `output` is not given).
    :param color_depth: `prompt_toolkit` `ColorDepth` instance.
    :param output: :class:`~prompt_toolkit.output.Output` instance.
    :param input: :class:`~prompt_toolkit.input.Input` instance.
    """

    def __init__(
        self,
        title: AnyFormattedText = None,
        formatters: Sequence[Formatter] | None = None,
        bottom_toolbar: AnyFormattedText = None,
        style: BaseStyle | None = None,
        key_bindings: KeyBindings | None = None,
        cancel_callback: Callable[[], None] | None = None,
        file: TextIO | None = None,
        color_depth: ColorDepth | None = None,
        output: Output | None = None,
        input: Input | None = None,
    ) -> None:
        self.title = title
        self.formatters = formatters or create_default_formatters()
        self.bottom_toolbar = bottom_toolbar
        self.counters: list[ProgressBarCounter[object]] = []
        self.style = style
        self.key_bindings = key_bindings
        self.cancel_callback = cancel_callback

        # If no `cancel_callback` was given, and we're creating the progress
        # bar from the main thread. Cancel by sending a `KeyboardInterrupt` to
        # the main thread.
        if self.cancel_callback is None and in_main_thread():

            def keyboard_interrupt_to_main_thread() -> None:
                os.kill(os.getpid(), signal.SIGINT)

            self.cancel_callback = keyboard_interrupt_to_main_thread

        # Fall back to the input/output of the current app session when the
        # caller did not pass explicit ones. (`file` is intentionally not
        # consulted here.)
        self.color_depth = color_depth
        self.output = output or get_app_session().output
        self.input = input or get_app_session().input

        # Thread running the UI application; created in `__enter__`.
        self._thread: threading.Thread | None = None

        self._has_sigwinch = False
        # Set once the UI application has started (or has failed to start),
        # so that `__exit__` knows when it is safe to proceed.
        self._app_started = threading.Event()

    def __enter__(self) -> ProgressBar:
        """
        Build the UI application and start running it in a separate thread.

        :return: This progress bar, so it can be called to create counters.
        """
        # Create UI Application.
        title_toolbar = ConditionalContainer(
            Window(
                FormattedTextControl(lambda: self.title),
                height=1,
                style="class:progressbar,title",
            ),
            filter=Condition(lambda: self.title is not None),
        )

        bottom_toolbar = ConditionalContainer(
            Window(
                FormattedTextControl(
                    lambda: self.bottom_toolbar, style="class:bottom-toolbar.text"
                ),
                style="class:bottom-toolbar",
                height=1,
            ),
            filter=~is_done
            & renderer_height_is_known
            & Condition(lambda: self.bottom_toolbar is not None),
        )

        def width_for_formatter(formatter: Formatter) -> AnyDimension:
            # Needs to be passed as callable (partial) to the 'width'
            # parameter, because we want to call it on every resize.
            return formatter.get_width(progress_bar=self)

        # One column (window) per formatter; every column renders all counters.
        progress_controls = [
            Window(
                content=_ProgressControl(self, f, self.cancel_callback),
                width=functools.partial(width_for_formatter, f),
            )
            for f in self.formatters
        ]

        self.app: Application[None] = Application(
            min_redraw_interval=0.05,
            layout=Layout(
                HSplit(
                    [
                        title_toolbar,
                        VSplit(
                            progress_controls,
                            # One row per counter.
                            height=lambda: D(
                                preferred=len(self.counters), max=len(self.counters)
                            ),
                        ),
                        Window(),
                        bottom_toolbar,
                    ]
                )
            ),
            style=self.style,
            key_bindings=self.key_bindings,
            refresh_interval=0.3,
            color_depth=self.color_depth,
            output=self.output,
            input=self.input,
        )

        # Run application in different thread, inside a copy of the current
        # context variables.
        ctx: contextvars.Context = contextvars.copy_context()

        self._thread = threading.Thread(target=ctx.run, args=(self._run_app,))
        self._thread.start()

        return self

    def _run_app(self) -> None:
        """
        Body of the UI thread: run the application until it exits.

        Any exception raised by the application is printed rather than
        propagated. `_app_started` is always set before returning, so that
        `__exit__` cannot block forever when the application fails before
        its `pre_run` hook had a chance to set the event.
        """
        try:
            self.app.run(pre_run=self._app_started.set)
        except BaseException as e:
            traceback.print_exc()
            print(e)
        finally:
            # No-op when `pre_run` already set the event.
            self._app_started.set()

    def __exit__(self, *a: object) -> None:
        """
        Stop the UI application and wait for its thread to finish.

        :param a: Exception details passed by the `with` statement; ignored.
        """
        # Wait for the app to be started. Make sure we don't quit earlier,
        # otherwise `self.app.exit` won't terminate the app because
        # `self.app.future` has not yet been set.
        self._app_started.wait()

        # Quit UI application.
        if self.app.is_running and self.app.loop is not None:
            self.app.loop.call_soon_threadsafe(self.app.exit)

        if self._thread is not None:
            self._thread.join()

    def __call__(
        self,
        data: Iterable[_T] | None = None,
        label: AnyFormattedText = "",
        remove_when_done: bool = False,
        total: int | None = None,
    ) -> ProgressBarCounter[_T]:
        """
        Start a new counter.

        :param data: Iterable to wrap. Iterating the returned counter yields
            its items.
        :param label: Title text or description for this progress. (This can be
            formatted text as well).
        :param remove_when_done: When `True`, hide this progress bar.
        :param total: Specify the maximum value if it can't be calculated by
            calling ``len``.
        :return: The new counter, already appended to `self.counters`.
        """
        counter = ProgressBarCounter(
            self, data, label=label, remove_when_done=remove_when_done, total=total
        )
        self.counters.append(counter)
        return counter

    def invalidate(self) -> None:
        """Ask the UI application to redraw."""
        self.app.invalidate()
class _ProgressControl(UIControl):
    """
    User control that renders one formatter's output for every counter of a
    progress bar, one line per counter.
    """

    def __init__(
        self,
        progress_bar: ProgressBar,
        formatter: Formatter,
        cancel_callback: Callable[[], None] | None,
    ) -> None:
        """
        :param progress_bar: The progress bar whose counters are rendered.
        :param formatter: The formatter that produces the text for each counter.
        :param cancel_callback: Forwarded to `create_key_bindings`.
        """
        self.progress_bar = progress_bar
        self.formatter = formatter
        self._key_bindings = create_key_bindings(cancel_callback)

    def _format_counter(
        self, counter: ProgressBarCounter[object], width: int
    ) -> AnyFormattedText:
        """
        Format a single counter; on any failure, print the traceback and
        fall back to a fixed error text.
        """
        try:
            return self.formatter.format(self.progress_bar, counter, width)
        except BaseException:
            traceback.print_exc()
            return "ERROR"

    def create_content(self, width: int, height: int) -> UIContent:
        """
        Produce the content for this control: one formatted line per counter.

        :param width: Width passed on to the formatter.
        :param height: Not used by this control.
        """
        rows: list[StyleAndTextTuples] = [
            to_formatted_text(self._format_counter(counter, width))
            for counter in self.progress_bar.counters
        ]

        return UIContent(
            get_line=lambda index: rows[index],
            line_count=len(rows),
            show_cursor=False,
        )

    def is_focusable(self) -> bool:
        # Must be focusable, otherwise the key bindings don't work.
        return True

    def get_key_bindings(self) -> KeyBindings:
        """Return the key bindings created for this control."""
        return self._key_bindings
_CounterItem = TypeVar("_CounterItem", covariant=True)


class ProgressBarCounter(Generic[_CounterItem]):
    """
    An individual counter (A progress bar can have multiple counters).

    :param progress_bar: The owning progress bar. Its `invalidate()` is called
        after every completed item, and its `counters` list is what a finished
        counter is removed from when `remove_when_done` is set.
    :param data: Iterable to loop over, or `None` when items are reported
        manually through `item_completed`.
    :param label: Title text or description for this counter.
    :param remove_when_done: When `True`, remove this counter from the
        progress bar once it is marked as done.
    :param total: The number of items. When `None`, ``len(data)`` is used if
        available; otherwise the total stays unknown (`None`).
    """

    def __init__(
        self,
        progress_bar: ProgressBar,
        data: Iterable[_CounterItem] | None = None,
        label: AnyFormattedText = "",
        remove_when_done: bool = False,
        total: int | None = None,
    ) -> None:
        self.start_time = datetime.datetime.now()
        self.stop_time: datetime.datetime | None = None
        self.progress_bar = progress_bar
        self.data = data
        self.items_completed = 0
        self.label = label
        self.remove_when_done = remove_when_done
        self._done = False
        self.total: int | None

        if total is None:
            try:
                self.total = len(cast(Sized, data))
            except TypeError:
                self.total = None  # We don't know the total length.
        else:
            self.total = total

    def __iter__(self) -> Iterator[_CounterItem]:
        """
        Yield the items of `data`, counting each one as completed after the
        consumer is done with it.

        :raises NotImplementedError: If no `data` was given.
        """
        if self.data is not None:
            try:
                for item in self.data:
                    yield item
                    self.item_completed()

                # Only done if we iterate to the very end.
                self.done = True
            finally:
                # Ensure counter has stopped even if we did not iterate to the
                # end (e.g. break or exceptions).
                self.stopped = True
        else:
            raise NotImplementedError("No data defined to iterate over.")

    def item_completed(self) -> None:
        """
        Start handling the next item.

        (Can be called manually in case we don't have a collection to loop through.)
        """
        self.items_completed += 1
        self.progress_bar.invalidate()

    @property
    def done(self) -> bool:
        """Whether a counter has been completed.

        Done counter have been stopped (see stopped) and removed depending on
        remove_when_done value.

        Contrast this with stopped. A stopped counter may be terminated before
        100% completion. A done counter has reached its 100% completion.
        """
        return self._done

    @done.setter
    def done(self, value: bool) -> None:
        self._done = value
        self.stopped = value

        if value and self.remove_when_done:
            # The counter may legitimately be absent already: `done` was set
            # to True before, or the counter was never appended to the
            # progress bar's list. Marking it done again must not fail.
            try:
                self.progress_bar.counters.remove(self)
            except ValueError:
                pass

    @property
    def stopped(self) -> bool:
        """Whether a counter has been stopped.

        Stopped counters no longer have increasing time_elapsed. This distinction is
        also used to prevent the Bar formatter with unknown totals from continuing to run.

        A stopped counter (but not done) can be used to signal that a given counter has
        encountered an error but allows other counters to continue
        (e.g. download X of Y failed). Given how only done counters are removed
        (see remove_when_done) this can help aggregate failures from a large number of
        successes.

        Contrast this with done. A done counter has reached its 100% completion.
        A stopped counter may be terminated before 100% completion.
        """
        return self.stop_time is not None

    @stopped.setter
    def stopped(self, value: bool) -> None:
        if value:
            # This counter has not already been stopped.
            if not self.stop_time:
                self.stop_time = datetime.datetime.now()
        else:
            # Clearing any previously set stop_time.
            self.stop_time = None

    @property
    def percentage(self) -> float:
        """
        Completed items as a percentage of `total`; 0 when the total is
        unknown. A total below 1 is treated as 1 to avoid dividing by zero.
        """
        if self.total is None:
            return 0
        else:
            return self.items_completed * 100 / max(self.total, 1)

    @property
    def time_elapsed(self) -> datetime.timedelta:
        """
        Return how much time has been elapsed since the start.

        Once the counter has been stopped, this is the time between start
        and stop and no longer increases.
        """
        if self.stop_time is None:
            return datetime.datetime.now() - self.start_time
        else:
            return self.stop_time - self.start_time

    @property
    def time_left(self) -> datetime.timedelta | None:
        """
        Timedelta representing the time left.

        `None` when the total is unknown or nothing has been completed yet;
        zero once the counter is done or stopped; otherwise extrapolated from
        the elapsed time and the current percentage.
        """
        if self.total is None or not self.percentage:
            return None
        elif self.done or self.stopped:
            return datetime.timedelta(0)
        else:
            return self.time_elapsed * (100 - self.percentage) / self.percentage