Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/progress.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import io
2import sys
3import typing
4import warnings
5from abc import ABC, abstractmethod
6from collections import deque
7from dataclasses import dataclass, field
8from datetime import timedelta
9from io import RawIOBase, UnsupportedOperation
10from math import ceil
11from mmap import mmap
12from operator import length_hint
13from os import PathLike, stat
14from threading import Event, RLock, Thread
15from types import TracebackType
16from typing import (
17 Any,
18 BinaryIO,
19 Callable,
20 ContextManager,
21 Deque,
22 Dict,
23 Generic,
24 Iterable,
25 List,
26 NamedTuple,
27 NewType,
28 Optional,
29 Sequence,
30 TextIO,
31 Tuple,
32 Type,
33 TypeVar,
34 Union,
35)
37if sys.version_info >= (3, 8):
38 from typing import Literal
39else:
40 from typing_extensions import Literal # pragma: no cover
42if sys.version_info >= (3, 11):
43 from typing import Self
44else:
45 from typing_extensions import Self # pragma: no cover
47from . import filesize, get_console
48from .console import Console, Group, JustifyMethod, RenderableType
49from .highlighter import Highlighter
50from .jupyter import JupyterMixin
51from .live import Live
52from .progress_bar import ProgressBar
53from .spinner import Spinner
54from .style import StyleType
55from .table import Column, Table
56from .text import Text, TextType
58TaskID = NewType("TaskID", int)
60ProgressType = TypeVar("ProgressType")
62GetTimeCallable = Callable[[], float]
65_I = typing.TypeVar("_I", TextIO, BinaryIO)
68class _TrackThread(Thread):
69 """A thread to periodically update progress."""
71 def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float):
72 self.progress = progress
73 self.task_id = task_id
74 self.update_period = update_period
75 self.done = Event()
77 self.completed = 0
78 super().__init__(daemon=True)
80 def run(self) -> None:
81 task_id = self.task_id
82 advance = self.progress.advance
83 update_period = self.update_period
84 last_completed = 0
85 wait = self.done.wait
86 while not wait(update_period) and self.progress.live.is_started:
87 completed = self.completed
88 if last_completed != completed:
89 advance(task_id, completed - last_completed)
90 last_completed = completed
92 self.progress.update(self.task_id, completed=self.completed, refresh=True)
94 def __enter__(self) -> "_TrackThread":
95 self.start()
96 return self
98 def __exit__(
99 self,
100 exc_type: Optional[Type[BaseException]],
101 exc_val: Optional[BaseException],
102 exc_tb: Optional[TracebackType],
103 ) -> None:
104 self.done.set()
105 self.join()
108def track(
109 sequence: Union[Sequence[ProgressType], Iterable[ProgressType]],
110 description: str = "Working...",
111 total: Optional[float] = None,
112 completed: int = 0,
113 auto_refresh: bool = True,
114 console: Optional[Console] = None,
115 transient: bool = False,
116 get_time: Optional[Callable[[], float]] = None,
117 refresh_per_second: float = 10,
118 style: StyleType = "bar.back",
119 complete_style: StyleType = "bar.complete",
120 finished_style: StyleType = "bar.finished",
121 pulse_style: StyleType = "bar.pulse",
122 update_period: float = 0.1,
123 disable: bool = False,
124 show_speed: bool = True,
125) -> Iterable[ProgressType]:
126 """Track progress by iterating over a sequence.
128 Args:
129 sequence (Iterable[ProgressType]): A sequence (must support "len") you wish to iterate over.
130 description (str, optional): Description of task show next to progress bar. Defaults to "Working".
131 total: (float, optional): Total number of steps. Default is len(sequence).
132 completed (int, optional): Number of steps completed so far. Defaults to 0.
133 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
134 transient: (bool, optional): Clear the progress on exit. Defaults to False.
135 console (Console, optional): Console to write to. Default creates internal Console instance.
136 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
137 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
138 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
139 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
140 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
141 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.
142 disable (bool, optional): Disable display of progress.
143 show_speed (bool, optional): Show speed if total isn't known. Defaults to True.
144 Returns:
145 Iterable[ProgressType]: An iterable of the values in the sequence.
147 """
149 columns: List["ProgressColumn"] = (
150 [TextColumn("[progress.description]{task.description}")] if description else []
151 )
152 columns.extend(
153 (
154 BarColumn(
155 style=style,
156 complete_style=complete_style,
157 finished_style=finished_style,
158 pulse_style=pulse_style,
159 ),
160 TaskProgressColumn(show_speed=show_speed),
161 TimeRemainingColumn(elapsed_when_finished=True),
162 )
163 )
164 progress = Progress(
165 *columns,
166 auto_refresh=auto_refresh,
167 console=console,
168 transient=transient,
169 get_time=get_time,
170 refresh_per_second=refresh_per_second or 10,
171 disable=disable,
172 )
174 with progress:
175 yield from progress.track(
176 sequence,
177 total=total,
178 completed=completed,
179 description=description,
180 update_period=update_period,
181 )
184class _Reader(RawIOBase, BinaryIO):
185 """A reader that tracks progress while it's being read from."""
187 def __init__(
188 self,
189 handle: BinaryIO,
190 progress: "Progress",
191 task: TaskID,
192 close_handle: bool = True,
193 ) -> None:
194 self.handle = handle
195 self.progress = progress
196 self.task = task
197 self.close_handle = close_handle
198 self._closed = False
200 def __enter__(self) -> "_Reader":
201 self.handle.__enter__()
202 return self
204 def __exit__(
205 self,
206 exc_type: Optional[Type[BaseException]],
207 exc_val: Optional[BaseException],
208 exc_tb: Optional[TracebackType],
209 ) -> None:
210 self.close()
212 def __iter__(self) -> BinaryIO:
213 return self
215 def __next__(self) -> bytes:
216 line = next(self.handle)
217 self.progress.advance(self.task, advance=len(line))
218 return line
220 @property
221 def closed(self) -> bool:
222 return self._closed
224 def fileno(self) -> int:
225 return self.handle.fileno()
227 def isatty(self) -> bool:
228 return self.handle.isatty()
230 @property
231 def mode(self) -> str:
232 return self.handle.mode
234 @property
235 def name(self) -> str:
236 return self.handle.name
238 def readable(self) -> bool:
239 return self.handle.readable()
241 def seekable(self) -> bool:
242 return self.handle.seekable()
244 def writable(self) -> bool:
245 return False
247 def read(self, size: int = -1) -> bytes:
248 block = self.handle.read(size)
249 self.progress.advance(self.task, advance=len(block))
250 return block
252 def readinto(self, b: Union[bytearray, memoryview, mmap]): # type: ignore[no-untyped-def, override]
253 n = self.handle.readinto(b) # type: ignore[attr-defined]
254 self.progress.advance(self.task, advance=n)
255 return n
257 def readline(self, size: int = -1) -> bytes: # type: ignore[override]
258 line = self.handle.readline(size)
259 self.progress.advance(self.task, advance=len(line))
260 return line
262 def readlines(self, hint: int = -1) -> List[bytes]:
263 lines = self.handle.readlines(hint)
264 self.progress.advance(self.task, advance=sum(map(len, lines)))
265 return lines
267 def close(self) -> None:
268 if self.close_handle:
269 self.handle.close()
270 self._closed = True
272 def seek(self, offset: int, whence: int = 0) -> int:
273 pos = self.handle.seek(offset, whence)
274 self.progress.update(self.task, completed=pos)
275 return pos
277 def tell(self) -> int:
278 return self.handle.tell()
280 def write(self, s: Any) -> int:
281 raise UnsupportedOperation("write")
283 def writelines(self, lines: Iterable[Any]) -> None:
284 raise UnsupportedOperation("writelines")
287class _ReadContext(ContextManager[_I], Generic[_I]):
288 """A utility class to handle a context for both a reader and a progress."""
290 def __init__(self, progress: "Progress", reader: _I) -> None:
291 self.progress = progress
292 self.reader: _I = reader
294 def __enter__(self) -> _I:
295 self.progress.start()
296 return self.reader.__enter__()
298 def __exit__(
299 self,
300 exc_type: Optional[Type[BaseException]],
301 exc_val: Optional[BaseException],
302 exc_tb: Optional[TracebackType],
303 ) -> None:
304 self.progress.stop()
305 self.reader.__exit__(exc_type, exc_val, exc_tb)
308def wrap_file(
309 file: BinaryIO,
310 total: int,
311 *,
312 description: str = "Reading...",
313 auto_refresh: bool = True,
314 console: Optional[Console] = None,
315 transient: bool = False,
316 get_time: Optional[Callable[[], float]] = None,
317 refresh_per_second: float = 10,
318 style: StyleType = "bar.back",
319 complete_style: StyleType = "bar.complete",
320 finished_style: StyleType = "bar.finished",
321 pulse_style: StyleType = "bar.pulse",
322 disable: bool = False,
323) -> ContextManager[BinaryIO]:
324 """Read bytes from a file while tracking progress.
326 Args:
327 file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
328 total (int): Total number of bytes to read.
329 description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
330 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
331 transient: (bool, optional): Clear the progress on exit. Defaults to False.
332 console (Console, optional): Console to write to. Default creates internal Console instance.
333 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
334 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
335 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
336 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
337 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
338 disable (bool, optional): Disable display of progress.
339 Returns:
340 ContextManager[BinaryIO]: A context manager yielding a progress reader.
342 """
344 columns: List["ProgressColumn"] = (
345 [TextColumn("[progress.description]{task.description}")] if description else []
346 )
347 columns.extend(
348 (
349 BarColumn(
350 style=style,
351 complete_style=complete_style,
352 finished_style=finished_style,
353 pulse_style=pulse_style,
354 ),
355 DownloadColumn(),
356 TimeRemainingColumn(),
357 )
358 )
359 progress = Progress(
360 *columns,
361 auto_refresh=auto_refresh,
362 console=console,
363 transient=transient,
364 get_time=get_time,
365 refresh_per_second=refresh_per_second or 10,
366 disable=disable,
367 )
369 reader = progress.wrap_file(file, total=total, description=description)
370 return _ReadContext(progress, reader)
373@typing.overload
374def open(
375 file: Union[str, "PathLike[str]", bytes],
376 mode: Union[Literal["rt"], Literal["r"]],
377 buffering: int = -1,
378 encoding: Optional[str] = None,
379 errors: Optional[str] = None,
380 newline: Optional[str] = None,
381 *,
382 total: Optional[int] = None,
383 description: str = "Reading...",
384 auto_refresh: bool = True,
385 console: Optional[Console] = None,
386 transient: bool = False,
387 get_time: Optional[Callable[[], float]] = None,
388 refresh_per_second: float = 10,
389 style: StyleType = "bar.back",
390 complete_style: StyleType = "bar.complete",
391 finished_style: StyleType = "bar.finished",
392 pulse_style: StyleType = "bar.pulse",
393 disable: bool = False,
394) -> ContextManager[TextIO]:
395 pass
398@typing.overload
399def open(
400 file: Union[str, "PathLike[str]", bytes],
401 mode: Literal["rb"],
402 buffering: int = -1,
403 encoding: Optional[str] = None,
404 errors: Optional[str] = None,
405 newline: Optional[str] = None,
406 *,
407 total: Optional[int] = None,
408 description: str = "Reading...",
409 auto_refresh: bool = True,
410 console: Optional[Console] = None,
411 transient: bool = False,
412 get_time: Optional[Callable[[], float]] = None,
413 refresh_per_second: float = 10,
414 style: StyleType = "bar.back",
415 complete_style: StyleType = "bar.complete",
416 finished_style: StyleType = "bar.finished",
417 pulse_style: StyleType = "bar.pulse",
418 disable: bool = False,
419) -> ContextManager[BinaryIO]:
420 pass
423def open(
424 file: Union[str, "PathLike[str]", bytes],
425 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
426 buffering: int = -1,
427 encoding: Optional[str] = None,
428 errors: Optional[str] = None,
429 newline: Optional[str] = None,
430 *,
431 total: Optional[int] = None,
432 description: str = "Reading...",
433 auto_refresh: bool = True,
434 console: Optional[Console] = None,
435 transient: bool = False,
436 get_time: Optional[Callable[[], float]] = None,
437 refresh_per_second: float = 10,
438 style: StyleType = "bar.back",
439 complete_style: StyleType = "bar.complete",
440 finished_style: StyleType = "bar.finished",
441 pulse_style: StyleType = "bar.pulse",
442 disable: bool = False,
443) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]:
444 """Read bytes from a file while tracking progress.
446 Args:
447 path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
448 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
449 buffering (int): The buffering strategy to use, see :func:`io.open`.
450 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
451 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
452 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`
453 total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size.
454 description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
455 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
456 transient: (bool, optional): Clear the progress on exit. Defaults to False.
457 console (Console, optional): Console to write to. Default creates internal Console instance.
458 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
459 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
460 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
461 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
462 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
463 disable (bool, optional): Disable display of progress.
464 encoding (str, optional): The encoding to use when reading in text mode.
466 Returns:
467 ContextManager[BinaryIO]: A context manager yielding a progress reader.
469 """
471 columns: List["ProgressColumn"] = (
472 [TextColumn("[progress.description]{task.description}")] if description else []
473 )
474 columns.extend(
475 (
476 BarColumn(
477 style=style,
478 complete_style=complete_style,
479 finished_style=finished_style,
480 pulse_style=pulse_style,
481 ),
482 DownloadColumn(),
483 TimeRemainingColumn(),
484 )
485 )
486 progress = Progress(
487 *columns,
488 auto_refresh=auto_refresh,
489 console=console,
490 transient=transient,
491 get_time=get_time,
492 refresh_per_second=refresh_per_second or 10,
493 disable=disable,
494 )
496 reader = progress.open(
497 file,
498 mode=mode,
499 buffering=buffering,
500 encoding=encoding,
501 errors=errors,
502 newline=newline,
503 total=total,
504 description=description,
505 )
506 return _ReadContext(progress, reader) # type: ignore[return-value, type-var]
509class ProgressColumn(ABC):
510 """Base class for a widget to use in progress display."""
512 max_refresh: Optional[float] = None
514 def __init__(self, table_column: Optional[Column] = None) -> None:
515 self._table_column = table_column
516 self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {}
517 self._update_time: Optional[float] = None
519 def get_table_column(self) -> Column:
520 """Get a table column, used to build tasks table."""
521 return self._table_column or Column()
523 def __call__(self, task: "Task") -> RenderableType:
524 """Called by the Progress object to return a renderable for the given task.
526 Args:
527 task (Task): An object containing information regarding the task.
529 Returns:
530 RenderableType: Anything renderable (including str).
531 """
532 current_time = task.get_time()
533 if self.max_refresh is not None and not task.completed:
534 try:
535 timestamp, renderable = self._renderable_cache[task.id]
536 except KeyError:
537 pass
538 else:
539 if timestamp + self.max_refresh > current_time:
540 return renderable
542 renderable = self.render(task)
543 self._renderable_cache[task.id] = (current_time, renderable)
544 return renderable
546 @abstractmethod
547 def render(self, task: "Task") -> RenderableType:
548 """Should return a renderable object."""
551class RenderableColumn(ProgressColumn):
552 """A column to insert an arbitrary column.
554 Args:
555 renderable (RenderableType, optional): Any renderable. Defaults to empty string.
556 """
558 def __init__(
559 self, renderable: RenderableType = "", *, table_column: Optional[Column] = None
560 ):
561 self.renderable = renderable
562 super().__init__(table_column=table_column)
564 def render(self, task: "Task") -> RenderableType:
565 return self.renderable
568class SpinnerColumn(ProgressColumn):
569 """A column with a 'spinner' animation.
571 Args:
572 spinner_name (str, optional): Name of spinner animation. Defaults to "dots".
573 style (StyleType, optional): Style of spinner. Defaults to "progress.spinner".
574 speed (float, optional): Speed factor of spinner. Defaults to 1.0.
575 finished_text (TextType, optional): Text used when task is finished. Defaults to " ".
576 """
578 def __init__(
579 self,
580 spinner_name: str = "dots",
581 style: Optional[StyleType] = "progress.spinner",
582 speed: float = 1.0,
583 finished_text: TextType = " ",
584 table_column: Optional[Column] = None,
585 ):
586 self.spinner = Spinner(spinner_name, style=style, speed=speed)
587 self.finished_text = (
588 Text.from_markup(finished_text)
589 if isinstance(finished_text, str)
590 else finished_text
591 )
592 super().__init__(table_column=table_column)
594 def set_spinner(
595 self,
596 spinner_name: str,
597 spinner_style: Optional[StyleType] = "progress.spinner",
598 speed: float = 1.0,
599 ) -> None:
600 """Set a new spinner.
602 Args:
603 spinner_name (str): Spinner name, see python -m rich.spinner.
604 spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner".
605 speed (float, optional): Speed factor of spinner. Defaults to 1.0.
606 """
607 self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed)
609 def render(self, task: "Task") -> RenderableType:
610 text = (
611 self.finished_text
612 if task.finished
613 else self.spinner.render(task.get_time())
614 )
615 return text
618class TextColumn(ProgressColumn):
619 """A column containing text."""
621 def __init__(
622 self,
623 text_format: str,
624 style: StyleType = "none",
625 justify: JustifyMethod = "left",
626 markup: bool = True,
627 highlighter: Optional[Highlighter] = None,
628 table_column: Optional[Column] = None,
629 ) -> None:
630 self.text_format = text_format
631 self.justify: JustifyMethod = justify
632 self.style = style
633 self.markup = markup
634 self.highlighter = highlighter
635 super().__init__(table_column=table_column or Column(no_wrap=True))
637 def render(self, task: "Task") -> Text:
638 _text = self.text_format.format(task=task)
639 if self.markup:
640 text = Text.from_markup(_text, style=self.style, justify=self.justify)
641 else:
642 text = Text(_text, style=self.style, justify=self.justify)
643 if self.highlighter:
644 self.highlighter.highlight(text)
645 return text
648class BarColumn(ProgressColumn):
649 """Renders a visual progress bar.
651 Args:
652 bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40.
653 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
654 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
655 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
656 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
657 """
659 def __init__(
660 self,
661 bar_width: Optional[int] = 40,
662 style: StyleType = "bar.back",
663 complete_style: StyleType = "bar.complete",
664 finished_style: StyleType = "bar.finished",
665 pulse_style: StyleType = "bar.pulse",
666 table_column: Optional[Column] = None,
667 ) -> None:
668 self.bar_width = bar_width
669 self.style = style
670 self.complete_style = complete_style
671 self.finished_style = finished_style
672 self.pulse_style = pulse_style
673 super().__init__(table_column=table_column)
675 def render(self, task: "Task") -> ProgressBar:
676 """Gets a progress bar widget for a task."""
677 return ProgressBar(
678 total=max(0, task.total) if task.total is not None else None,
679 completed=max(0, task.completed),
680 width=None if self.bar_width is None else max(1, self.bar_width),
681 pulse=not task.started,
682 animation_time=task.get_time(),
683 style=self.style,
684 complete_style=self.complete_style,
685 finished_style=self.finished_style,
686 pulse_style=self.pulse_style,
687 )
690class TimeElapsedColumn(ProgressColumn):
691 """Renders time elapsed."""
693 def render(self, task: "Task") -> Text:
694 """Show time elapsed."""
695 elapsed = task.finished_time if task.finished else task.elapsed
696 if elapsed is None:
697 return Text("-:--:--", style="progress.elapsed")
698 delta = timedelta(seconds=max(0, int(elapsed)))
699 return Text(str(delta), style="progress.elapsed")
702class TaskProgressColumn(TextColumn):
703 """Show task progress as a percentage.
705 Args:
706 text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%".
707 text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "".
708 style (StyleType, optional): Style of output. Defaults to "none".
709 justify (JustifyMethod, optional): Text justification. Defaults to "left".
710 markup (bool, optional): Enable markup. Defaults to True.
711 highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None.
712 table_column (Optional[Column], optional): Table Column to use. Defaults to None.
713 show_speed (bool, optional): Show speed if total is unknown. Defaults to False.
714 """
716 def __init__(
717 self,
718 text_format: str = "[progress.percentage]{task.percentage:>3.0f}%",
719 text_format_no_percentage: str = "",
720 style: StyleType = "none",
721 justify: JustifyMethod = "left",
722 markup: bool = True,
723 highlighter: Optional[Highlighter] = None,
724 table_column: Optional[Column] = None,
725 show_speed: bool = False,
726 ) -> None:
727 self.text_format_no_percentage = text_format_no_percentage
728 self.show_speed = show_speed
729 super().__init__(
730 text_format=text_format,
731 style=style,
732 justify=justify,
733 markup=markup,
734 highlighter=highlighter,
735 table_column=table_column,
736 )
738 @classmethod
739 def render_speed(cls, speed: Optional[float]) -> Text:
740 """Render the speed in iterations per second.
742 Args:
743 task (Task): A Task object.
745 Returns:
746 Text: Text object containing the task speed.
747 """
748 if speed is None:
749 return Text("", style="progress.percentage")
750 unit, suffix = filesize.pick_unit_and_suffix(
751 int(speed),
752 ["", "×10³", "×10⁶", "×10⁹", "×10¹²"],
753 1000,
754 )
755 data_speed = speed / unit
756 return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage")
758 def render(self, task: "Task") -> Text:
759 if task.total is None and self.show_speed:
760 return self.render_speed(task.finished_speed or task.speed)
761 text_format = (
762 self.text_format_no_percentage if task.total is None else self.text_format
763 )
764 _text = text_format.format(task=task)
765 if self.markup:
766 text = Text.from_markup(_text, style=self.style, justify=self.justify)
767 else:
768 text = Text(_text, style=self.style, justify=self.justify)
769 if self.highlighter:
770 self.highlighter.highlight(text)
771 return text
774class TimeRemainingColumn(ProgressColumn):
775 """Renders estimated time remaining.
777 Args:
778 compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False.
779 elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False.
780 """
782 # Only refresh twice a second to prevent jitter
783 max_refresh = 0.5
785 def __init__(
786 self,
787 compact: bool = False,
788 elapsed_when_finished: bool = False,
789 table_column: Optional[Column] = None,
790 ):
791 self.compact = compact
792 self.elapsed_when_finished = elapsed_when_finished
793 super().__init__(table_column=table_column)
795 def render(self, task: "Task") -> Text:
796 """Show time remaining."""
797 if self.elapsed_when_finished and task.finished:
798 task_time = task.finished_time
799 style = "progress.elapsed"
800 else:
801 task_time = task.time_remaining
802 style = "progress.remaining"
804 if task.total is None:
805 return Text("", style=style)
807 if task_time is None:
808 return Text("--:--" if self.compact else "-:--:--", style=style)
810 # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py
811 minutes, seconds = divmod(int(task_time), 60)
812 hours, minutes = divmod(minutes, 60)
814 if self.compact and not hours:
815 formatted = f"{minutes:02d}:{seconds:02d}"
816 else:
817 formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}"
819 return Text(formatted, style=style)
822class FileSizeColumn(ProgressColumn):
823 """Renders completed filesize."""
825 def render(self, task: "Task") -> Text:
826 """Show data completed."""
827 data_size = filesize.decimal(int(task.completed))
828 return Text(data_size, style="progress.filesize")
831class TotalFileSizeColumn(ProgressColumn):
832 """Renders total filesize."""
834 def render(self, task: "Task") -> Text:
835 """Show data completed."""
836 data_size = filesize.decimal(int(task.total)) if task.total is not None else ""
837 return Text(data_size, style="progress.filesize.total")
840class MofNCompleteColumn(ProgressColumn):
841 """Renders completed count/total, e.g. ' 10/1000'.
843 Best for bounded tasks with int quantities.
845 Space pads the completed count so that progress length does not change as task progresses
846 past powers of 10.
848 Args:
849 separator (str, optional): Text to separate completed and total values. Defaults to "/".
850 """
852 def __init__(self, separator: str = "/", table_column: Optional[Column] = None):
853 self.separator = separator
854 super().__init__(table_column=table_column)
856 def render(self, task: "Task") -> Text:
857 """Show completed/total."""
858 completed = int(task.completed)
859 total = int(task.total) if task.total is not None else "?"
860 total_width = len(str(total))
861 return Text(
862 f"{completed:{total_width}d}{self.separator}{total}",
863 style="progress.download",
864 )
867class DownloadColumn(ProgressColumn):
868 """Renders file size downloaded and total, e.g. '0.5/2.3 GB'.
870 Args:
871 binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False.
872 """
874 def __init__(
875 self, binary_units: bool = False, table_column: Optional[Column] = None
876 ) -> None:
877 self.binary_units = binary_units
878 super().__init__(table_column=table_column)
880 def render(self, task: "Task") -> Text:
881 """Calculate common unit for completed and total."""
882 completed = int(task.completed)
884 unit_and_suffix_calculation_base = (
885 int(task.total) if task.total is not None else completed
886 )
887 if self.binary_units:
888 unit, suffix = filesize.pick_unit_and_suffix(
889 unit_and_suffix_calculation_base,
890 ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"],
891 1024,
892 )
893 else:
894 unit, suffix = filesize.pick_unit_and_suffix(
895 unit_and_suffix_calculation_base,
896 ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"],
897 1000,
898 )
899 precision = 0 if unit == 1 else 1
901 completed_ratio = completed / unit
902 completed_str = f"{completed_ratio:,.{precision}f}"
904 if task.total is not None:
905 total = int(task.total)
906 total_ratio = total / unit
907 total_str = f"{total_ratio:,.{precision}f}"
908 else:
909 total_str = "?"
911 download_status = f"{completed_str}/{total_str} {suffix}"
912 download_text = Text(download_status, style="progress.download")
913 return download_text
916class TransferSpeedColumn(ProgressColumn):
917 """Renders human readable transfer speed."""
919 def render(self, task: "Task") -> Text:
920 """Show data transfer speed."""
921 speed = task.finished_speed or task.speed
922 if speed is None:
923 return Text("?", style="progress.data.speed")
924 data_speed = filesize.decimal(int(speed))
925 return Text(f"{data_speed}/s", style="progress.data.speed")
928class ProgressSample(NamedTuple):
929 """Sample of progress for a given time."""
931 timestamp: float
932 """Timestamp of sample."""
933 completed: float
934 """Number of steps completed."""
937@dataclass
938class Task:
939 """Information regarding a progress task.
941 This object should be considered read-only outside of the :class:`~Progress` class.
943 """
945 id: TaskID
946 """Task ID associated with this task (used in Progress methods)."""
948 description: str
949 """str: Description of the task."""
951 total: Optional[float]
952 """Optional[float]: Total number of steps in this task."""
954 completed: float
955 """float: Number of steps completed"""
957 _get_time: GetTimeCallable
958 """Callable to get the current time."""
960 finished_time: Optional[float] = None
961 """float: Time task was finished."""
963 visible: bool = True
964 """bool: Indicates if this task is visible in the progress display."""
966 fields: Dict[str, Any] = field(default_factory=dict)
967 """dict: Arbitrary fields passed in via Progress.update."""
969 start_time: Optional[float] = field(default=None, init=False, repr=False)
970 """Optional[float]: Time this task was started, or None if not started."""
972 stop_time: Optional[float] = field(default=None, init=False, repr=False)
973 """Optional[float]: Time this task was stopped, or None if not stopped."""
975 finished_speed: Optional[float] = None
976 """Optional[float]: The last speed for a finished task."""
978 _progress: Deque[ProgressSample] = field(
979 default_factory=lambda: deque(maxlen=1000), init=False, repr=False
980 )
982 _lock: RLock = field(repr=False, default_factory=RLock)
983 """Thread lock."""
985 def get_time(self) -> float:
986 """float: Get the current time, in seconds."""
987 return self._get_time()
989 @property
990 def started(self) -> bool:
991 """bool: Check if the task as started."""
992 return self.start_time is not None
994 @property
995 def remaining(self) -> Optional[float]:
996 """Optional[float]: Get the number of steps remaining, if a non-None total was set."""
997 if self.total is None:
998 return None
999 return self.total - self.completed
1001 @property
1002 def elapsed(self) -> Optional[float]:
1003 """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started."""
1004 if self.start_time is None:
1005 return None
1006 if self.stop_time is not None:
1007 return self.stop_time - self.start_time
1008 return self.get_time() - self.start_time
1010 @property
1011 def finished(self) -> bool:
1012 """Check if the task has finished."""
1013 return self.finished_time is not None
1015 @property
1016 def percentage(self) -> float:
1017 """float: Get progress of task as a percentage. If a None total was set, returns 0"""
1018 if not self.total:
1019 return 0.0
1020 completed = (self.completed / self.total) * 100.0
1021 completed = min(100.0, max(0.0, completed))
1022 return completed
1024 @property
1025 def speed(self) -> Optional[float]:
1026 """Optional[float]: Get the estimated speed in steps per second."""
1027 if self.start_time is None:
1028 return None
1029 with self._lock:
1030 progress = self._progress
1031 if not progress:
1032 return None
1033 total_time = progress[-1].timestamp - progress[0].timestamp
1034 if total_time == 0:
1035 return None
1036 iter_progress = iter(progress)
1037 next(iter_progress)
1038 total_completed = sum(sample.completed for sample in iter_progress)
1039 speed = total_completed / total_time
1040 return speed
1042 @property
1043 def time_remaining(self) -> Optional[float]:
1044 """Optional[float]: Get estimated time to completion, or ``None`` if no data."""
1045 if self.finished:
1046 return 0.0
1047 speed = self.speed
1048 if not speed:
1049 return None
1050 remaining = self.remaining
1051 if remaining is None:
1052 return None
1053 estimate = ceil(remaining / speed)
1054 return estimate
1056 def _reset(self) -> None:
1057 """Reset progress."""
1058 self._progress.clear()
1059 self.finished_time = None
1060 self.finished_speed = None
1063class Progress(JupyterMixin):
1064 """Renders an auto-updating progress bar(s).
1066 Args:
1067 console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout.
1068 auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`.
1069 refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None.
1070 speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30.
1071 transient: (bool, optional): Clear the progress on exit. Defaults to False.
1072 redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
1073 redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True.
1074 get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None.
1075 disable (bool, optional): Disable progress display. Defaults to False
1076 expand (bool, optional): Expand tasks table to fit width. Defaults to False.
1077 """
1079 def __init__(
1080 self,
1081 *columns: Union[str, ProgressColumn],
1082 console: Optional[Console] = None,
1083 auto_refresh: bool = True,
1084 refresh_per_second: float = 10,
1085 speed_estimate_period: float = 30.0,
1086 transient: bool = False,
1087 redirect_stdout: bool = True,
1088 redirect_stderr: bool = True,
1089 get_time: Optional[GetTimeCallable] = None,
1090 disable: bool = False,
1091 expand: bool = False,
1092 ) -> None:
1093 assert refresh_per_second > 0, "refresh_per_second must be > 0"
1094 self._lock = RLock()
1095 self.columns = columns or self.get_default_columns()
1096 self.speed_estimate_period = speed_estimate_period
1098 self.disable = disable
1099 self.expand = expand
1100 self._tasks: Dict[TaskID, Task] = {}
1101 self._task_index: TaskID = TaskID(0)
1102 self.live = Live(
1103 console=console or get_console(),
1104 auto_refresh=auto_refresh,
1105 refresh_per_second=refresh_per_second,
1106 transient=transient,
1107 redirect_stdout=redirect_stdout,
1108 redirect_stderr=redirect_stderr,
1109 get_renderable=self.get_renderable,
1110 )
1111 self.get_time = get_time or self.console.get_time
1112 self.print = self.console.print
1113 self.log = self.console.log
1115 @classmethod
1116 def get_default_columns(cls) -> Tuple[ProgressColumn, ...]:
1117 """Get the default columns used for a new Progress instance:
1118 - a text column for the description (TextColumn)
1119 - the bar itself (BarColumn)
1120 - a text column showing completion percentage (TextColumn)
1121 - an estimated-time-remaining column (TimeRemainingColumn)
1122 If the Progress instance is created without passing a columns argument,
1123 the default columns defined here will be used.
1125 You can also create a Progress instance using custom columns before
1126 and/or after the defaults, as in this example:
1128 progress = Progress(
1129 SpinnerColumn(),
1130 *Progress.get_default_columns(),
1131 "Elapsed:",
1132 TimeElapsedColumn(),
1133 )
1135 This code shows the creation of a Progress display, containing
1136 a spinner to the left, the default columns, and a labeled elapsed
1137 time column.
1138 """
1139 return (
1140 TextColumn("[progress.description]{task.description}"),
1141 BarColumn(),
1142 TaskProgressColumn(),
1143 TimeRemainingColumn(),
1144 )
1146 @property
1147 def console(self) -> Console:
1148 return self.live.console
1150 @property
1151 def tasks(self) -> List[Task]:
1152 """Get a list of Task instances."""
1153 with self._lock:
1154 return list(self._tasks.values())
1156 @property
1157 def task_ids(self) -> List[TaskID]:
1158 """A list of task IDs."""
1159 with self._lock:
1160 return list(self._tasks.keys())
1162 @property
1163 def finished(self) -> bool:
1164 """Check if all tasks have been completed."""
1165 with self._lock:
1166 if not self._tasks:
1167 return True
1168 return all(task.finished for task in self._tasks.values())
1170 def start(self) -> None:
1171 """Start the progress display."""
1172 if not self.disable:
1173 self.live.start(refresh=True)
1175 def stop(self) -> None:
1176 """Stop the progress display."""
1177 self.live.stop()
1178 if not self.console.is_interactive and not self.console.is_jupyter:
1179 self.console.print()
1181 def __enter__(self) -> Self:
1182 self.start()
1183 return self
1185 def __exit__(
1186 self,
1187 exc_type: Optional[Type[BaseException]],
1188 exc_val: Optional[BaseException],
1189 exc_tb: Optional[TracebackType],
1190 ) -> None:
1191 self.stop()
1193 def track(
1194 self,
1195 sequence: Union[Iterable[ProgressType], Sequence[ProgressType]],
1196 total: Optional[float] = None,
1197 completed: int = 0,
1198 task_id: Optional[TaskID] = None,
1199 description: str = "Working...",
1200 update_period: float = 0.1,
1201 ) -> Iterable[ProgressType]:
1202 """Track progress by iterating over a sequence.
1204 Args:
1205 sequence (Sequence[ProgressType]): A sequence of values you want to iterate over and track progress.
1206 total: (float, optional): Total number of steps. Default is len(sequence).
1207 completed (int, optional): Number of steps completed so far. Defaults to 0.
1208 task_id: (TaskID): Task to track. Default is new task.
1209 description: (str, optional): Description of task, if new task is created.
1210 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.
1212 Returns:
1213 Iterable[ProgressType]: An iterable of values taken from the provided sequence.
1214 """
1215 if total is None:
1216 total = float(length_hint(sequence)) or None
1218 if task_id is None:
1219 task_id = self.add_task(description, total=total, completed=completed)
1220 else:
1221 self.update(task_id, total=total, completed=completed)
1223 if self.live.auto_refresh:
1224 with _TrackThread(self, task_id, update_period) as track_thread:
1225 for value in sequence:
1226 yield value
1227 track_thread.completed += 1
1228 else:
1229 advance = self.advance
1230 refresh = self.refresh
1231 for value in sequence:
1232 yield value
1233 advance(task_id, 1)
1234 refresh()
1236 def wrap_file(
1237 self,
1238 file: BinaryIO,
1239 total: Optional[int] = None,
1240 *,
1241 task_id: Optional[TaskID] = None,
1242 description: str = "Reading...",
1243 ) -> BinaryIO:
1244 """Track progress file reading from a binary file.
1246 Args:
1247 file (BinaryIO): A file-like object opened in binary mode.
1248 total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given.
1249 task_id (TaskID): Task to track. Default is new task.
1250 description (str, optional): Description of task, if new task is created.
1252 Returns:
1253 BinaryIO: A readable file-like object in binary mode.
1255 Raises:
1256 ValueError: When no total value can be extracted from the arguments or the task.
1257 """
1258 # attempt to recover the total from the task
1259 total_bytes: Optional[float] = None
1260 if total is not None:
1261 total_bytes = total
1262 elif task_id is not None:
1263 with self._lock:
1264 total_bytes = self._tasks[task_id].total
1265 if total_bytes is None:
1266 raise ValueError(
1267 f"unable to get the total number of bytes, please specify 'total'"
1268 )
1270 # update total of task or create new task
1271 if task_id is None:
1272 task_id = self.add_task(description, total=total_bytes)
1273 else:
1274 self.update(task_id, total=total_bytes)
1276 return _Reader(file, self, task_id, close_handle=False)
1278 @typing.overload
1279 def open(
1280 self,
1281 file: Union[str, "PathLike[str]", bytes],
1282 mode: Literal["rb"],
1283 buffering: int = -1,
1284 encoding: Optional[str] = None,
1285 errors: Optional[str] = None,
1286 newline: Optional[str] = None,
1287 *,
1288 total: Optional[int] = None,
1289 task_id: Optional[TaskID] = None,
1290 description: str = "Reading...",
1291 ) -> BinaryIO:
1292 pass
1294 @typing.overload
1295 def open(
1296 self,
1297 file: Union[str, "PathLike[str]", bytes],
1298 mode: Union[Literal["r"], Literal["rt"]],
1299 buffering: int = -1,
1300 encoding: Optional[str] = None,
1301 errors: Optional[str] = None,
1302 newline: Optional[str] = None,
1303 *,
1304 total: Optional[int] = None,
1305 task_id: Optional[TaskID] = None,
1306 description: str = "Reading...",
1307 ) -> TextIO:
1308 pass
1310 def open(
1311 self,
1312 file: Union[str, "PathLike[str]", bytes],
1313 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
1314 buffering: int = -1,
1315 encoding: Optional[str] = None,
1316 errors: Optional[str] = None,
1317 newline: Optional[str] = None,
1318 *,
1319 total: Optional[int] = None,
1320 task_id: Optional[TaskID] = None,
1321 description: str = "Reading...",
1322 ) -> Union[BinaryIO, TextIO]:
1323 """Track progress while reading from a binary file.
1325 Args:
1326 path (Union[str, PathLike[str]]): The path to the file to read.
1327 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
1328 buffering (int): The buffering strategy to use, see :func:`io.open`.
1329 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
1330 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
1331 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`.
1332 total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used.
1333 task_id (TaskID): Task to track. Default is new task.
1334 description (str, optional): Description of task, if new task is created.
1336 Returns:
1337 BinaryIO: A readable file-like object in binary mode.
1339 Raises:
1340 ValueError: When an invalid mode is given.
1341 """
1342 # normalize the mode (always rb, rt)
1343 _mode = "".join(sorted(mode, reverse=False))
1344 if _mode not in ("br", "rt", "r"):
1345 raise ValueError(f"invalid mode {mode!r}")
1347 # patch buffering to provide the same behaviour as the builtin `open`
1348 line_buffering = buffering == 1
1349 if _mode == "br" and buffering == 1:
1350 warnings.warn(
1351 "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used",
1352 RuntimeWarning,
1353 )
1354 buffering = -1
1355 elif _mode in ("rt", "r"):
1356 if buffering == 0:
1357 raise ValueError("can't have unbuffered text I/O")
1358 elif buffering == 1:
1359 buffering = -1
1361 # attempt to get the total with `os.stat`
1362 if total is None:
1363 total = stat(file).st_size
1365 # update total of task or create new task
1366 if task_id is None:
1367 task_id = self.add_task(description, total=total)
1368 else:
1369 self.update(task_id, total=total)
1371 # open the file in binary mode,
1372 handle = io.open(file, "rb", buffering=buffering)
1373 reader = _Reader(handle, self, task_id, close_handle=True)
1375 # wrap the reader in a `TextIOWrapper` if text mode
1376 if mode in ("r", "rt"):
1377 return io.TextIOWrapper(
1378 reader,
1379 encoding=encoding,
1380 errors=errors,
1381 newline=newline,
1382 line_buffering=line_buffering,
1383 )
1385 return reader
1387 def start_task(self, task_id: TaskID) -> None:
1388 """Start a task.
1390 Starts a task (used when calculating elapsed time). You may need to call this manually,
1391 if you called ``add_task`` with ``start=False``.
1393 Args:
1394 task_id (TaskID): ID of task.
1395 """
1396 with self._lock:
1397 task = self._tasks[task_id]
1398 if task.start_time is None:
1399 task.start_time = self.get_time()
1401 def stop_task(self, task_id: TaskID) -> None:
1402 """Stop a task.
1404 This will freeze the elapsed time on the task.
1406 Args:
1407 task_id (TaskID): ID of task.
1408 """
1409 with self._lock:
1410 task = self._tasks[task_id]
1411 current_time = self.get_time()
1412 if task.start_time is None:
1413 task.start_time = current_time
1414 task.stop_time = current_time
1416 def update(
1417 self,
1418 task_id: TaskID,
1419 *,
1420 total: Optional[float] = None,
1421 completed: Optional[float] = None,
1422 advance: Optional[float] = None,
1423 description: Optional[str] = None,
1424 visible: Optional[bool] = None,
1425 refresh: bool = False,
1426 **fields: Any,
1427 ) -> None:
1428 """Update information associated with a task.
1430 Args:
1431 task_id (TaskID): Task id (returned by add_task).
1432 total (float, optional): Updates task.total if not None.
1433 completed (float, optional): Updates task.completed if not None.
1434 advance (float, optional): Add a value to task.completed if not None.
1435 description (str, optional): Change task description if not None.
1436 visible (bool, optional): Set visible flag if not None.
1437 refresh (bool): Force a refresh of progress information. Default is False.
1438 **fields (Any): Additional data fields required for rendering.
1439 """
1440 with self._lock:
1441 task = self._tasks[task_id]
1442 completed_start = task.completed
1444 if total is not None and total != task.total:
1445 task.total = total
1446 task._reset()
1447 if advance is not None:
1448 task.completed += advance
1449 if completed is not None:
1450 task.completed = completed
1451 if description is not None:
1452 task.description = description
1453 if visible is not None:
1454 task.visible = visible
1455 task.fields.update(fields)
1456 update_completed = task.completed - completed_start
1458 current_time = self.get_time()
1459 old_sample_time = current_time - self.speed_estimate_period
1460 _progress = task._progress
1462 popleft = _progress.popleft
1463 while _progress and _progress[0].timestamp < old_sample_time:
1464 popleft()
1465 if update_completed > 0:
1466 _progress.append(ProgressSample(current_time, update_completed))
1467 if (
1468 task.total is not None
1469 and task.completed >= task.total
1470 and task.finished_time is None
1471 ):
1472 task.finished_time = task.elapsed
1474 if refresh:
1475 self.refresh()
1477 def reset(
1478 self,
1479 task_id: TaskID,
1480 *,
1481 start: bool = True,
1482 total: Optional[float] = None,
1483 completed: int = 0,
1484 visible: Optional[bool] = None,
1485 description: Optional[str] = None,
1486 **fields: Any,
1487 ) -> None:
1488 """Reset a task so completed is 0 and the clock is reset.
1490 Args:
1491 task_id (TaskID): ID of task.
1492 start (bool, optional): Start the task after reset. Defaults to True.
1493 total (float, optional): New total steps in task, or None to use current total. Defaults to None.
1494 completed (int, optional): Number of steps completed. Defaults to 0.
1495 visible (bool, optional): Enable display of the task. Defaults to True.
1496 description (str, optional): Change task description if not None. Defaults to None.
1497 **fields (str): Additional data fields required for rendering.
1498 """
1499 current_time = self.get_time()
1500 with self._lock:
1501 task = self._tasks[task_id]
1502 task._reset()
1503 task.start_time = current_time if start else None
1504 if total is not None:
1505 task.total = total
1506 task.completed = completed
1507 if visible is not None:
1508 task.visible = visible
1509 if fields:
1510 task.fields = fields
1511 if description is not None:
1512 task.description = description
1513 task.finished_time = None
1514 self.refresh()
1516 def advance(self, task_id: TaskID, advance: float = 1) -> None:
1517 """Advance task by a number of steps.
1519 Args:
1520 task_id (TaskID): ID of task.
1521 advance (float): Number of steps to advance. Default is 1.
1522 """
1523 current_time = self.get_time()
1524 with self._lock:
1525 task = self._tasks[task_id]
1526 completed_start = task.completed
1527 task.completed += advance
1528 update_completed = task.completed - completed_start
1529 old_sample_time = current_time - self.speed_estimate_period
1530 _progress = task._progress
1532 popleft = _progress.popleft
1533 while _progress and _progress[0].timestamp < old_sample_time:
1534 popleft()
1535 while len(_progress) > 1000:
1536 popleft()
1537 _progress.append(ProgressSample(current_time, update_completed))
1538 if (
1539 task.total is not None
1540 and task.completed >= task.total
1541 and task.finished_time is None
1542 ):
1543 task.finished_time = task.elapsed
1544 task.finished_speed = task.speed
1546 def refresh(self) -> None:
1547 """Refresh (render) the progress information."""
1548 if not self.disable and self.live.is_started:
1549 self.live.refresh()
1551 def get_renderable(self) -> RenderableType:
1552 """Get a renderable for the progress display."""
1553 renderable = Group(*self.get_renderables())
1554 return renderable
1556 def get_renderables(self) -> Iterable[RenderableType]:
1557 """Get a number of renderables for the progress display."""
1558 table = self.make_tasks_table(self.tasks)
1559 yield table
1561 def make_tasks_table(self, tasks: Iterable[Task]) -> Table:
1562 """Get a table to render the Progress display.
1564 Args:
1565 tasks (Iterable[Task]): An iterable of Task instances, one per row of the table.
1567 Returns:
1568 Table: A table instance.
1569 """
1570 table_columns = (
1571 (
1572 Column(no_wrap=True)
1573 if isinstance(_column, str)
1574 else _column.get_table_column().copy()
1575 )
1576 for _column in self.columns
1577 )
1578 table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand)
1580 for task in tasks:
1581 if task.visible:
1582 table.add_row(
1583 *(
1584 (
1585 column.format(task=task)
1586 if isinstance(column, str)
1587 else column(task)
1588 )
1589 for column in self.columns
1590 )
1591 )
1592 return table
1594 def __rich__(self) -> RenderableType:
1595 """Makes the Progress class itself renderable."""
1596 with self._lock:
1597 return self.get_renderable()
1599 def add_task(
1600 self,
1601 description: str,
1602 start: bool = True,
1603 total: Optional[float] = 100.0,
1604 completed: int = 0,
1605 visible: bool = True,
1606 **fields: Any,
1607 ) -> TaskID:
1608 """Add a new 'task' to the Progress display.
1610 Args:
1611 description (str): A description of the task.
1612 start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False,
1613 you will need to call `start` manually. Defaults to True.
1614 total (float, optional): Number of total steps in the progress if known.
1615 Set to None to render a pulsing animation. Defaults to 100.
1616 completed (int, optional): Number of steps completed so far. Defaults to 0.
1617 visible (bool, optional): Enable display of the task. Defaults to True.
1618 **fields (str): Additional data fields required for rendering.
1620 Returns:
1621 TaskID: An ID you can use when calling `update`.
1622 """
1623 with self._lock:
1624 task = Task(
1625 self._task_index,
1626 description,
1627 total,
1628 completed,
1629 visible=visible,
1630 fields=fields,
1631 _get_time=self.get_time,
1632 _lock=self._lock,
1633 )
1634 self._tasks[self._task_index] = task
1635 if start:
1636 self.start_task(self._task_index)
1637 new_task_index = self._task_index
1638 self._task_index = TaskID(int(self._task_index) + 1)
1639 self.refresh()
1640 return new_task_index
1642 def remove_task(self, task_id: TaskID) -> None:
1643 """Delete a task if it exists.
1645 Args:
1646 task_id (TaskID): A task ID.
1648 """
1649 with self._lock:
1650 del self._tasks[task_id]
1653if __name__ == "__main__": # pragma: no coverage
1654 import random
1655 import time
1657 from .panel import Panel
1658 from .rule import Rule
1659 from .syntax import Syntax
1660 from .table import Table
1662 syntax = Syntax(
1663 '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
1664 """Iterate and generate a tuple with a flag for last value."""
1665 iter_values = iter(values)
1666 try:
1667 previous_value = next(iter_values)
1668 except StopIteration:
1669 return
1670 for value in iter_values:
1671 yield False, previous_value
1672 previous_value = value
1673 yield True, previous_value''',
1674 "python",
1675 line_numbers=True,
1676 )
1678 table = Table("foo", "bar", "baz")
1679 table.add_row("1", "2", "3")
1681 progress_renderables = [
1682 "Text may be printed while the progress bars are rendering.",
1683 Panel("In fact, [i]any[/i] renderable will work"),
1684 "Such as [magenta]tables[/]...",
1685 table,
1686 "Pretty printed structures...",
1687 {"type": "example", "text": "Pretty printed"},
1688 "Syntax...",
1689 syntax,
1690 Rule("Give it a try!"),
1691 ]
1693 from itertools import cycle
1695 examples = cycle(progress_renderables)
1697 console = Console(record=True)
1699 with Progress(
1700 SpinnerColumn(),
1701 *Progress.get_default_columns(),
1702 TimeElapsedColumn(),
1703 console=console,
1704 transient=False,
1705 ) as progress:
1706 task1 = progress.add_task("[red]Downloading", total=1000)
1707 task2 = progress.add_task("[green]Processing", total=1000)
1708 task3 = progress.add_task("[yellow]Thinking", total=None)
1710 while not progress.finished:
1711 progress.update(task1, advance=0.5)
1712 progress.update(task2, advance=0.3)
1713 time.sleep(0.01)
1714 if random.randint(0, 100) < 1:
1715 progress.log(next(examples))