Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/progress.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

624 statements  

1from __future__ import annotations 

2 

3import io 

4import typing 

5import warnings 

6from abc import ABC, abstractmethod 

7from collections import deque 

8from dataclasses import dataclass, field 

9from datetime import timedelta 

10from io import RawIOBase, UnsupportedOperation 

11from math import ceil 

12from mmap import mmap 

13from operator import length_hint 

14from os import PathLike, stat 

15from threading import Event, RLock, Thread 

16from types import TracebackType 

17from typing import ( 

18 TYPE_CHECKING, 

19 Any, 

20 BinaryIO, 

21 Callable, 

22 ContextManager, 

23 Deque, 

24 Dict, 

25 Generic, 

26 Iterable, 

27 List, 

28 Literal, 

29 NamedTuple, 

30 NewType, 

31 Optional, 

32 TextIO, 

33 Tuple, 

34 Type, 

35 TypeVar, 

36 Union, 

37) 

38 

39if TYPE_CHECKING: 

40 # Can be replaced with `from typing import Self` in Python 3.11+ 

41 from typing_extensions import Self # pragma: no cover 

42 

43from . import filesize, get_console 

44from .console import Console, Group, JustifyMethod, RenderableType 

45from .highlighter import Highlighter 

46from .jupyter import JupyterMixin 

47from .live import Live 

48from .progress_bar import ProgressBar 

49from .spinner import Spinner 

50from .style import StyleType 

51from .table import Column, Table 

52from .text import Text, TextType 

53 

54TaskID = NewType("TaskID", int) 

55 

56ProgressType = TypeVar("ProgressType") 

57 

58GetTimeCallable = Callable[[], float] 

59 

60 

61_I = typing.TypeVar("_I", TextIO, BinaryIO) 

62 

63 

64class _TrackThread(Thread): 

65 """A thread to periodically update progress.""" 

66 

67 def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float): 

68 self.progress = progress 

69 self.task_id = task_id 

70 self.update_period = update_period 

71 self.done = Event() 

72 

73 self.completed = 0 

74 super().__init__(daemon=True) 

75 

76 def run(self) -> None: 

77 task_id = self.task_id 

78 advance = self.progress.advance 

79 update_period = self.update_period 

80 last_completed = 0 

81 wait = self.done.wait 

82 while not wait(update_period) and self.progress.live.is_started: 

83 completed = self.completed 

84 if last_completed != completed: 

85 advance(task_id, completed - last_completed) 

86 last_completed = completed 

87 

88 self.progress.update(self.task_id, completed=self.completed, refresh=True) 

89 

90 def __enter__(self) -> "_TrackThread": 

91 self.start() 

92 return self 

93 

94 def __exit__( 

95 self, 

96 exc_type: Optional[Type[BaseException]], 

97 exc_val: Optional[BaseException], 

98 exc_tb: Optional[TracebackType], 

99 ) -> None: 

100 self.done.set() 

101 self.join() 

102 

103 

104def track( 

105 sequence: Iterable[ProgressType], 

106 description: str = "Working...", 

107 total: Optional[float] = None, 

108 completed: int = 0, 

109 auto_refresh: bool = True, 

110 console: Optional[Console] = None, 

111 transient: bool = False, 

112 get_time: Optional[Callable[[], float]] = None, 

113 refresh_per_second: float = 10, 

114 style: StyleType = "bar.back", 

115 complete_style: StyleType = "bar.complete", 

116 finished_style: StyleType = "bar.finished", 

117 pulse_style: StyleType = "bar.pulse", 

118 update_period: float = 0.1, 

119 disable: bool = False, 

120 show_speed: bool = True, 

121) -> Iterable[ProgressType]: 

122 """Track progress by iterating over a sequence. 

123 

124 You can also track progress of an iterable, which might require that you additionally specify ``total``. 

125 

126 Args: 

127 sequence (Iterable[ProgressType]): Values you wish to iterate over and track progress. 

128 description (str, optional): Description of task show next to progress bar. Defaults to "Working". 

129 total: (float, optional): Total number of steps. Default is len(sequence). 

130 completed (int, optional): Number of steps completed so far. Defaults to 0. 

131 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

132 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

133 console (Console, optional): Console to write to. Default creates internal Console instance. 

134 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

135 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

136 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

137 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

138 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

139 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. 

140 disable (bool, optional): Disable display of progress. 

141 show_speed (bool, optional): Show speed if total isn't known. Defaults to True. 

142 Returns: 

143 Iterable[ProgressType]: An iterable of the values in the sequence. 

144 

145 """ 

146 

147 columns: List["ProgressColumn"] = ( 

148 [TextColumn("[progress.description]{task.description}")] if description else [] 

149 ) 

150 columns.extend( 

151 ( 

152 BarColumn( 

153 style=style, 

154 complete_style=complete_style, 

155 finished_style=finished_style, 

156 pulse_style=pulse_style, 

157 ), 

158 TaskProgressColumn(show_speed=show_speed), 

159 TimeRemainingColumn(elapsed_when_finished=True), 

160 ) 

161 ) 

162 progress = Progress( 

163 *columns, 

164 auto_refresh=auto_refresh, 

165 console=console, 

166 transient=transient, 

167 get_time=get_time, 

168 refresh_per_second=refresh_per_second or 10, 

169 disable=disable, 

170 ) 

171 

172 with progress: 

173 yield from progress.track( 

174 sequence, 

175 total=total, 

176 completed=completed, 

177 description=description, 

178 update_period=update_period, 

179 ) 

180 

181 

182class _Reader(RawIOBase, BinaryIO): 

183 """A reader that tracks progress while it's being read from.""" 

184 

185 def __init__( 

186 self, 

187 handle: BinaryIO, 

188 progress: "Progress", 

189 task: TaskID, 

190 close_handle: bool = True, 

191 ) -> None: 

192 self.handle = handle 

193 self.progress = progress 

194 self.task = task 

195 self.close_handle = close_handle 

196 self._closed = False 

197 

198 def __enter__(self) -> "_Reader": 

199 self.handle.__enter__() 

200 return self 

201 

202 def __exit__( 

203 self, 

204 exc_type: Optional[Type[BaseException]], 

205 exc_val: Optional[BaseException], 

206 exc_tb: Optional[TracebackType], 

207 ) -> None: 

208 self.close() 

209 

210 def __iter__(self) -> BinaryIO: 

211 return self 

212 

213 def __next__(self) -> bytes: 

214 line = next(self.handle) 

215 self.progress.advance(self.task, advance=len(line)) 

216 return line 

217 

218 @property 

219 def closed(self) -> bool: 

220 return self._closed 

221 

222 def fileno(self) -> int: 

223 return self.handle.fileno() 

224 

225 def isatty(self) -> bool: 

226 return self.handle.isatty() 

227 

228 @property 

229 def mode(self) -> str: 

230 return self.handle.mode 

231 

232 @property 

233 def name(self) -> str: 

234 return self.handle.name 

235 

236 def readable(self) -> bool: 

237 return self.handle.readable() 

238 

239 def seekable(self) -> bool: 

240 return self.handle.seekable() 

241 

242 def writable(self) -> bool: 

243 return False 

244 

245 def read(self, size: int = -1) -> bytes: 

246 block = self.handle.read(size) 

247 self.progress.advance(self.task, advance=len(block)) 

248 return block 

249 

250 def readinto(self, b: Union[bytearray, memoryview, mmap]): # type: ignore[no-untyped-def, override] 

251 n = self.handle.readinto(b) # type: ignore[attr-defined] 

252 self.progress.advance(self.task, advance=n) 

253 return n 

254 

255 def readline(self, size: int = -1) -> bytes: # type: ignore[override] 

256 line = self.handle.readline(size) 

257 self.progress.advance(self.task, advance=len(line)) 

258 return line 

259 

260 def readlines(self, hint: int = -1) -> List[bytes]: 

261 lines = self.handle.readlines(hint) 

262 self.progress.advance(self.task, advance=sum(map(len, lines))) 

263 return lines 

264 

265 def close(self) -> None: 

266 if self.close_handle: 

267 self.handle.close() 

268 self._closed = True 

269 

270 def seek(self, offset: int, whence: int = 0) -> int: 

271 pos = self.handle.seek(offset, whence) 

272 self.progress.update(self.task, completed=pos) 

273 return pos 

274 

275 def tell(self) -> int: 

276 return self.handle.tell() 

277 

278 def write(self, s: Any) -> int: 

279 raise UnsupportedOperation("write") 

280 

281 def writelines(self, lines: Iterable[Any]) -> None: 

282 raise UnsupportedOperation("writelines") 

283 

284 

285class _ReadContext(ContextManager[_I], Generic[_I]): 

286 """A utility class to handle a context for both a reader and a progress.""" 

287 

288 def __init__(self, progress: "Progress", reader: _I) -> None: 

289 self.progress = progress 

290 self.reader: _I = reader 

291 

292 def __enter__(self) -> _I: 

293 self.progress.start() 

294 return self.reader.__enter__() 

295 

296 def __exit__( 

297 self, 

298 exc_type: Optional[Type[BaseException]], 

299 exc_val: Optional[BaseException], 

300 exc_tb: Optional[TracebackType], 

301 ) -> None: 

302 self.progress.stop() 

303 self.reader.__exit__(exc_type, exc_val, exc_tb) 

304 

305 

306def wrap_file( 

307 file: BinaryIO, 

308 total: int, 

309 *, 

310 description: str = "Reading...", 

311 auto_refresh: bool = True, 

312 console: Optional[Console] = None, 

313 transient: bool = False, 

314 get_time: Optional[Callable[[], float]] = None, 

315 refresh_per_second: float = 10, 

316 style: StyleType = "bar.back", 

317 complete_style: StyleType = "bar.complete", 

318 finished_style: StyleType = "bar.finished", 

319 pulse_style: StyleType = "bar.pulse", 

320 disable: bool = False, 

321) -> ContextManager[BinaryIO]: 

322 """Read bytes from a file while tracking progress. 

323 

324 Args: 

325 file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. 

326 total (int): Total number of bytes to read. 

327 description (str, optional): Description of task show next to progress bar. Defaults to "Reading". 

328 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

329 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

330 console (Console, optional): Console to write to. Default creates internal Console instance. 

331 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

332 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

333 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

334 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

335 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

336 disable (bool, optional): Disable display of progress. 

337 Returns: 

338 ContextManager[BinaryIO]: A context manager yielding a progress reader. 

339 

340 """ 

341 

342 columns: List["ProgressColumn"] = ( 

343 [TextColumn("[progress.description]{task.description}")] if description else [] 

344 ) 

345 columns.extend( 

346 ( 

347 BarColumn( 

348 style=style, 

349 complete_style=complete_style, 

350 finished_style=finished_style, 

351 pulse_style=pulse_style, 

352 ), 

353 DownloadColumn(), 

354 TimeRemainingColumn(), 

355 ) 

356 ) 

357 progress = Progress( 

358 *columns, 

359 auto_refresh=auto_refresh, 

360 console=console, 

361 transient=transient, 

362 get_time=get_time, 

363 refresh_per_second=refresh_per_second or 10, 

364 disable=disable, 

365 ) 

366 

367 reader = progress.wrap_file(file, total=total, description=description) 

368 return _ReadContext(progress, reader) 

369 

370 

371@typing.overload 

372def open( 

373 file: Union[str, "PathLike[str]", bytes], 

374 mode: Union[Literal["rt"], Literal["r"]], 

375 buffering: int = -1, 

376 encoding: Optional[str] = None, 

377 errors: Optional[str] = None, 

378 newline: Optional[str] = None, 

379 *, 

380 total: Optional[int] = None, 

381 description: str = "Reading...", 

382 auto_refresh: bool = True, 

383 console: Optional[Console] = None, 

384 transient: bool = False, 

385 get_time: Optional[Callable[[], float]] = None, 

386 refresh_per_second: float = 10, 

387 style: StyleType = "bar.back", 

388 complete_style: StyleType = "bar.complete", 

389 finished_style: StyleType = "bar.finished", 

390 pulse_style: StyleType = "bar.pulse", 

391 disable: bool = False, 

392) -> ContextManager[TextIO]: 

393 pass 

394 

395 

396@typing.overload 

397def open( 

398 file: Union[str, "PathLike[str]", bytes], 

399 mode: Literal["rb"], 

400 buffering: int = -1, 

401 encoding: Optional[str] = None, 

402 errors: Optional[str] = None, 

403 newline: Optional[str] = None, 

404 *, 

405 total: Optional[int] = None, 

406 description: str = "Reading...", 

407 auto_refresh: bool = True, 

408 console: Optional[Console] = None, 

409 transient: bool = False, 

410 get_time: Optional[Callable[[], float]] = None, 

411 refresh_per_second: float = 10, 

412 style: StyleType = "bar.back", 

413 complete_style: StyleType = "bar.complete", 

414 finished_style: StyleType = "bar.finished", 

415 pulse_style: StyleType = "bar.pulse", 

416 disable: bool = False, 

417) -> ContextManager[BinaryIO]: 

418 pass 

419 

420 

421def open( 

422 file: Union[str, "PathLike[str]", bytes], 

423 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", 

424 buffering: int = -1, 

425 encoding: Optional[str] = None, 

426 errors: Optional[str] = None, 

427 newline: Optional[str] = None, 

428 *, 

429 total: Optional[int] = None, 

430 description: str = "Reading...", 

431 auto_refresh: bool = True, 

432 console: Optional[Console] = None, 

433 transient: bool = False, 

434 get_time: Optional[Callable[[], float]] = None, 

435 refresh_per_second: float = 10, 

436 style: StyleType = "bar.back", 

437 complete_style: StyleType = "bar.complete", 

438 finished_style: StyleType = "bar.finished", 

439 pulse_style: StyleType = "bar.pulse", 

440 disable: bool = False, 

441) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]: 

442 """Read bytes from a file while tracking progress. 

443 

444 Args: 

445 path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. 

446 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". 

447 buffering (int): The buffering strategy to use, see :func:`io.open`. 

448 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. 

449 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. 

450 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open` 

451 total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size. 

452 description (str, optional): Description of task show next to progress bar. Defaults to "Reading". 

453 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

454 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

455 console (Console, optional): Console to write to. Default creates internal Console instance. 

456 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

457 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

458 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

459 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

460 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

461 disable (bool, optional): Disable display of progress. 

462 encoding (str, optional): The encoding to use when reading in text mode. 

463 

464 Returns: 

465 ContextManager[BinaryIO]: A context manager yielding a progress reader. 

466 

467 """ 

468 

469 columns: List["ProgressColumn"] = ( 

470 [TextColumn("[progress.description]{task.description}")] if description else [] 

471 ) 

472 columns.extend( 

473 ( 

474 BarColumn( 

475 style=style, 

476 complete_style=complete_style, 

477 finished_style=finished_style, 

478 pulse_style=pulse_style, 

479 ), 

480 DownloadColumn(), 

481 TimeRemainingColumn(), 

482 ) 

483 ) 

484 progress = Progress( 

485 *columns, 

486 auto_refresh=auto_refresh, 

487 console=console, 

488 transient=transient, 

489 get_time=get_time, 

490 refresh_per_second=refresh_per_second or 10, 

491 disable=disable, 

492 ) 

493 

494 reader = progress.open( 

495 file, 

496 mode=mode, 

497 buffering=buffering, 

498 encoding=encoding, 

499 errors=errors, 

500 newline=newline, 

501 total=total, 

502 description=description, 

503 ) 

504 return _ReadContext(progress, reader) # type: ignore[return-value, type-var] 

505 

506 

507class ProgressColumn(ABC): 

508 """Base class for a widget to use in progress display.""" 

509 

510 max_refresh: Optional[float] = None 

511 

512 def __init__(self, table_column: Optional[Column] = None) -> None: 

513 self._table_column = table_column 

514 self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {} 

515 self._update_time: Optional[float] = None 

516 

517 def get_table_column(self) -> Column: 

518 """Get a table column, used to build tasks table.""" 

519 return self._table_column or Column() 

520 

521 def __call__(self, task: "Task") -> RenderableType: 

522 """Called by the Progress object to return a renderable for the given task. 

523 

524 Args: 

525 task (Task): An object containing information regarding the task. 

526 

527 Returns: 

528 RenderableType: Anything renderable (including str). 

529 """ 

530 current_time = task.get_time() 

531 if self.max_refresh is not None and not task.completed: 

532 try: 

533 timestamp, renderable = self._renderable_cache[task.id] 

534 except KeyError: 

535 pass 

536 else: 

537 if timestamp + self.max_refresh > current_time: 

538 return renderable 

539 

540 renderable = self.render(task) 

541 self._renderable_cache[task.id] = (current_time, renderable) 

542 return renderable 

543 

544 @abstractmethod 

545 def render(self, task: "Task") -> RenderableType: 

546 """Should return a renderable object.""" 

547 

548 

549class RenderableColumn(ProgressColumn): 

550 """A column to insert an arbitrary column. 

551 

552 Args: 

553 renderable (RenderableType, optional): Any renderable. Defaults to empty string. 

554 """ 

555 

556 def __init__( 

557 self, renderable: RenderableType = "", *, table_column: Optional[Column] = None 

558 ): 

559 self.renderable = renderable 

560 super().__init__(table_column=table_column) 

561 

562 def render(self, task: "Task") -> RenderableType: 

563 return self.renderable 

564 

565 

566class SpinnerColumn(ProgressColumn): 

567 """A column with a 'spinner' animation. 

568 

569 Args: 

570 spinner_name (str, optional): Name of spinner animation. Defaults to "dots". 

571 style (StyleType, optional): Style of spinner. Defaults to "progress.spinner". 

572 speed (float, optional): Speed factor of spinner. Defaults to 1.0. 

573 finished_text (TextType, optional): Text used when task is finished. Defaults to " ". 

574 """ 

575 

576 def __init__( 

577 self, 

578 spinner_name: str = "dots", 

579 style: Optional[StyleType] = "progress.spinner", 

580 speed: float = 1.0, 

581 finished_text: TextType = " ", 

582 table_column: Optional[Column] = None, 

583 ): 

584 self.spinner = Spinner(spinner_name, style=style, speed=speed) 

585 self.finished_text = ( 

586 Text.from_markup(finished_text) 

587 if isinstance(finished_text, str) 

588 else finished_text 

589 ) 

590 super().__init__(table_column=table_column) 

591 

592 def set_spinner( 

593 self, 

594 spinner_name: str, 

595 spinner_style: Optional[StyleType] = "progress.spinner", 

596 speed: float = 1.0, 

597 ) -> None: 

598 """Set a new spinner. 

599 

600 Args: 

601 spinner_name (str): Spinner name, see python -m rich.spinner. 

602 spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner". 

603 speed (float, optional): Speed factor of spinner. Defaults to 1.0. 

604 """ 

605 self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed) 

606 

607 def render(self, task: "Task") -> RenderableType: 

608 text = ( 

609 self.finished_text 

610 if task.finished 

611 else self.spinner.render(task.get_time()) 

612 ) 

613 return text 

614 

615 

616class TextColumn(ProgressColumn): 

617 """A column containing text.""" 

618 

619 def __init__( 

620 self, 

621 text_format: str, 

622 style: StyleType = "none", 

623 justify: JustifyMethod = "left", 

624 markup: bool = True, 

625 highlighter: Optional[Highlighter] = None, 

626 table_column: Optional[Column] = None, 

627 ) -> None: 

628 self.text_format = text_format 

629 self.justify: JustifyMethod = justify 

630 self.style = style 

631 self.markup = markup 

632 self.highlighter = highlighter 

633 super().__init__(table_column=table_column or Column(no_wrap=True)) 

634 

635 def render(self, task: "Task") -> Text: 

636 _text = self.text_format.format(task=task) 

637 if self.markup: 

638 text = Text.from_markup(_text, style=self.style, justify=self.justify) 

639 else: 

640 text = Text(_text, style=self.style, justify=self.justify) 

641 if self.highlighter: 

642 self.highlighter.highlight(text) 

643 return text 

644 

645 

646class BarColumn(ProgressColumn): 

647 """Renders a visual progress bar. 

648 

649 Args: 

650 bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40. 

651 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

652 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

653 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

654 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

655 """ 

656 

657 def __init__( 

658 self, 

659 bar_width: Optional[int] = 40, 

660 style: StyleType = "bar.back", 

661 complete_style: StyleType = "bar.complete", 

662 finished_style: StyleType = "bar.finished", 

663 pulse_style: StyleType = "bar.pulse", 

664 table_column: Optional[Column] = None, 

665 ) -> None: 

666 self.bar_width = bar_width 

667 self.style = style 

668 self.complete_style = complete_style 

669 self.finished_style = finished_style 

670 self.pulse_style = pulse_style 

671 super().__init__(table_column=table_column) 

672 

673 def render(self, task: "Task") -> ProgressBar: 

674 """Gets a progress bar widget for a task.""" 

675 return ProgressBar( 

676 total=max(0, task.total) if task.total is not None else None, 

677 completed=max(0, task.completed), 

678 width=None if self.bar_width is None else max(1, self.bar_width), 

679 pulse=not task.started, 

680 animation_time=task.get_time(), 

681 style=self.style, 

682 complete_style=self.complete_style, 

683 finished_style=self.finished_style, 

684 pulse_style=self.pulse_style, 

685 ) 

686 

687 

688class TimeElapsedColumn(ProgressColumn): 

689 """Renders time elapsed.""" 

690 

691 def render(self, task: "Task") -> Text: 

692 """Show time elapsed.""" 

693 elapsed = task.finished_time if task.finished else task.elapsed 

694 if elapsed is None: 

695 return Text("-:--:--", style="progress.elapsed") 

696 delta = timedelta(seconds=max(0, int(elapsed))) 

697 return Text(str(delta), style="progress.elapsed") 

698 

699 

700class TaskProgressColumn(TextColumn): 

701 """Show task progress as a percentage. 

702 

703 Args: 

704 text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%". 

705 text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "". 

706 style (StyleType, optional): Style of output. Defaults to "none". 

707 justify (JustifyMethod, optional): Text justification. Defaults to "left". 

708 markup (bool, optional): Enable markup. Defaults to True. 

709 highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None. 

710 table_column (Optional[Column], optional): Table Column to use. Defaults to None. 

711 show_speed (bool, optional): Show speed if total is unknown. Defaults to False. 

712 """ 

713 

714 def __init__( 

715 self, 

716 text_format: str = "[progress.percentage]{task.percentage:>3.0f}%", 

717 text_format_no_percentage: str = "", 

718 style: StyleType = "none", 

719 justify: JustifyMethod = "left", 

720 markup: bool = True, 

721 highlighter: Optional[Highlighter] = None, 

722 table_column: Optional[Column] = None, 

723 show_speed: bool = False, 

724 ) -> None: 

725 self.text_format_no_percentage = text_format_no_percentage 

726 self.show_speed = show_speed 

727 super().__init__( 

728 text_format=text_format, 

729 style=style, 

730 justify=justify, 

731 markup=markup, 

732 highlighter=highlighter, 

733 table_column=table_column, 

734 ) 

735 

736 @classmethod 

737 def render_speed(cls, speed: Optional[float]) -> Text: 

738 """Render the speed in iterations per second. 

739 

740 Args: 

741 task (Task): A Task object. 

742 

743 Returns: 

744 Text: Text object containing the task speed. 

745 """ 

746 if speed is None: 

747 return Text("", style="progress.percentage") 

748 unit, suffix = filesize.pick_unit_and_suffix( 

749 int(speed), 

750 ["", "×10³", "×10⁶", "×10⁹", "×10¹²"], 

751 1000, 

752 ) 

753 data_speed = speed / unit 

754 return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage") 

755 

756 def render(self, task: "Task") -> Text: 

757 if task.total is None and self.show_speed: 

758 return self.render_speed(task.finished_speed or task.speed) 

759 text_format = ( 

760 self.text_format_no_percentage if task.total is None else self.text_format 

761 ) 

762 _text = text_format.format(task=task) 

763 if self.markup: 

764 text = Text.from_markup(_text, style=self.style, justify=self.justify) 

765 else: 

766 text = Text(_text, style=self.style, justify=self.justify) 

767 if self.highlighter: 

768 self.highlighter.highlight(text) 

769 return text 

770 

771 

772class TimeRemainingColumn(ProgressColumn): 

773 """Renders estimated time remaining. 

774 

775 Args: 

776 compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False. 

777 elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False. 

778 """ 

779 

780 # Only refresh twice a second to prevent jitter 

781 max_refresh = 0.5 

782 

783 def __init__( 

784 self, 

785 compact: bool = False, 

786 elapsed_when_finished: bool = False, 

787 table_column: Optional[Column] = None, 

788 ): 

789 self.compact = compact 

790 self.elapsed_when_finished = elapsed_when_finished 

791 super().__init__(table_column=table_column) 

792 

793 def render(self, task: "Task") -> Text: 

794 """Show time remaining.""" 

795 if self.elapsed_when_finished and task.finished: 

796 task_time = task.finished_time 

797 style = "progress.elapsed" 

798 else: 

799 task_time = task.time_remaining 

800 style = "progress.remaining" 

801 

802 if task.total is None: 

803 return Text("", style=style) 

804 

805 if task_time is None: 

806 return Text("--:--" if self.compact else "-:--:--", style=style) 

807 

808 # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py 

809 minutes, seconds = divmod(int(task_time), 60) 

810 hours, minutes = divmod(minutes, 60) 

811 

812 if self.compact and not hours: 

813 formatted = f"{minutes:02d}:{seconds:02d}" 

814 else: 

815 formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}" 

816 

817 return Text(formatted, style=style) 

818 

819 

820class FileSizeColumn(ProgressColumn): 

821 """Renders completed filesize.""" 

822 

823 def render(self, task: "Task") -> Text: 

824 """Show data completed.""" 

825 data_size = filesize.decimal(int(task.completed)) 

826 return Text(data_size, style="progress.filesize") 

827 

828 

829class TotalFileSizeColumn(ProgressColumn): 

830 """Renders total filesize.""" 

831 

832 def render(self, task: "Task") -> Text: 

833 """Show data completed.""" 

834 data_size = filesize.decimal(int(task.total)) if task.total is not None else "" 

835 return Text(data_size, style="progress.filesize.total") 

836 

837 

838class MofNCompleteColumn(ProgressColumn): 

839 """Renders completed count/total, e.g. ' 10/1000'. 

840 

841 Best for bounded tasks with int quantities. 

842 

843 Space pads the completed count so that progress length does not change as task progresses 

844 past powers of 10. 

845 

846 Args: 

847 separator (str, optional): Text to separate completed and total values. Defaults to "/". 

848 """ 

849 

850 def __init__(self, separator: str = "/", table_column: Optional[Column] = None): 

851 self.separator = separator 

852 super().__init__(table_column=table_column) 

853 

854 def render(self, task: "Task") -> Text: 

855 """Show completed/total.""" 

856 completed = int(task.completed) 

857 total = int(task.total) if task.total is not None else "?" 

858 total_width = len(str(total)) 

859 return Text( 

860 f"{completed:{total_width}d}{self.separator}{total}", 

861 style="progress.download", 

862 ) 

863 

864 

865class DownloadColumn(ProgressColumn): 

866 """Renders file size downloaded and total, e.g. '0.5/2.3 GB'. 

867 

868 Args: 

869 binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False. 

870 """ 

871 

872 def __init__( 

873 self, binary_units: bool = False, table_column: Optional[Column] = None 

874 ) -> None: 

875 self.binary_units = binary_units 

876 super().__init__(table_column=table_column) 

877 

878 def render(self, task: "Task") -> Text: 

879 """Calculate common unit for completed and total.""" 

880 completed = int(task.completed) 

881 

882 unit_and_suffix_calculation_base = ( 

883 int(task.total) if task.total is not None else completed 

884 ) 

885 if self.binary_units: 

886 unit, suffix = filesize.pick_unit_and_suffix( 

887 unit_and_suffix_calculation_base, 

888 ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"], 

889 1024, 

890 ) 

891 else: 

892 unit, suffix = filesize.pick_unit_and_suffix( 

893 unit_and_suffix_calculation_base, 

894 ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"], 

895 1000, 

896 ) 

897 precision = 0 if unit == 1 else 1 

898 

899 completed_ratio = completed / unit 

900 completed_str = f"{completed_ratio:,.{precision}f}" 

901 

902 if task.total is not None: 

903 total = int(task.total) 

904 total_ratio = total / unit 

905 total_str = f"{total_ratio:,.{precision}f}" 

906 else: 

907 total_str = "?" 

908 

909 download_status = f"{completed_str}/{total_str} {suffix}" 

910 download_text = Text(download_status, style="progress.download") 

911 return download_text 

912 

913 

914class TransferSpeedColumn(ProgressColumn): 

915 """Renders human readable transfer speed.""" 

916 

917 def render(self, task: "Task") -> Text: 

918 """Show data transfer speed.""" 

919 speed = task.finished_speed or task.speed 

920 if speed is None: 

921 return Text("?", style="progress.data.speed") 

922 data_speed = filesize.decimal(int(speed)) 

923 return Text(f"{data_speed}/s", style="progress.data.speed") 

924 

925 

926class ProgressSample(NamedTuple): 

927 """Sample of progress for a given time.""" 

928 

929 timestamp: float 

930 """Timestamp of sample.""" 

931 completed: float 

932 """Number of steps completed.""" 

933 

934 

935@dataclass 

936class Task: 

937 """Information regarding a progress task. 

938 

939 This object should be considered read-only outside of the :class:`~Progress` class. 

940 

941 """ 

942 

943 id: TaskID 

944 """Task ID associated with this task (used in Progress methods).""" 

945 

946 description: str 

947 """str: Description of the task.""" 

948 

949 total: Optional[float] 

950 """Optional[float]: Total number of steps in this task.""" 

951 

952 completed: float 

953 """float: Number of steps completed""" 

954 

955 _get_time: GetTimeCallable 

956 """Callable to get the current time.""" 

957 

958 finished_time: Optional[float] = None 

959 """float: Time task was finished.""" 

960 

961 visible: bool = True 

962 """bool: Indicates if this task is visible in the progress display.""" 

963 

964 fields: Dict[str, Any] = field(default_factory=dict) 

965 """dict: Arbitrary fields passed in via Progress.update.""" 

966 

967 start_time: Optional[float] = field(default=None, init=False, repr=False) 

968 """Optional[float]: Time this task was started, or None if not started.""" 

969 

970 stop_time: Optional[float] = field(default=None, init=False, repr=False) 

971 """Optional[float]: Time this task was stopped, or None if not stopped.""" 

972 

973 finished_speed: Optional[float] = None 

974 """Optional[float]: The last speed for a finished task.""" 

975 

976 _progress: Deque[ProgressSample] = field( 

977 default_factory=lambda: deque(maxlen=1000), init=False, repr=False 

978 ) 

979 

980 _lock: RLock = field(repr=False, default_factory=RLock) 

981 """Thread lock.""" 

982 

983 def get_time(self) -> float: 

984 """float: Get the current time, in seconds.""" 

985 return self._get_time() 

986 

987 @property 

988 def started(self) -> bool: 

989 """bool: Check if the task as started.""" 

990 return self.start_time is not None 

991 

992 @property 

993 def remaining(self) -> Optional[float]: 

994 """Optional[float]: Get the number of steps remaining, if a non-None total was set.""" 

995 if self.total is None: 

996 return None 

997 return self.total - self.completed 

998 

999 @property 

1000 def elapsed(self) -> Optional[float]: 

1001 """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started.""" 

1002 if self.start_time is None: 

1003 return None 

1004 if self.stop_time is not None: 

1005 return self.stop_time - self.start_time 

1006 return self.get_time() - self.start_time 

1007 

1008 @property 

1009 def finished(self) -> bool: 

1010 """Check if the task has finished.""" 

1011 return self.finished_time is not None 

1012 

1013 @property 

1014 def percentage(self) -> float: 

1015 """float: Get progress of task as a percentage. If a None total was set, returns 0""" 

1016 if not self.total: 

1017 return 0.0 

1018 completed = (self.completed / self.total) * 100.0 

1019 completed = min(100.0, max(0.0, completed)) 

1020 return completed 

1021 

1022 @property 

1023 def speed(self) -> Optional[float]: 

1024 """Optional[float]: Get the estimated speed in steps per second.""" 

1025 if self.start_time is None: 

1026 return None 

1027 with self._lock: 

1028 progress = self._progress 

1029 if not progress: 

1030 return None 

1031 total_time = progress[-1].timestamp - progress[0].timestamp 

1032 if total_time == 0: 

1033 return None 

1034 iter_progress = iter(progress) 

1035 next(iter_progress) 

1036 total_completed = sum(sample.completed for sample in iter_progress) 

1037 speed = total_completed / total_time 

1038 return speed 

1039 

1040 @property 

1041 def time_remaining(self) -> Optional[float]: 

1042 """Optional[float]: Get estimated time to completion, or ``None`` if no data.""" 

1043 if self.finished: 

1044 return 0.0 

1045 speed = self.speed 

1046 if not speed: 

1047 return None 

1048 remaining = self.remaining 

1049 if remaining is None: 

1050 return None 

1051 estimate = ceil(remaining / speed) 

1052 return estimate 

1053 

1054 def _reset(self) -> None: 

1055 """Reset progress.""" 

1056 self._progress.clear() 

1057 self.finished_time = None 

1058 self.finished_speed = None 

1059 

1060 

1061class Progress(JupyterMixin): 

1062 """Renders an auto-updating progress bar(s). 

1063 

1064 Args: 

1065 console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout. 

1066 auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`. 

1067 refresh_per_second (float, optional): Number of times per second to refresh the progress information. Defaults to 10. 

1068 speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30. 

1069 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

1070 redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True. 

1071 redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True. 

1072 get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None. 

1073 disable (bool, optional): Disable progress display. Defaults to False 

1074 expand (bool, optional): Expand tasks table to fit width. Defaults to False. 

1075 """ 

1076 

1077 def __init__( 

1078 self, 

1079 *columns: Union[str, ProgressColumn], 

1080 console: Optional[Console] = None, 

1081 auto_refresh: bool = True, 

1082 refresh_per_second: float = 10, 

1083 speed_estimate_period: float = 30.0, 

1084 transient: bool = False, 

1085 redirect_stdout: bool = True, 

1086 redirect_stderr: bool = True, 

1087 get_time: Optional[GetTimeCallable] = None, 

1088 disable: bool = False, 

1089 expand: bool = False, 

1090 ) -> None: 

1091 assert refresh_per_second > 0, "refresh_per_second must be > 0" 

1092 self._lock = RLock() 

1093 self.columns = columns or self.get_default_columns() 

1094 self.speed_estimate_period = speed_estimate_period 

1095 

1096 self.disable = disable 

1097 self.expand = expand 

1098 self._tasks: Dict[TaskID, Task] = {} 

1099 self._task_index: TaskID = TaskID(0) 

1100 self.live = Live( 

1101 console=console or get_console(), 

1102 auto_refresh=auto_refresh, 

1103 refresh_per_second=refresh_per_second, 

1104 transient=transient, 

1105 redirect_stdout=redirect_stdout, 

1106 redirect_stderr=redirect_stderr, 

1107 get_renderable=self.get_renderable, 

1108 ) 

1109 self.get_time = get_time or self.console.get_time 

1110 self.print = self.console.print 

1111 self.log = self.console.log 

1112 

1113 @classmethod 

1114 def get_default_columns(cls) -> Tuple[ProgressColumn, ...]: 

1115 """Get the default columns used for a new Progress instance: 

1116 - a text column for the description (TextColumn) 

1117 - the bar itself (BarColumn) 

1118 - a text column showing completion percentage (TextColumn) 

1119 - an estimated-time-remaining column (TimeRemainingColumn) 

1120 If the Progress instance is created without passing a columns argument, 

1121 the default columns defined here will be used. 

1122 

1123 You can also create a Progress instance using custom columns before 

1124 and/or after the defaults, as in this example: 

1125 

1126 progress = Progress( 

1127 SpinnerColumn(), 

1128 *Progress.get_default_columns(), 

1129 "Elapsed:", 

1130 TimeElapsedColumn(), 

1131 ) 

1132 

1133 This code shows the creation of a Progress display, containing 

1134 a spinner to the left, the default columns, and a labeled elapsed 

1135 time column. 

1136 """ 

1137 return ( 

1138 TextColumn("[progress.description]{task.description}"), 

1139 BarColumn(), 

1140 TaskProgressColumn(), 

1141 TimeRemainingColumn(), 

1142 ) 

1143 

1144 @property 

1145 def console(self) -> Console: 

1146 return self.live.console 

1147 

1148 @property 

1149 def tasks(self) -> List[Task]: 

1150 """Get a list of Task instances.""" 

1151 with self._lock: 

1152 return list(self._tasks.values()) 

1153 

1154 @property 

1155 def task_ids(self) -> List[TaskID]: 

1156 """A list of task IDs.""" 

1157 with self._lock: 

1158 return list(self._tasks.keys()) 

1159 

1160 @property 

1161 def finished(self) -> bool: 

1162 """Check if all tasks have been completed.""" 

1163 with self._lock: 

1164 if not self._tasks: 

1165 return True 

1166 return all(task.finished for task in self._tasks.values()) 

1167 

1168 def start(self) -> None: 

1169 """Start the progress display.""" 

1170 if not self.disable: 

1171 self.live.start(refresh=True) 

1172 

1173 def stop(self) -> None: 

1174 """Stop the progress display.""" 

1175 if not self.disable: 

1176 self.live.stop() 

1177 if not self.console.is_interactive and not self.console.is_jupyter: 

1178 self.console.print() 

1179 

1180 def __enter__(self) -> Self: 

1181 self.start() 

1182 return self 

1183 

1184 def __exit__( 

1185 self, 

1186 exc_type: Optional[Type[BaseException]], 

1187 exc_val: Optional[BaseException], 

1188 exc_tb: Optional[TracebackType], 

1189 ) -> None: 

1190 self.stop() 

1191 

1192 def track( 

1193 self, 

1194 sequence: Iterable[ProgressType], 

1195 total: Optional[float] = None, 

1196 completed: int = 0, 

1197 task_id: Optional[TaskID] = None, 

1198 description: str = "Working...", 

1199 update_period: float = 0.1, 

1200 ) -> Iterable[ProgressType]: 

1201 """Track progress by iterating over a sequence. 

1202 

1203 You can also track progress of an iterable, which might require that you additionally specify ``total``. 

1204 

1205 Args: 

1206 sequence (Iterable[ProgressType]): Values you want to iterate over and track progress. 

1207 total: (float, optional): Total number of steps. Default is len(sequence). 

1208 completed (int, optional): Number of steps completed so far. Defaults to 0. 

1209 task_id: (TaskID): Task to track. Default is new task. 

1210 description: (str, optional): Description of task, if new task is created. 

1211 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. 

1212 

1213 Returns: 

1214 Iterable[ProgressType]: An iterable of values taken from the provided sequence. 

1215 """ 

1216 if total is None: 

1217 total = float(length_hint(sequence)) or None 

1218 

1219 if task_id is None: 

1220 task_id = self.add_task(description, total=total, completed=completed) 

1221 else: 

1222 self.update(task_id, total=total, completed=completed) 

1223 

1224 if self.live.auto_refresh: 

1225 with _TrackThread(self, task_id, update_period) as track_thread: 

1226 for value in sequence: 

1227 yield value 

1228 track_thread.completed += 1 

1229 else: 

1230 advance = self.advance 

1231 refresh = self.refresh 

1232 for value in sequence: 

1233 yield value 

1234 advance(task_id, 1) 

1235 refresh() 

1236 

1237 def wrap_file( 

1238 self, 

1239 file: BinaryIO, 

1240 total: Optional[int] = None, 

1241 *, 

1242 task_id: Optional[TaskID] = None, 

1243 description: str = "Reading...", 

1244 ) -> BinaryIO: 

1245 """Track progress file reading from a binary file. 

1246 

1247 Args: 

1248 file (BinaryIO): A file-like object opened in binary mode. 

1249 total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given. 

1250 task_id (TaskID): Task to track. Default is new task. 

1251 description (str, optional): Description of task, if new task is created. 

1252 

1253 Returns: 

1254 BinaryIO: A readable file-like object in binary mode. 

1255 

1256 Raises: 

1257 ValueError: When no total value can be extracted from the arguments or the task. 

1258 """ 

1259 # attempt to recover the total from the task 

1260 total_bytes: Optional[float] = None 

1261 if total is not None: 

1262 total_bytes = total 

1263 elif task_id is not None: 

1264 with self._lock: 

1265 total_bytes = self._tasks[task_id].total 

1266 if total_bytes is None: 

1267 raise ValueError( 

1268 f"unable to get the total number of bytes, please specify 'total'" 

1269 ) 

1270 

1271 # update total of task or create new task 

1272 if task_id is None: 

1273 task_id = self.add_task(description, total=total_bytes) 

1274 else: 

1275 self.update(task_id, total=total_bytes) 

1276 

1277 return _Reader(file, self, task_id, close_handle=False) 

1278 

1279 @typing.overload 

1280 def open( 

1281 self, 

1282 file: Union[str, "PathLike[str]", bytes], 

1283 mode: Literal["rb"], 

1284 buffering: int = -1, 

1285 encoding: Optional[str] = None, 

1286 errors: Optional[str] = None, 

1287 newline: Optional[str] = None, 

1288 *, 

1289 total: Optional[int] = None, 

1290 task_id: Optional[TaskID] = None, 

1291 description: str = "Reading...", 

1292 ) -> BinaryIO: 

1293 pass 

1294 

1295 @typing.overload 

1296 def open( 

1297 self, 

1298 file: Union[str, "PathLike[str]", bytes], 

1299 mode: Union[Literal["r"], Literal["rt"]], 

1300 buffering: int = -1, 

1301 encoding: Optional[str] = None, 

1302 errors: Optional[str] = None, 

1303 newline: Optional[str] = None, 

1304 *, 

1305 total: Optional[int] = None, 

1306 task_id: Optional[TaskID] = None, 

1307 description: str = "Reading...", 

1308 ) -> TextIO: 

1309 pass 

1310 

1311 def open( 

1312 self, 

1313 file: Union[str, "PathLike[str]", bytes], 

1314 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", 

1315 buffering: int = -1, 

1316 encoding: Optional[str] = None, 

1317 errors: Optional[str] = None, 

1318 newline: Optional[str] = None, 

1319 *, 

1320 total: Optional[int] = None, 

1321 task_id: Optional[TaskID] = None, 

1322 description: str = "Reading...", 

1323 ) -> Union[BinaryIO, TextIO]: 

1324 """Track progress while reading from a binary file. 

1325 

1326 Args: 

1327 path (Union[str, PathLike[str]]): The path to the file to read. 

1328 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". 

1329 buffering (int): The buffering strategy to use, see :func:`io.open`. 

1330 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. 

1331 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. 

1332 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`. 

1333 total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used. 

1334 task_id (TaskID): Task to track. Default is new task. 

1335 description (str, optional): Description of task, if new task is created. 

1336 

1337 Returns: 

1338 BinaryIO: A readable file-like object in binary mode. 

1339 

1340 Raises: 

1341 ValueError: When an invalid mode is given. 

1342 """ 

1343 # normalize the mode (always rb, rt) 

1344 _mode = "".join(sorted(mode, reverse=False)) 

1345 if _mode not in ("br", "rt", "r"): 

1346 raise ValueError(f"invalid mode {mode!r}") 

1347 

1348 # patch buffering to provide the same behaviour as the builtin `open` 

1349 line_buffering = buffering == 1 

1350 if _mode == "br" and buffering == 1: 

1351 warnings.warn( 

1352 "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used", 

1353 RuntimeWarning, 

1354 ) 

1355 buffering = -1 

1356 elif _mode in ("rt", "r"): 

1357 if buffering == 0: 

1358 raise ValueError("can't have unbuffered text I/O") 

1359 elif buffering == 1: 

1360 buffering = -1 

1361 

1362 # attempt to get the total with `os.stat` 

1363 if total is None: 

1364 total = stat(file).st_size 

1365 

1366 # update total of task or create new task 

1367 if task_id is None: 

1368 task_id = self.add_task(description, total=total) 

1369 else: 

1370 self.update(task_id, total=total) 

1371 

1372 # open the file in binary mode, 

1373 handle = io.open(file, "rb", buffering=buffering) 

1374 reader = _Reader(handle, self, task_id, close_handle=True) 

1375 

1376 # wrap the reader in a `TextIOWrapper` if text mode 

1377 if mode in ("r", "rt"): 

1378 return io.TextIOWrapper( 

1379 reader, 

1380 encoding=encoding, 

1381 errors=errors, 

1382 newline=newline, 

1383 line_buffering=line_buffering, 

1384 ) 

1385 

1386 return reader 

1387 

1388 def start_task(self, task_id: TaskID) -> None: 

1389 """Start a task. 

1390 

1391 Starts a task (used when calculating elapsed time). You may need to call this manually, 

1392 if you called ``add_task`` with ``start=False``. 

1393 

1394 Args: 

1395 task_id (TaskID): ID of task. 

1396 """ 

1397 with self._lock: 

1398 task = self._tasks[task_id] 

1399 if task.start_time is None: 

1400 task.start_time = self.get_time() 

1401 

1402 def stop_task(self, task_id: TaskID) -> None: 

1403 """Stop a task. 

1404 

1405 This will freeze the elapsed time on the task. 

1406 

1407 Args: 

1408 task_id (TaskID): ID of task. 

1409 """ 

1410 with self._lock: 

1411 task = self._tasks[task_id] 

1412 current_time = self.get_time() 

1413 if task.start_time is None: 

1414 task.start_time = current_time 

1415 task.stop_time = current_time 

1416 

1417 def update( 

1418 self, 

1419 task_id: TaskID, 

1420 *, 

1421 total: Optional[float] = None, 

1422 completed: Optional[float] = None, 

1423 advance: Optional[float] = None, 

1424 description: Optional[str] = None, 

1425 visible: Optional[bool] = None, 

1426 refresh: bool = False, 

1427 **fields: Any, 

1428 ) -> None: 

1429 """Update information associated with a task. 

1430 

1431 Args: 

1432 task_id (TaskID): Task id (returned by add_task). 

1433 total (float, optional): Updates task.total if not None. 

1434 completed (float, optional): Updates task.completed if not None. 

1435 advance (float, optional): Add a value to task.completed if not None. 

1436 description (str, optional): Change task description if not None. 

1437 visible (bool, optional): Set visible flag if not None. 

1438 refresh (bool): Force a refresh of progress information. Default is False. 

1439 **fields (Any): Additional data fields required for rendering. 

1440 """ 

1441 with self._lock: 

1442 task = self._tasks[task_id] 

1443 completed_start = task.completed 

1444 

1445 if total is not None and total != task.total: 

1446 task.total = total 

1447 task._reset() 

1448 if advance is not None: 

1449 task.completed += advance 

1450 if completed is not None: 

1451 task.completed = completed 

1452 if description is not None: 

1453 task.description = description 

1454 if visible is not None: 

1455 task.visible = visible 

1456 task.fields.update(fields) 

1457 update_completed = task.completed - completed_start 

1458 

1459 current_time = self.get_time() 

1460 old_sample_time = current_time - self.speed_estimate_period 

1461 _progress = task._progress 

1462 

1463 popleft = _progress.popleft 

1464 while _progress and _progress[0].timestamp < old_sample_time: 

1465 popleft() 

1466 if update_completed > 0: 

1467 _progress.append(ProgressSample(current_time, update_completed)) 

1468 if ( 

1469 task.total is not None 

1470 and task.completed >= task.total 

1471 and task.finished_time is None 

1472 ): 

1473 task.finished_time = task.elapsed 

1474 

1475 if refresh: 

1476 self.refresh() 

1477 

1478 def reset( 

1479 self, 

1480 task_id: TaskID, 

1481 *, 

1482 start: bool = True, 

1483 total: Optional[float] = None, 

1484 completed: int = 0, 

1485 visible: Optional[bool] = None, 

1486 description: Optional[str] = None, 

1487 **fields: Any, 

1488 ) -> None: 

1489 """Reset a task so completed is 0 and the clock is reset. 

1490 

1491 Args: 

1492 task_id (TaskID): ID of task. 

1493 start (bool, optional): Start the task after reset. Defaults to True. 

1494 total (float, optional): New total steps in task, or None to use current total. Defaults to None. 

1495 completed (int, optional): Number of steps completed. Defaults to 0. 

1496 visible (bool, optional): Set visible flag if not None. 

1497 description (str, optional): Change task description if not None. Defaults to None. 

1498 **fields (str): Additional data fields required for rendering. 

1499 """ 

1500 current_time = self.get_time() 

1501 with self._lock: 

1502 task = self._tasks[task_id] 

1503 task._reset() 

1504 task.start_time = current_time if start else None 

1505 if total is not None: 

1506 task.total = total 

1507 task.completed = completed 

1508 if visible is not None: 

1509 task.visible = visible 

1510 if fields: 

1511 task.fields = fields 

1512 if description is not None: 

1513 task.description = description 

1514 task.finished_time = None 

1515 self.refresh() 

1516 

1517 def advance(self, task_id: TaskID, advance: float = 1) -> None: 

1518 """Advance task by a number of steps. 

1519 

1520 Args: 

1521 task_id (TaskID): ID of task. 

1522 advance (float): Number of steps to advance. Default is 1. 

1523 """ 

1524 current_time = self.get_time() 

1525 with self._lock: 

1526 task = self._tasks[task_id] 

1527 completed_start = task.completed 

1528 task.completed += advance 

1529 update_completed = task.completed - completed_start 

1530 old_sample_time = current_time - self.speed_estimate_period 

1531 _progress = task._progress 

1532 

1533 popleft = _progress.popleft 

1534 while _progress and _progress[0].timestamp < old_sample_time: 

1535 popleft() 

1536 while len(_progress) > 1000: 

1537 popleft() 

1538 _progress.append(ProgressSample(current_time, update_completed)) 

1539 if ( 

1540 task.total is not None 

1541 and task.completed >= task.total 

1542 and task.finished_time is None 

1543 ): 

1544 task.finished_time = task.elapsed 

1545 task.finished_speed = task.speed 

1546 

1547 def refresh(self) -> None: 

1548 """Refresh (render) the progress information.""" 

1549 if not self.disable and self.live.is_started: 

1550 self.live.refresh() 

1551 

1552 def get_renderable(self) -> RenderableType: 

1553 """Get a renderable for the progress display.""" 

1554 renderable = Group(*self.get_renderables()) 

1555 return renderable 

1556 

1557 def get_renderables(self) -> Iterable[RenderableType]: 

1558 """Get a number of renderables for the progress display.""" 

1559 table = self.make_tasks_table(self.tasks) 

1560 yield table 

1561 

1562 def make_tasks_table(self, tasks: Iterable[Task]) -> Table: 

1563 """Get a table to render the Progress display. 

1564 

1565 Args: 

1566 tasks (Iterable[Task]): An iterable of Task instances, one per row of the table. 

1567 

1568 Returns: 

1569 Table: A table instance. 

1570 """ 

1571 table_columns = ( 

1572 ( 

1573 Column(no_wrap=True) 

1574 if isinstance(_column, str) 

1575 else _column.get_table_column().copy() 

1576 ) 

1577 for _column in self.columns 

1578 ) 

1579 table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand) 

1580 

1581 for task in tasks: 

1582 if task.visible: 

1583 table.add_row( 

1584 *( 

1585 ( 

1586 column.format(task=task) 

1587 if isinstance(column, str) 

1588 else column(task) 

1589 ) 

1590 for column in self.columns 

1591 ) 

1592 ) 

1593 return table 

1594 

1595 def __rich__(self) -> RenderableType: 

1596 """Makes the Progress class itself renderable.""" 

1597 with self._lock: 

1598 return self.get_renderable() 

1599 

1600 def add_task( 

1601 self, 

1602 description: str, 

1603 start: bool = True, 

1604 total: Optional[float] = 100.0, 

1605 completed: int = 0, 

1606 visible: bool = True, 

1607 **fields: Any, 

1608 ) -> TaskID: 

1609 """Add a new 'task' to the Progress display. 

1610 

1611 Args: 

1612 description (str): A description of the task. 

1613 start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False, 

1614 you will need to call `start` manually. Defaults to True. 

1615 total (float, optional): Number of total steps in the progress if known. 

1616 Set to None to render a pulsing animation. Defaults to 100. 

1617 completed (int, optional): Number of steps completed so far. Defaults to 0. 

1618 visible (bool, optional): Enable display of the task. Defaults to True. 

1619 **fields (str): Additional data fields required for rendering. 

1620 

1621 Returns: 

1622 TaskID: An ID you can use when calling `update`. 

1623 """ 

1624 with self._lock: 

1625 task = Task( 

1626 self._task_index, 

1627 description, 

1628 total, 

1629 completed, 

1630 visible=visible, 

1631 fields=fields, 

1632 _get_time=self.get_time, 

1633 _lock=self._lock, 

1634 ) 

1635 self._tasks[self._task_index] = task 

1636 if start: 

1637 self.start_task(self._task_index) 

1638 new_task_index = self._task_index 

1639 self._task_index = TaskID(int(self._task_index) + 1) 

1640 self.refresh() 

1641 return new_task_index 

1642 

1643 def remove_task(self, task_id: TaskID) -> None: 

1644 """Delete a task if it exists. 

1645 

1646 Args: 

1647 task_id (TaskID): A task ID. 

1648 

1649 """ 

1650 with self._lock: 

1651 del self._tasks[task_id] 

1652 

1653 

1654if __name__ == "__main__": # pragma: no coverage 

1655 import random 

1656 import time 

1657 

1658 from .panel import Panel 

1659 from .rule import Rule 

1660 from .syntax import Syntax 

1661 from .table import Table 

1662 

1663 syntax = Syntax( 

1664 '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]: 

1665 """Iterate and generate a tuple with a flag for last value.""" 

1666 iter_values = iter(values) 

1667 try: 

1668 previous_value = next(iter_values) 

1669 except StopIteration: 

1670 return 

1671 for value in iter_values: 

1672 yield False, previous_value 

1673 previous_value = value 

1674 yield True, previous_value''', 

1675 "python", 

1676 line_numbers=True, 

1677 ) 

1678 

1679 table = Table("foo", "bar", "baz") 

1680 table.add_row("1", "2", "3") 

1681 

1682 progress_renderables = [ 

1683 "Text may be printed while the progress bars are rendering.", 

1684 Panel("In fact, [i]any[/i] renderable will work"), 

1685 "Such as [magenta]tables[/]...", 

1686 table, 

1687 "Pretty printed structures...", 

1688 {"type": "example", "text": "Pretty printed"}, 

1689 "Syntax...", 

1690 syntax, 

1691 Rule("Give it a try!"), 

1692 ] 

1693 

1694 from itertools import cycle 

1695 

1696 examples = cycle(progress_renderables) 

1697 

1698 console = Console(record=True) 

1699 

1700 with Progress( 

1701 SpinnerColumn(), 

1702 *Progress.get_default_columns(), 

1703 TimeElapsedColumn(), 

1704 console=console, 

1705 transient=False, 

1706 ) as progress: 

1707 task1 = progress.add_task("[red]Downloading", total=1000) 

1708 task2 = progress.add_task("[green]Processing", total=1000) 

1709 task3 = progress.add_task("[yellow]Thinking", total=None) 

1710 

1711 while not progress.finished: 

1712 progress.update(task1, advance=0.5) 

1713 progress.update(task2, advance=0.3) 

1714 time.sleep(0.01) 

1715 if random.randint(0, 100) < 1: 

1716 progress.log(next(examples))