Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/progress.py: 32%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import typing
5import warnings
6from abc import ABC, abstractmethod
7from collections import deque
8from dataclasses import dataclass, field
9from datetime import timedelta
10from io import RawIOBase, UnsupportedOperation
11from math import ceil
12from mmap import mmap
13from operator import length_hint
14from os import PathLike, stat
15from threading import Event, RLock, Thread
16from types import TracebackType
17from typing import (
18 TYPE_CHECKING,
19 Any,
20 BinaryIO,
21 Callable,
22 ContextManager,
23 Deque,
24 Dict,
25 Generic,
26 Iterable,
27 List,
28 Literal,
29 NamedTuple,
30 NewType,
31 Optional,
32 TextIO,
33 Tuple,
34 Type,
35 TypeVar,
36 Union,
37)
39if TYPE_CHECKING:
40 # Can be replaced with `from typing import Self` in Python 3.11+
41 from typing_extensions import Self # pragma: no cover
43from . import filesize, get_console
44from .console import Console, Group, JustifyMethod, RenderableType
45from .highlighter import Highlighter
46from .jupyter import JupyterMixin
47from .live import Live
48from .progress_bar import ProgressBar
49from .spinner import Spinner
50from .style import StyleType
51from .table import Column, Table
52from .text import Text, TextType
54TaskID = NewType("TaskID", int)
56ProgressType = TypeVar("ProgressType")
58GetTimeCallable = Callable[[], float]
61_I = typing.TypeVar("_I", TextIO, BinaryIO)
64class _TrackThread(Thread):
65 """A thread to periodically update progress."""
67 def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float):
68 self.progress = progress
69 self.task_id = task_id
70 self.update_period = update_period
71 self.done = Event()
73 self.completed = 0
74 super().__init__(daemon=True)
76 def run(self) -> None:
77 task_id = self.task_id
78 advance = self.progress.advance
79 update_period = self.update_period
80 last_completed = 0
81 wait = self.done.wait
82 while not wait(update_period) and self.progress.live.is_started:
83 completed = self.completed
84 if last_completed != completed:
85 advance(task_id, completed - last_completed)
86 last_completed = completed
88 self.progress.update(self.task_id, completed=self.completed, refresh=True)
90 def __enter__(self) -> "_TrackThread":
91 self.start()
92 return self
94 def __exit__(
95 self,
96 exc_type: Optional[Type[BaseException]],
97 exc_val: Optional[BaseException],
98 exc_tb: Optional[TracebackType],
99 ) -> None:
100 self.done.set()
101 self.join()
104def track(
105 sequence: Iterable[ProgressType],
106 description: str = "Working...",
107 total: Optional[float] = None,
108 completed: int = 0,
109 auto_refresh: bool = True,
110 console: Optional[Console] = None,
111 transient: bool = False,
112 get_time: Optional[Callable[[], float]] = None,
113 refresh_per_second: float = 10,
114 style: StyleType = "bar.back",
115 complete_style: StyleType = "bar.complete",
116 finished_style: StyleType = "bar.finished",
117 pulse_style: StyleType = "bar.pulse",
118 update_period: float = 0.1,
119 disable: bool = False,
120 show_speed: bool = True,
121) -> Iterable[ProgressType]:
122 """Track progress by iterating over a sequence.
124 You can also track progress of an iterable, which might require that you additionally specify ``total``.
126 Args:
127 sequence (Iterable[ProgressType]): Values you wish to iterate over and track progress.
128 description (str, optional): Description of task show next to progress bar. Defaults to "Working".
129 total: (float, optional): Total number of steps. Default is len(sequence).
130 completed (int, optional): Number of steps completed so far. Defaults to 0.
131 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
132 transient: (bool, optional): Clear the progress on exit. Defaults to False.
133 console (Console, optional): Console to write to. Default creates internal Console instance.
134 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
135 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
136 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
137 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
138 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
139 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.
140 disable (bool, optional): Disable display of progress.
141 show_speed (bool, optional): Show speed if total isn't known. Defaults to True.
142 Returns:
143 Iterable[ProgressType]: An iterable of the values in the sequence.
145 """
147 columns: List["ProgressColumn"] = (
148 [TextColumn("[progress.description]{task.description}")] if description else []
149 )
150 columns.extend(
151 (
152 BarColumn(
153 style=style,
154 complete_style=complete_style,
155 finished_style=finished_style,
156 pulse_style=pulse_style,
157 ),
158 TaskProgressColumn(show_speed=show_speed),
159 TimeRemainingColumn(elapsed_when_finished=True),
160 )
161 )
162 progress = Progress(
163 *columns,
164 auto_refresh=auto_refresh,
165 console=console,
166 transient=transient,
167 get_time=get_time,
168 refresh_per_second=refresh_per_second or 10,
169 disable=disable,
170 )
172 with progress:
173 yield from progress.track(
174 sequence,
175 total=total,
176 completed=completed,
177 description=description,
178 update_period=update_period,
179 )
182class _Reader(RawIOBase, BinaryIO):
183 """A reader that tracks progress while it's being read from."""
185 def __init__(
186 self,
187 handle: BinaryIO,
188 progress: "Progress",
189 task: TaskID,
190 close_handle: bool = True,
191 ) -> None:
192 self.handle = handle
193 self.progress = progress
194 self.task = task
195 self.close_handle = close_handle
196 self._closed = False
198 def __enter__(self) -> "_Reader":
199 self.handle.__enter__()
200 return self
202 def __exit__(
203 self,
204 exc_type: Optional[Type[BaseException]],
205 exc_val: Optional[BaseException],
206 exc_tb: Optional[TracebackType],
207 ) -> None:
208 self.close()
210 def __iter__(self) -> BinaryIO:
211 return self
213 def __next__(self) -> bytes:
214 line = next(self.handle)
215 self.progress.advance(self.task, advance=len(line))
216 return line
218 @property
219 def closed(self) -> bool:
220 return self._closed
222 def fileno(self) -> int:
223 return self.handle.fileno()
225 def isatty(self) -> bool:
226 return self.handle.isatty()
228 @property
229 def mode(self) -> str:
230 return self.handle.mode
232 @property
233 def name(self) -> str:
234 return self.handle.name
236 def readable(self) -> bool:
237 return self.handle.readable()
239 def seekable(self) -> bool:
240 return self.handle.seekable()
242 def writable(self) -> bool:
243 return False
245 def read(self, size: int = -1) -> bytes:
246 block = self.handle.read(size)
247 self.progress.advance(self.task, advance=len(block))
248 return block
250 def readinto(self, b: Union[bytearray, memoryview, mmap]): # type: ignore[no-untyped-def, override]
251 n = self.handle.readinto(b) # type: ignore[attr-defined]
252 self.progress.advance(self.task, advance=n)
253 return n
255 def readline(self, size: int = -1) -> bytes: # type: ignore[override]
256 line = self.handle.readline(size)
257 self.progress.advance(self.task, advance=len(line))
258 return line
260 def readlines(self, hint: int = -1) -> List[bytes]:
261 lines = self.handle.readlines(hint)
262 self.progress.advance(self.task, advance=sum(map(len, lines)))
263 return lines
265 def close(self) -> None:
266 if self.close_handle:
267 self.handle.close()
268 self._closed = True
270 def seek(self, offset: int, whence: int = 0) -> int:
271 pos = self.handle.seek(offset, whence)
272 self.progress.update(self.task, completed=pos)
273 return pos
275 def tell(self) -> int:
276 return self.handle.tell()
278 def write(self, s: Any) -> int:
279 raise UnsupportedOperation("write")
281 def writelines(self, lines: Iterable[Any]) -> None:
282 raise UnsupportedOperation("writelines")
285class _ReadContext(ContextManager[_I], Generic[_I]):
286 """A utility class to handle a context for both a reader and a progress."""
288 def __init__(self, progress: "Progress", reader: _I) -> None:
289 self.progress = progress
290 self.reader: _I = reader
292 def __enter__(self) -> _I:
293 self.progress.start()
294 return self.reader.__enter__()
296 def __exit__(
297 self,
298 exc_type: Optional[Type[BaseException]],
299 exc_val: Optional[BaseException],
300 exc_tb: Optional[TracebackType],
301 ) -> None:
302 self.progress.stop()
303 self.reader.__exit__(exc_type, exc_val, exc_tb)
306def wrap_file(
307 file: BinaryIO,
308 total: int,
309 *,
310 description: str = "Reading...",
311 auto_refresh: bool = True,
312 console: Optional[Console] = None,
313 transient: bool = False,
314 get_time: Optional[Callable[[], float]] = None,
315 refresh_per_second: float = 10,
316 style: StyleType = "bar.back",
317 complete_style: StyleType = "bar.complete",
318 finished_style: StyleType = "bar.finished",
319 pulse_style: StyleType = "bar.pulse",
320 disable: bool = False,
321) -> ContextManager[BinaryIO]:
322 """Read bytes from a file while tracking progress.
324 Args:
325 file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
326 total (int): Total number of bytes to read.
327 description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
328 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
329 transient: (bool, optional): Clear the progress on exit. Defaults to False.
330 console (Console, optional): Console to write to. Default creates internal Console instance.
331 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
332 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
333 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
334 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
335 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
336 disable (bool, optional): Disable display of progress.
337 Returns:
338 ContextManager[BinaryIO]: A context manager yielding a progress reader.
340 """
342 columns: List["ProgressColumn"] = (
343 [TextColumn("[progress.description]{task.description}")] if description else []
344 )
345 columns.extend(
346 (
347 BarColumn(
348 style=style,
349 complete_style=complete_style,
350 finished_style=finished_style,
351 pulse_style=pulse_style,
352 ),
353 DownloadColumn(),
354 TimeRemainingColumn(),
355 )
356 )
357 progress = Progress(
358 *columns,
359 auto_refresh=auto_refresh,
360 console=console,
361 transient=transient,
362 get_time=get_time,
363 refresh_per_second=refresh_per_second or 10,
364 disable=disable,
365 )
367 reader = progress.wrap_file(file, total=total, description=description)
368 return _ReadContext(progress, reader)
371@typing.overload
372def open(
373 file: Union[str, "PathLike[str]", bytes],
374 mode: Union[Literal["rt"], Literal["r"]],
375 buffering: int = -1,
376 encoding: Optional[str] = None,
377 errors: Optional[str] = None,
378 newline: Optional[str] = None,
379 *,
380 total: Optional[int] = None,
381 description: str = "Reading...",
382 auto_refresh: bool = True,
383 console: Optional[Console] = None,
384 transient: bool = False,
385 get_time: Optional[Callable[[], float]] = None,
386 refresh_per_second: float = 10,
387 style: StyleType = "bar.back",
388 complete_style: StyleType = "bar.complete",
389 finished_style: StyleType = "bar.finished",
390 pulse_style: StyleType = "bar.pulse",
391 disable: bool = False,
392) -> ContextManager[TextIO]:
393 pass
396@typing.overload
397def open(
398 file: Union[str, "PathLike[str]", bytes],
399 mode: Literal["rb"],
400 buffering: int = -1,
401 encoding: Optional[str] = None,
402 errors: Optional[str] = None,
403 newline: Optional[str] = None,
404 *,
405 total: Optional[int] = None,
406 description: str = "Reading...",
407 auto_refresh: bool = True,
408 console: Optional[Console] = None,
409 transient: bool = False,
410 get_time: Optional[Callable[[], float]] = None,
411 refresh_per_second: float = 10,
412 style: StyleType = "bar.back",
413 complete_style: StyleType = "bar.complete",
414 finished_style: StyleType = "bar.finished",
415 pulse_style: StyleType = "bar.pulse",
416 disable: bool = False,
417) -> ContextManager[BinaryIO]:
418 pass
421def open(
422 file: Union[str, "PathLike[str]", bytes],
423 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
424 buffering: int = -1,
425 encoding: Optional[str] = None,
426 errors: Optional[str] = None,
427 newline: Optional[str] = None,
428 *,
429 total: Optional[int] = None,
430 description: str = "Reading...",
431 auto_refresh: bool = True,
432 console: Optional[Console] = None,
433 transient: bool = False,
434 get_time: Optional[Callable[[], float]] = None,
435 refresh_per_second: float = 10,
436 style: StyleType = "bar.back",
437 complete_style: StyleType = "bar.complete",
438 finished_style: StyleType = "bar.finished",
439 pulse_style: StyleType = "bar.pulse",
440 disable: bool = False,
441) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]:
442 """Read bytes from a file while tracking progress.
444 Args:
445 path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode.
446 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
447 buffering (int): The buffering strategy to use, see :func:`io.open`.
448 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
449 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
450 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`
451 total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size.
452 description (str, optional): Description of task show next to progress bar. Defaults to "Reading".
453 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True.
454 transient: (bool, optional): Clear the progress on exit. Defaults to False.
455 console (Console, optional): Console to write to. Default creates internal Console instance.
456 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10.
457 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
458 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
459 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
460 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
461 disable (bool, optional): Disable display of progress.
462 encoding (str, optional): The encoding to use when reading in text mode.
464 Returns:
465 ContextManager[BinaryIO]: A context manager yielding a progress reader.
467 """
469 columns: List["ProgressColumn"] = (
470 [TextColumn("[progress.description]{task.description}")] if description else []
471 )
472 columns.extend(
473 (
474 BarColumn(
475 style=style,
476 complete_style=complete_style,
477 finished_style=finished_style,
478 pulse_style=pulse_style,
479 ),
480 DownloadColumn(),
481 TimeRemainingColumn(),
482 )
483 )
484 progress = Progress(
485 *columns,
486 auto_refresh=auto_refresh,
487 console=console,
488 transient=transient,
489 get_time=get_time,
490 refresh_per_second=refresh_per_second or 10,
491 disable=disable,
492 )
494 reader = progress.open(
495 file,
496 mode=mode,
497 buffering=buffering,
498 encoding=encoding,
499 errors=errors,
500 newline=newline,
501 total=total,
502 description=description,
503 )
504 return _ReadContext(progress, reader) # type: ignore[return-value, type-var]
507class ProgressColumn(ABC):
508 """Base class for a widget to use in progress display."""
510 max_refresh: Optional[float] = None
512 def __init__(self, table_column: Optional[Column] = None) -> None:
513 self._table_column = table_column
514 self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {}
515 self._update_time: Optional[float] = None
517 def get_table_column(self) -> Column:
518 """Get a table column, used to build tasks table."""
519 return self._table_column or Column()
521 def __call__(self, task: "Task") -> RenderableType:
522 """Called by the Progress object to return a renderable for the given task.
524 Args:
525 task (Task): An object containing information regarding the task.
527 Returns:
528 RenderableType: Anything renderable (including str).
529 """
530 current_time = task.get_time()
531 if self.max_refresh is not None and not task.completed:
532 try:
533 timestamp, renderable = self._renderable_cache[task.id]
534 except KeyError:
535 pass
536 else:
537 if timestamp + self.max_refresh > current_time:
538 return renderable
540 renderable = self.render(task)
541 self._renderable_cache[task.id] = (current_time, renderable)
542 return renderable
544 @abstractmethod
545 def render(self, task: "Task") -> RenderableType:
546 """Should return a renderable object."""
549class RenderableColumn(ProgressColumn):
550 """A column to insert an arbitrary column.
552 Args:
553 renderable (RenderableType, optional): Any renderable. Defaults to empty string.
554 """
556 def __init__(
557 self, renderable: RenderableType = "", *, table_column: Optional[Column] = None
558 ):
559 self.renderable = renderable
560 super().__init__(table_column=table_column)
562 def render(self, task: "Task") -> RenderableType:
563 return self.renderable
566class SpinnerColumn(ProgressColumn):
567 """A column with a 'spinner' animation.
569 Args:
570 spinner_name (str, optional): Name of spinner animation. Defaults to "dots".
571 style (StyleType, optional): Style of spinner. Defaults to "progress.spinner".
572 speed (float, optional): Speed factor of spinner. Defaults to 1.0.
573 finished_text (TextType, optional): Text used when task is finished. Defaults to " ".
574 """
576 def __init__(
577 self,
578 spinner_name: str = "dots",
579 style: Optional[StyleType] = "progress.spinner",
580 speed: float = 1.0,
581 finished_text: TextType = " ",
582 table_column: Optional[Column] = None,
583 ):
584 self.spinner = Spinner(spinner_name, style=style, speed=speed)
585 self.finished_text = (
586 Text.from_markup(finished_text)
587 if isinstance(finished_text, str)
588 else finished_text
589 )
590 super().__init__(table_column=table_column)
592 def set_spinner(
593 self,
594 spinner_name: str,
595 spinner_style: Optional[StyleType] = "progress.spinner",
596 speed: float = 1.0,
597 ) -> None:
598 """Set a new spinner.
600 Args:
601 spinner_name (str): Spinner name, see python -m rich.spinner.
602 spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner".
603 speed (float, optional): Speed factor of spinner. Defaults to 1.0.
604 """
605 self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed)
607 def render(self, task: "Task") -> RenderableType:
608 text = (
609 self.finished_text
610 if task.finished
611 else self.spinner.render(task.get_time())
612 )
613 return text
616class TextColumn(ProgressColumn):
617 """A column containing text."""
619 def __init__(
620 self,
621 text_format: str,
622 style: StyleType = "none",
623 justify: JustifyMethod = "left",
624 markup: bool = True,
625 highlighter: Optional[Highlighter] = None,
626 table_column: Optional[Column] = None,
627 ) -> None:
628 self.text_format = text_format
629 self.justify: JustifyMethod = justify
630 self.style = style
631 self.markup = markup
632 self.highlighter = highlighter
633 super().__init__(table_column=table_column or Column(no_wrap=True))
635 def render(self, task: "Task") -> Text:
636 _text = self.text_format.format(task=task)
637 if self.markup:
638 text = Text.from_markup(_text, style=self.style, justify=self.justify)
639 else:
640 text = Text(_text, style=self.style, justify=self.justify)
641 if self.highlighter:
642 self.highlighter.highlight(text)
643 return text
646class BarColumn(ProgressColumn):
647 """Renders a visual progress bar.
649 Args:
650 bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40.
651 style (StyleType, optional): Style for the bar background. Defaults to "bar.back".
652 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete".
653 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished".
654 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse".
655 """
657 def __init__(
658 self,
659 bar_width: Optional[int] = 40,
660 style: StyleType = "bar.back",
661 complete_style: StyleType = "bar.complete",
662 finished_style: StyleType = "bar.finished",
663 pulse_style: StyleType = "bar.pulse",
664 table_column: Optional[Column] = None,
665 ) -> None:
666 self.bar_width = bar_width
667 self.style = style
668 self.complete_style = complete_style
669 self.finished_style = finished_style
670 self.pulse_style = pulse_style
671 super().__init__(table_column=table_column)
673 def render(self, task: "Task") -> ProgressBar:
674 """Gets a progress bar widget for a task."""
675 return ProgressBar(
676 total=max(0, task.total) if task.total is not None else None,
677 completed=max(0, task.completed),
678 width=None if self.bar_width is None else max(1, self.bar_width),
679 pulse=not task.started,
680 animation_time=task.get_time(),
681 style=self.style,
682 complete_style=self.complete_style,
683 finished_style=self.finished_style,
684 pulse_style=self.pulse_style,
685 )
688class TimeElapsedColumn(ProgressColumn):
689 """Renders time elapsed."""
691 def render(self, task: "Task") -> Text:
692 """Show time elapsed."""
693 elapsed = task.finished_time if task.finished else task.elapsed
694 if elapsed is None:
695 return Text("-:--:--", style="progress.elapsed")
696 delta = timedelta(seconds=max(0, int(elapsed)))
697 return Text(str(delta), style="progress.elapsed")
700class TaskProgressColumn(TextColumn):
701 """Show task progress as a percentage.
703 Args:
704 text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%".
705 text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "".
706 style (StyleType, optional): Style of output. Defaults to "none".
707 justify (JustifyMethod, optional): Text justification. Defaults to "left".
708 markup (bool, optional): Enable markup. Defaults to True.
709 highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None.
710 table_column (Optional[Column], optional): Table Column to use. Defaults to None.
711 show_speed (bool, optional): Show speed if total is unknown. Defaults to False.
712 """
714 def __init__(
715 self,
716 text_format: str = "[progress.percentage]{task.percentage:>3.0f}%",
717 text_format_no_percentage: str = "",
718 style: StyleType = "none",
719 justify: JustifyMethod = "left",
720 markup: bool = True,
721 highlighter: Optional[Highlighter] = None,
722 table_column: Optional[Column] = None,
723 show_speed: bool = False,
724 ) -> None:
725 self.text_format_no_percentage = text_format_no_percentage
726 self.show_speed = show_speed
727 super().__init__(
728 text_format=text_format,
729 style=style,
730 justify=justify,
731 markup=markup,
732 highlighter=highlighter,
733 table_column=table_column,
734 )
736 @classmethod
737 def render_speed(cls, speed: Optional[float]) -> Text:
738 """Render the speed in iterations per second.
740 Args:
741 task (Task): A Task object.
743 Returns:
744 Text: Text object containing the task speed.
745 """
746 if speed is None:
747 return Text("", style="progress.percentage")
748 unit, suffix = filesize.pick_unit_and_suffix(
749 int(speed),
750 ["", "×10³", "×10⁶", "×10⁹", "×10¹²"],
751 1000,
752 )
753 data_speed = speed / unit
754 return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage")
756 def render(self, task: "Task") -> Text:
757 if task.total is None and self.show_speed:
758 return self.render_speed(task.finished_speed or task.speed)
759 text_format = (
760 self.text_format_no_percentage if task.total is None else self.text_format
761 )
762 _text = text_format.format(task=task)
763 if self.markup:
764 text = Text.from_markup(_text, style=self.style, justify=self.justify)
765 else:
766 text = Text(_text, style=self.style, justify=self.justify)
767 if self.highlighter:
768 self.highlighter.highlight(text)
769 return text
772class TimeRemainingColumn(ProgressColumn):
773 """Renders estimated time remaining.
775 Args:
776 compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False.
777 elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False.
778 """
780 # Only refresh twice a second to prevent jitter
781 max_refresh = 0.5
783 def __init__(
784 self,
785 compact: bool = False,
786 elapsed_when_finished: bool = False,
787 table_column: Optional[Column] = None,
788 ):
789 self.compact = compact
790 self.elapsed_when_finished = elapsed_when_finished
791 super().__init__(table_column=table_column)
793 def render(self, task: "Task") -> Text:
794 """Show time remaining."""
795 if self.elapsed_when_finished and task.finished:
796 task_time = task.finished_time
797 style = "progress.elapsed"
798 else:
799 task_time = task.time_remaining
800 style = "progress.remaining"
802 if task.total is None:
803 return Text("", style=style)
805 if task_time is None:
806 return Text("--:--" if self.compact else "-:--:--", style=style)
808 # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py
809 minutes, seconds = divmod(int(task_time), 60)
810 hours, minutes = divmod(minutes, 60)
812 if self.compact and not hours:
813 formatted = f"{minutes:02d}:{seconds:02d}"
814 else:
815 formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}"
817 return Text(formatted, style=style)
820class FileSizeColumn(ProgressColumn):
821 """Renders completed filesize."""
823 def render(self, task: "Task") -> Text:
824 """Show data completed."""
825 data_size = filesize.decimal(int(task.completed))
826 return Text(data_size, style="progress.filesize")
829class TotalFileSizeColumn(ProgressColumn):
830 """Renders total filesize."""
832 def render(self, task: "Task") -> Text:
833 """Show data completed."""
834 data_size = filesize.decimal(int(task.total)) if task.total is not None else ""
835 return Text(data_size, style="progress.filesize.total")
838class MofNCompleteColumn(ProgressColumn):
839 """Renders completed count/total, e.g. ' 10/1000'.
841 Best for bounded tasks with int quantities.
843 Space pads the completed count so that progress length does not change as task progresses
844 past powers of 10.
846 Args:
847 separator (str, optional): Text to separate completed and total values. Defaults to "/".
848 """
850 def __init__(self, separator: str = "/", table_column: Optional[Column] = None):
851 self.separator = separator
852 super().__init__(table_column=table_column)
854 def render(self, task: "Task") -> Text:
855 """Show completed/total."""
856 completed = int(task.completed)
857 total = int(task.total) if task.total is not None else "?"
858 total_width = len(str(total))
859 return Text(
860 f"{completed:{total_width}d}{self.separator}{total}",
861 style="progress.download",
862 )
865class DownloadColumn(ProgressColumn):
866 """Renders file size downloaded and total, e.g. '0.5/2.3 GB'.
868 Args:
869 binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False.
870 """
872 def __init__(
873 self, binary_units: bool = False, table_column: Optional[Column] = None
874 ) -> None:
875 self.binary_units = binary_units
876 super().__init__(table_column=table_column)
878 def render(self, task: "Task") -> Text:
879 """Calculate common unit for completed and total."""
880 completed = int(task.completed)
882 unit_and_suffix_calculation_base = (
883 int(task.total) if task.total is not None else completed
884 )
885 if self.binary_units:
886 unit, suffix = filesize.pick_unit_and_suffix(
887 unit_and_suffix_calculation_base,
888 ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"],
889 1024,
890 )
891 else:
892 unit, suffix = filesize.pick_unit_and_suffix(
893 unit_and_suffix_calculation_base,
894 ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"],
895 1000,
896 )
897 precision = 0 if unit == 1 else 1
899 completed_ratio = completed / unit
900 completed_str = f"{completed_ratio:,.{precision}f}"
902 if task.total is not None:
903 total = int(task.total)
904 total_ratio = total / unit
905 total_str = f"{total_ratio:,.{precision}f}"
906 else:
907 total_str = "?"
909 download_status = f"{completed_str}/{total_str} {suffix}"
910 download_text = Text(download_status, style="progress.download")
911 return download_text
914class TransferSpeedColumn(ProgressColumn):
915 """Renders human readable transfer speed."""
917 def render(self, task: "Task") -> Text:
918 """Show data transfer speed."""
919 speed = task.finished_speed or task.speed
920 if speed is None:
921 return Text("?", style="progress.data.speed")
922 data_speed = filesize.decimal(int(speed))
923 return Text(f"{data_speed}/s", style="progress.data.speed")
926class ProgressSample(NamedTuple):
927 """Sample of progress for a given time."""
929 timestamp: float
930 """Timestamp of sample."""
931 completed: float
932 """Number of steps completed."""
935@dataclass
936class Task:
937 """Information regarding a progress task.
939 This object should be considered read-only outside of the :class:`~Progress` class.
941 """
943 id: TaskID
944 """Task ID associated with this task (used in Progress methods)."""
946 description: str
947 """str: Description of the task."""
949 total: Optional[float]
950 """Optional[float]: Total number of steps in this task."""
952 completed: float
953 """float: Number of steps completed"""
955 _get_time: GetTimeCallable
956 """Callable to get the current time."""
958 finished_time: Optional[float] = None
959 """float: Time task was finished."""
961 visible: bool = True
962 """bool: Indicates if this task is visible in the progress display."""
964 fields: Dict[str, Any] = field(default_factory=dict)
965 """dict: Arbitrary fields passed in via Progress.update."""
967 start_time: Optional[float] = field(default=None, init=False, repr=False)
968 """Optional[float]: Time this task was started, or None if not started."""
970 stop_time: Optional[float] = field(default=None, init=False, repr=False)
971 """Optional[float]: Time this task was stopped, or None if not stopped."""
973 finished_speed: Optional[float] = None
974 """Optional[float]: The last speed for a finished task."""
976 _progress: Deque[ProgressSample] = field(
977 default_factory=lambda: deque(maxlen=1000), init=False, repr=False
978 )
980 _lock: RLock = field(repr=False, default_factory=RLock)
981 """Thread lock."""
983 def get_time(self) -> float:
984 """float: Get the current time, in seconds."""
985 return self._get_time()
987 @property
988 def started(self) -> bool:
989 """bool: Check if the task as started."""
990 return self.start_time is not None
992 @property
993 def remaining(self) -> Optional[float]:
994 """Optional[float]: Get the number of steps remaining, if a non-None total was set."""
995 if self.total is None:
996 return None
997 return self.total - self.completed
999 @property
1000 def elapsed(self) -> Optional[float]:
1001 """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started."""
1002 if self.start_time is None:
1003 return None
1004 if self.stop_time is not None:
1005 return self.stop_time - self.start_time
1006 return self.get_time() - self.start_time
1008 @property
1009 def finished(self) -> bool:
1010 """Check if the task has finished."""
1011 return self.finished_time is not None
1013 @property
1014 def percentage(self) -> float:
1015 """float: Get progress of task as a percentage. If a None total was set, returns 0"""
1016 if not self.total:
1017 return 0.0
1018 completed = (self.completed / self.total) * 100.0
1019 completed = min(100.0, max(0.0, completed))
1020 return completed
1022 @property
1023 def speed(self) -> Optional[float]:
1024 """Optional[float]: Get the estimated speed in steps per second."""
1025 if self.start_time is None:
1026 return None
1027 with self._lock:
1028 progress = self._progress
1029 if not progress:
1030 return None
1031 total_time = progress[-1].timestamp - progress[0].timestamp
1032 if total_time == 0:
1033 return None
1034 iter_progress = iter(progress)
1035 next(iter_progress)
1036 total_completed = sum(sample.completed for sample in iter_progress)
1037 speed = total_completed / total_time
1038 return speed
1040 @property
1041 def time_remaining(self) -> Optional[float]:
1042 """Optional[float]: Get estimated time to completion, or ``None`` if no data."""
1043 if self.finished:
1044 return 0.0
1045 speed = self.speed
1046 if not speed:
1047 return None
1048 remaining = self.remaining
1049 if remaining is None:
1050 return None
1051 estimate = ceil(remaining / speed)
1052 return estimate
1054 def _reset(self) -> None:
1055 """Reset progress."""
1056 self._progress.clear()
1057 self.finished_time = None
1058 self.finished_speed = None
1061class Progress(JupyterMixin):
1062 """Renders an auto-updating progress bar(s).
1064 Args:
1065 console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout.
1066 auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`.
1067 refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None.
1068 speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30.
1069 transient: (bool, optional): Clear the progress on exit. Defaults to False.
1070 redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
1071 redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True.
1072 get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None.
1073 disable (bool, optional): Disable progress display. Defaults to False
1074 expand (bool, optional): Expand tasks table to fit width. Defaults to False.
1075 """
1077 def __init__(
1078 self,
1079 *columns: Union[str, ProgressColumn],
1080 console: Optional[Console] = None,
1081 auto_refresh: bool = True,
1082 refresh_per_second: float = 10,
1083 speed_estimate_period: float = 30.0,
1084 transient: bool = False,
1085 redirect_stdout: bool = True,
1086 redirect_stderr: bool = True,
1087 get_time: Optional[GetTimeCallable] = None,
1088 disable: bool = False,
1089 expand: bool = False,
1090 ) -> None:
1091 assert refresh_per_second > 0, "refresh_per_second must be > 0"
1092 self._lock = RLock()
1093 self.columns = columns or self.get_default_columns()
1094 self.speed_estimate_period = speed_estimate_period
1096 self.disable = disable
1097 self.expand = expand
1098 self._tasks: Dict[TaskID, Task] = {}
1099 self._task_index: TaskID = TaskID(0)
1100 self.live = Live(
1101 console=console or get_console(),
1102 auto_refresh=auto_refresh,
1103 refresh_per_second=refresh_per_second,
1104 transient=transient,
1105 redirect_stdout=redirect_stdout,
1106 redirect_stderr=redirect_stderr,
1107 get_renderable=self.get_renderable,
1108 )
1109 self.get_time = get_time or self.console.get_time
1110 self.print = self.console.print
1111 self.log = self.console.log
1113 @classmethod
1114 def get_default_columns(cls) -> Tuple[ProgressColumn, ...]:
1115 """Get the default columns used for a new Progress instance:
1116 - a text column for the description (TextColumn)
1117 - the bar itself (BarColumn)
1118 - a text column showing completion percentage (TextColumn)
1119 - an estimated-time-remaining column (TimeRemainingColumn)
1120 If the Progress instance is created without passing a columns argument,
1121 the default columns defined here will be used.
1123 You can also create a Progress instance using custom columns before
1124 and/or after the defaults, as in this example:
1126 progress = Progress(
1127 SpinnerColumn(),
1128 *Progress.get_default_columns(),
1129 "Elapsed:",
1130 TimeElapsedColumn(),
1131 )
1133 This code shows the creation of a Progress display, containing
1134 a spinner to the left, the default columns, and a labeled elapsed
1135 time column.
1136 """
1137 return (
1138 TextColumn("[progress.description]{task.description}"),
1139 BarColumn(),
1140 TaskProgressColumn(),
1141 TimeRemainingColumn(),
1142 )
1144 @property
1145 def console(self) -> Console:
1146 return self.live.console
1148 @property
1149 def tasks(self) -> List[Task]:
1150 """Get a list of Task instances."""
1151 with self._lock:
1152 return list(self._tasks.values())
1154 @property
1155 def task_ids(self) -> List[TaskID]:
1156 """A list of task IDs."""
1157 with self._lock:
1158 return list(self._tasks.keys())
1160 @property
1161 def finished(self) -> bool:
1162 """Check if all tasks have been completed."""
1163 with self._lock:
1164 if not self._tasks:
1165 return True
1166 return all(task.finished for task in self._tasks.values())
1168 def start(self) -> None:
1169 """Start the progress display."""
1170 if not self.disable:
1171 self.live.start(refresh=True)
1173 def stop(self) -> None:
1174 """Stop the progress display."""
1175 self.live.stop()
1176 if not self.console.is_interactive and not self.console.is_jupyter:
1177 self.console.print()
1179 def __enter__(self) -> Self:
1180 self.start()
1181 return self
1183 def __exit__(
1184 self,
1185 exc_type: Optional[Type[BaseException]],
1186 exc_val: Optional[BaseException],
1187 exc_tb: Optional[TracebackType],
1188 ) -> None:
1189 self.stop()
1191 def track(
1192 self,
1193 sequence: Iterable[ProgressType],
1194 total: Optional[float] = None,
1195 completed: int = 0,
1196 task_id: Optional[TaskID] = None,
1197 description: str = "Working...",
1198 update_period: float = 0.1,
1199 ) -> Iterable[ProgressType]:
1200 """Track progress by iterating over a sequence.
1202 You can also track progress of an iterable, which might require that you additionally specify ``total``.
1204 Args:
1205 sequence (Iterable[ProgressType]): Values you want to iterate over and track progress.
1206 total: (float, optional): Total number of steps. Default is len(sequence).
1207 completed (int, optional): Number of steps completed so far. Defaults to 0.
1208 task_id: (TaskID): Task to track. Default is new task.
1209 description: (str, optional): Description of task, if new task is created.
1210 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1.
1212 Returns:
1213 Iterable[ProgressType]: An iterable of values taken from the provided sequence.
1214 """
1215 if total is None:
1216 total = float(length_hint(sequence)) or None
1218 if task_id is None:
1219 task_id = self.add_task(description, total=total, completed=completed)
1220 else:
1221 self.update(task_id, total=total, completed=completed)
1223 if self.live.auto_refresh:
1224 with _TrackThread(self, task_id, update_period) as track_thread:
1225 for value in sequence:
1226 yield value
1227 track_thread.completed += 1
1228 else:
1229 advance = self.advance
1230 refresh = self.refresh
1231 for value in sequence:
1232 yield value
1233 advance(task_id, 1)
1234 refresh()
1236 def wrap_file(
1237 self,
1238 file: BinaryIO,
1239 total: Optional[int] = None,
1240 *,
1241 task_id: Optional[TaskID] = None,
1242 description: str = "Reading...",
1243 ) -> BinaryIO:
1244 """Track progress file reading from a binary file.
1246 Args:
1247 file (BinaryIO): A file-like object opened in binary mode.
1248 total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given.
1249 task_id (TaskID): Task to track. Default is new task.
1250 description (str, optional): Description of task, if new task is created.
1252 Returns:
1253 BinaryIO: A readable file-like object in binary mode.
1255 Raises:
1256 ValueError: When no total value can be extracted from the arguments or the task.
1257 """
1258 # attempt to recover the total from the task
1259 total_bytes: Optional[float] = None
1260 if total is not None:
1261 total_bytes = total
1262 elif task_id is not None:
1263 with self._lock:
1264 total_bytes = self._tasks[task_id].total
1265 if total_bytes is None:
1266 raise ValueError(
1267 f"unable to get the total number of bytes, please specify 'total'"
1268 )
1270 # update total of task or create new task
1271 if task_id is None:
1272 task_id = self.add_task(description, total=total_bytes)
1273 else:
1274 self.update(task_id, total=total_bytes)
1276 return _Reader(file, self, task_id, close_handle=False)
1278 @typing.overload
1279 def open(
1280 self,
1281 file: Union[str, "PathLike[str]", bytes],
1282 mode: Literal["rb"],
1283 buffering: int = -1,
1284 encoding: Optional[str] = None,
1285 errors: Optional[str] = None,
1286 newline: Optional[str] = None,
1287 *,
1288 total: Optional[int] = None,
1289 task_id: Optional[TaskID] = None,
1290 description: str = "Reading...",
1291 ) -> BinaryIO:
1292 pass
1294 @typing.overload
1295 def open(
1296 self,
1297 file: Union[str, "PathLike[str]", bytes],
1298 mode: Union[Literal["r"], Literal["rt"]],
1299 buffering: int = -1,
1300 encoding: Optional[str] = None,
1301 errors: Optional[str] = None,
1302 newline: Optional[str] = None,
1303 *,
1304 total: Optional[int] = None,
1305 task_id: Optional[TaskID] = None,
1306 description: str = "Reading...",
1307 ) -> TextIO:
1308 pass
1310 def open(
1311 self,
1312 file: Union[str, "PathLike[str]", bytes],
1313 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r",
1314 buffering: int = -1,
1315 encoding: Optional[str] = None,
1316 errors: Optional[str] = None,
1317 newline: Optional[str] = None,
1318 *,
1319 total: Optional[int] = None,
1320 task_id: Optional[TaskID] = None,
1321 description: str = "Reading...",
1322 ) -> Union[BinaryIO, TextIO]:
1323 """Track progress while reading from a binary file.
1325 Args:
1326 path (Union[str, PathLike[str]]): The path to the file to read.
1327 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt".
1328 buffering (int): The buffering strategy to use, see :func:`io.open`.
1329 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`.
1330 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`.
1331 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`.
1332 total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used.
1333 task_id (TaskID): Task to track. Default is new task.
1334 description (str, optional): Description of task, if new task is created.
1336 Returns:
1337 BinaryIO: A readable file-like object in binary mode.
1339 Raises:
1340 ValueError: When an invalid mode is given.
1341 """
1342 # normalize the mode (always rb, rt)
1343 _mode = "".join(sorted(mode, reverse=False))
1344 if _mode not in ("br", "rt", "r"):
1345 raise ValueError(f"invalid mode {mode!r}")
1347 # patch buffering to provide the same behaviour as the builtin `open`
1348 line_buffering = buffering == 1
1349 if _mode == "br" and buffering == 1:
1350 warnings.warn(
1351 "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used",
1352 RuntimeWarning,
1353 )
1354 buffering = -1
1355 elif _mode in ("rt", "r"):
1356 if buffering == 0:
1357 raise ValueError("can't have unbuffered text I/O")
1358 elif buffering == 1:
1359 buffering = -1
1361 # attempt to get the total with `os.stat`
1362 if total is None:
1363 total = stat(file).st_size
1365 # update total of task or create new task
1366 if task_id is None:
1367 task_id = self.add_task(description, total=total)
1368 else:
1369 self.update(task_id, total=total)
1371 # open the file in binary mode,
1372 handle = io.open(file, "rb", buffering=buffering)
1373 reader = _Reader(handle, self, task_id, close_handle=True)
1375 # wrap the reader in a `TextIOWrapper` if text mode
1376 if mode in ("r", "rt"):
1377 return io.TextIOWrapper(
1378 reader,
1379 encoding=encoding,
1380 errors=errors,
1381 newline=newline,
1382 line_buffering=line_buffering,
1383 )
1385 return reader
1387 def start_task(self, task_id: TaskID) -> None:
1388 """Start a task.
1390 Starts a task (used when calculating elapsed time). You may need to call this manually,
1391 if you called ``add_task`` with ``start=False``.
1393 Args:
1394 task_id (TaskID): ID of task.
1395 """
1396 with self._lock:
1397 task = self._tasks[task_id]
1398 if task.start_time is None:
1399 task.start_time = self.get_time()
1401 def stop_task(self, task_id: TaskID) -> None:
1402 """Stop a task.
1404 This will freeze the elapsed time on the task.
1406 Args:
1407 task_id (TaskID): ID of task.
1408 """
1409 with self._lock:
1410 task = self._tasks[task_id]
1411 current_time = self.get_time()
1412 if task.start_time is None:
1413 task.start_time = current_time
1414 task.stop_time = current_time
1416 def update(
1417 self,
1418 task_id: TaskID,
1419 *,
1420 total: Optional[float] = None,
1421 completed: Optional[float] = None,
1422 advance: Optional[float] = None,
1423 description: Optional[str] = None,
1424 visible: Optional[bool] = None,
1425 refresh: bool = False,
1426 **fields: Any,
1427 ) -> None:
1428 """Update information associated with a task.
1430 Args:
1431 task_id (TaskID): Task id (returned by add_task).
1432 total (float, optional): Updates task.total if not None.
1433 completed (float, optional): Updates task.completed if not None.
1434 advance (float, optional): Add a value to task.completed if not None.
1435 description (str, optional): Change task description if not None.
1436 visible (bool, optional): Set visible flag if not None.
1437 refresh (bool): Force a refresh of progress information. Default is False.
1438 **fields (Any): Additional data fields required for rendering.
1439 """
1440 with self._lock:
1441 task = self._tasks[task_id]
1442 completed_start = task.completed
1444 if total is not None and total != task.total:
1445 task.total = total
1446 task._reset()
1447 if advance is not None:
1448 task.completed += advance
1449 if completed is not None:
1450 task.completed = completed
1451 if description is not None:
1452 task.description = description
1453 if visible is not None:
1454 task.visible = visible
1455 task.fields.update(fields)
1456 update_completed = task.completed - completed_start
1458 current_time = self.get_time()
1459 old_sample_time = current_time - self.speed_estimate_period
1460 _progress = task._progress
1462 popleft = _progress.popleft
1463 while _progress and _progress[0].timestamp < old_sample_time:
1464 popleft()
1465 if update_completed > 0:
1466 _progress.append(ProgressSample(current_time, update_completed))
1467 if (
1468 task.total is not None
1469 and task.completed >= task.total
1470 and task.finished_time is None
1471 ):
1472 task.finished_time = task.elapsed
1474 if refresh:
1475 self.refresh()
1477 def reset(
1478 self,
1479 task_id: TaskID,
1480 *,
1481 start: bool = True,
1482 total: Optional[float] = None,
1483 completed: int = 0,
1484 visible: Optional[bool] = None,
1485 description: Optional[str] = None,
1486 **fields: Any,
1487 ) -> None:
1488 """Reset a task so completed is 0 and the clock is reset.
1490 Args:
1491 task_id (TaskID): ID of task.
1492 start (bool, optional): Start the task after reset. Defaults to True.
1493 total (float, optional): New total steps in task, or None to use current total. Defaults to None.
1494 completed (int, optional): Number of steps completed. Defaults to 0.
1495 visible (bool, optional): Enable display of the task. Defaults to True.
1496 description (str, optional): Change task description if not None. Defaults to None.
1497 **fields (str): Additional data fields required for rendering.
1498 """
1499 current_time = self.get_time()
1500 with self._lock:
1501 task = self._tasks[task_id]
1502 task._reset()
1503 task.start_time = current_time if start else None
1504 if total is not None:
1505 task.total = total
1506 task.completed = completed
1507 if visible is not None:
1508 task.visible = visible
1509 if fields:
1510 task.fields = fields
1511 if description is not None:
1512 task.description = description
1513 task.finished_time = None
1514 self.refresh()
1516 def advance(self, task_id: TaskID, advance: float = 1) -> None:
1517 """Advance task by a number of steps.
1519 Args:
1520 task_id (TaskID): ID of task.
1521 advance (float): Number of steps to advance. Default is 1.
1522 """
1523 current_time = self.get_time()
1524 with self._lock:
1525 task = self._tasks[task_id]
1526 completed_start = task.completed
1527 task.completed += advance
1528 update_completed = task.completed - completed_start
1529 old_sample_time = current_time - self.speed_estimate_period
1530 _progress = task._progress
1532 popleft = _progress.popleft
1533 while _progress and _progress[0].timestamp < old_sample_time:
1534 popleft()
1535 while len(_progress) > 1000:
1536 popleft()
1537 _progress.append(ProgressSample(current_time, update_completed))
1538 if (
1539 task.total is not None
1540 and task.completed >= task.total
1541 and task.finished_time is None
1542 ):
1543 task.finished_time = task.elapsed
1544 task.finished_speed = task.speed
1546 def refresh(self) -> None:
1547 """Refresh (render) the progress information."""
1548 if not self.disable and self.live.is_started:
1549 self.live.refresh()
1551 def get_renderable(self) -> RenderableType:
1552 """Get a renderable for the progress display."""
1553 renderable = Group(*self.get_renderables())
1554 return renderable
1556 def get_renderables(self) -> Iterable[RenderableType]:
1557 """Get a number of renderables for the progress display."""
1558 table = self.make_tasks_table(self.tasks)
1559 yield table
1561 def make_tasks_table(self, tasks: Iterable[Task]) -> Table:
1562 """Get a table to render the Progress display.
1564 Args:
1565 tasks (Iterable[Task]): An iterable of Task instances, one per row of the table.
1567 Returns:
1568 Table: A table instance.
1569 """
1570 table_columns = (
1571 (
1572 Column(no_wrap=True)
1573 if isinstance(_column, str)
1574 else _column.get_table_column().copy()
1575 )
1576 for _column in self.columns
1577 )
1578 table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand)
1580 for task in tasks:
1581 if task.visible:
1582 table.add_row(
1583 *(
1584 (
1585 column.format(task=task)
1586 if isinstance(column, str)
1587 else column(task)
1588 )
1589 for column in self.columns
1590 )
1591 )
1592 return table
1594 def __rich__(self) -> RenderableType:
1595 """Makes the Progress class itself renderable."""
1596 with self._lock:
1597 return self.get_renderable()
1599 def add_task(
1600 self,
1601 description: str,
1602 start: bool = True,
1603 total: Optional[float] = 100.0,
1604 completed: int = 0,
1605 visible: bool = True,
1606 **fields: Any,
1607 ) -> TaskID:
1608 """Add a new 'task' to the Progress display.
1610 Args:
1611 description (str): A description of the task.
1612 start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False,
1613 you will need to call `start` manually. Defaults to True.
1614 total (float, optional): Number of total steps in the progress if known.
1615 Set to None to render a pulsing animation. Defaults to 100.
1616 completed (int, optional): Number of steps completed so far. Defaults to 0.
1617 visible (bool, optional): Enable display of the task. Defaults to True.
1618 **fields (str): Additional data fields required for rendering.
1620 Returns:
1621 TaskID: An ID you can use when calling `update`.
1622 """
1623 with self._lock:
1624 task = Task(
1625 self._task_index,
1626 description,
1627 total,
1628 completed,
1629 visible=visible,
1630 fields=fields,
1631 _get_time=self.get_time,
1632 _lock=self._lock,
1633 )
1634 self._tasks[self._task_index] = task
1635 if start:
1636 self.start_task(self._task_index)
1637 new_task_index = self._task_index
1638 self._task_index = TaskID(int(self._task_index) + 1)
1639 self.refresh()
1640 return new_task_index
1642 def remove_task(self, task_id: TaskID) -> None:
1643 """Delete a task if it exists.
1645 Args:
1646 task_id (TaskID): A task ID.
1648 """
1649 with self._lock:
1650 del self._tasks[task_id]
1653if __name__ == "__main__": # pragma: no coverage
1654 import random
1655 import time
1657 from .panel import Panel
1658 from .rule import Rule
1659 from .syntax import Syntax
1660 from .table import Table
1662 syntax = Syntax(
1663 '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]:
1664 """Iterate and generate a tuple with a flag for last value."""
1665 iter_values = iter(values)
1666 try:
1667 previous_value = next(iter_values)
1668 except StopIteration:
1669 return
1670 for value in iter_values:
1671 yield False, previous_value
1672 previous_value = value
1673 yield True, previous_value''',
1674 "python",
1675 line_numbers=True,
1676 )
1678 table = Table("foo", "bar", "baz")
1679 table.add_row("1", "2", "3")
1681 progress_renderables = [
1682 "Text may be printed while the progress bars are rendering.",
1683 Panel("In fact, [i]any[/i] renderable will work"),
1684 "Such as [magenta]tables[/]...",
1685 table,
1686 "Pretty printed structures...",
1687 {"type": "example", "text": "Pretty printed"},
1688 "Syntax...",
1689 syntax,
1690 Rule("Give it a try!"),
1691 ]
1693 from itertools import cycle
1695 examples = cycle(progress_renderables)
1697 console = Console(record=True)
1699 with Progress(
1700 SpinnerColumn(),
1701 *Progress.get_default_columns(),
1702 TimeElapsedColumn(),
1703 console=console,
1704 transient=False,
1705 ) as progress:
1706 task1 = progress.add_task("[red]Downloading", total=1000)
1707 task2 = progress.add_task("[green]Processing", total=1000)
1708 task3 = progress.add_task("[yellow]Thinking", total=None)
1710 while not progress.finished:
1711 progress.update(task1, advance=0.5)
1712 progress.update(task2, advance=0.3)
1713 time.sleep(0.01)
1714 if random.randint(0, 100) < 1:
1715 progress.log(next(examples))