Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/progress.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

623 statements  

1from __future__ import annotations 

2 

3import io 

4import typing 

5import warnings 

6from abc import ABC, abstractmethod 

7from collections import deque 

8from dataclasses import dataclass, field 

9from datetime import timedelta 

10from io import RawIOBase, UnsupportedOperation 

11from math import ceil 

12from mmap import mmap 

13from operator import length_hint 

14from os import PathLike, stat 

15from threading import Event, RLock, Thread 

16from types import TracebackType 

17from typing import ( 

18 TYPE_CHECKING, 

19 Any, 

20 BinaryIO, 

21 Callable, 

22 ContextManager, 

23 Deque, 

24 Dict, 

25 Generic, 

26 Iterable, 

27 List, 

28 Literal, 

29 NamedTuple, 

30 NewType, 

31 Optional, 

32 TextIO, 

33 Tuple, 

34 Type, 

35 TypeVar, 

36 Union, 

37) 

38 

39if TYPE_CHECKING: 

40 # Can be replaced with `from typing import Self` in Python 3.11+ 

41 from typing_extensions import Self # pragma: no cover 

42 

43from . import filesize, get_console 

44from .console import Console, Group, JustifyMethod, RenderableType 

45from .highlighter import Highlighter 

46from .jupyter import JupyterMixin 

47from .live import Live 

48from .progress_bar import ProgressBar 

49from .spinner import Spinner 

50from .style import StyleType 

51from .table import Column, Table 

52from .text import Text, TextType 

53 

54TaskID = NewType("TaskID", int) 

55 

56ProgressType = TypeVar("ProgressType") 

57 

58GetTimeCallable = Callable[[], float] 

59 

60 

61_I = typing.TypeVar("_I", TextIO, BinaryIO) 

62 

63 

64class _TrackThread(Thread): 

65 """A thread to periodically update progress.""" 

66 

67 def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float): 

68 self.progress = progress 

69 self.task_id = task_id 

70 self.update_period = update_period 

71 self.done = Event() 

72 

73 self.completed = 0 

74 super().__init__(daemon=True) 

75 

76 def run(self) -> None: 

77 task_id = self.task_id 

78 advance = self.progress.advance 

79 update_period = self.update_period 

80 last_completed = 0 

81 wait = self.done.wait 

82 while not wait(update_period) and self.progress.live.is_started: 

83 completed = self.completed 

84 if last_completed != completed: 

85 advance(task_id, completed - last_completed) 

86 last_completed = completed 

87 

88 self.progress.update(self.task_id, completed=self.completed, refresh=True) 

89 

90 def __enter__(self) -> "_TrackThread": 

91 self.start() 

92 return self 

93 

94 def __exit__( 

95 self, 

96 exc_type: Optional[Type[BaseException]], 

97 exc_val: Optional[BaseException], 

98 exc_tb: Optional[TracebackType], 

99 ) -> None: 

100 self.done.set() 

101 self.join() 

102 

103 

104def track( 

105 sequence: Iterable[ProgressType], 

106 description: str = "Working...", 

107 total: Optional[float] = None, 

108 completed: int = 0, 

109 auto_refresh: bool = True, 

110 console: Optional[Console] = None, 

111 transient: bool = False, 

112 get_time: Optional[Callable[[], float]] = None, 

113 refresh_per_second: float = 10, 

114 style: StyleType = "bar.back", 

115 complete_style: StyleType = "bar.complete", 

116 finished_style: StyleType = "bar.finished", 

117 pulse_style: StyleType = "bar.pulse", 

118 update_period: float = 0.1, 

119 disable: bool = False, 

120 show_speed: bool = True, 

121) -> Iterable[ProgressType]: 

122 """Track progress by iterating over a sequence. 

123 

124 You can also track progress of an iterable, which might require that you additionally specify ``total``. 

125 

126 Args: 

127 sequence (Iterable[ProgressType]): Values you wish to iterate over and track progress. 

128 description (str, optional): Description of task show next to progress bar. Defaults to "Working". 

129 total: (float, optional): Total number of steps. Default is len(sequence). 

130 completed (int, optional): Number of steps completed so far. Defaults to 0. 

131 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

132 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

133 console (Console, optional): Console to write to. Default creates internal Console instance. 

134 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

135 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

136 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

137 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

138 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

139 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. 

140 disable (bool, optional): Disable display of progress. 

141 show_speed (bool, optional): Show speed if total isn't known. Defaults to True. 

142 Returns: 

143 Iterable[ProgressType]: An iterable of the values in the sequence. 

144 

145 """ 

146 

147 columns: List["ProgressColumn"] = ( 

148 [TextColumn("[progress.description]{task.description}")] if description else [] 

149 ) 

150 columns.extend( 

151 ( 

152 BarColumn( 

153 style=style, 

154 complete_style=complete_style, 

155 finished_style=finished_style, 

156 pulse_style=pulse_style, 

157 ), 

158 TaskProgressColumn(show_speed=show_speed), 

159 TimeRemainingColumn(elapsed_when_finished=True), 

160 ) 

161 ) 

162 progress = Progress( 

163 *columns, 

164 auto_refresh=auto_refresh, 

165 console=console, 

166 transient=transient, 

167 get_time=get_time, 

168 refresh_per_second=refresh_per_second or 10, 

169 disable=disable, 

170 ) 

171 

172 with progress: 

173 yield from progress.track( 

174 sequence, 

175 total=total, 

176 completed=completed, 

177 description=description, 

178 update_period=update_period, 

179 ) 

180 

181 

182class _Reader(RawIOBase, BinaryIO): 

183 """A reader that tracks progress while it's being read from.""" 

184 

185 def __init__( 

186 self, 

187 handle: BinaryIO, 

188 progress: "Progress", 

189 task: TaskID, 

190 close_handle: bool = True, 

191 ) -> None: 

192 self.handle = handle 

193 self.progress = progress 

194 self.task = task 

195 self.close_handle = close_handle 

196 self._closed = False 

197 

198 def __enter__(self) -> "_Reader": 

199 self.handle.__enter__() 

200 return self 

201 

202 def __exit__( 

203 self, 

204 exc_type: Optional[Type[BaseException]], 

205 exc_val: Optional[BaseException], 

206 exc_tb: Optional[TracebackType], 

207 ) -> None: 

208 self.close() 

209 

210 def __iter__(self) -> BinaryIO: 

211 return self 

212 

213 def __next__(self) -> bytes: 

214 line = next(self.handle) 

215 self.progress.advance(self.task, advance=len(line)) 

216 return line 

217 

218 @property 

219 def closed(self) -> bool: 

220 return self._closed 

221 

222 def fileno(self) -> int: 

223 return self.handle.fileno() 

224 

225 def isatty(self) -> bool: 

226 return self.handle.isatty() 

227 

228 @property 

229 def mode(self) -> str: 

230 return self.handle.mode 

231 

232 @property 

233 def name(self) -> str: 

234 return self.handle.name 

235 

236 def readable(self) -> bool: 

237 return self.handle.readable() 

238 

239 def seekable(self) -> bool: 

240 return self.handle.seekable() 

241 

242 def writable(self) -> bool: 

243 return False 

244 

245 def read(self, size: int = -1) -> bytes: 

246 block = self.handle.read(size) 

247 self.progress.advance(self.task, advance=len(block)) 

248 return block 

249 

250 def readinto(self, b: Union[bytearray, memoryview, mmap]): # type: ignore[no-untyped-def, override] 

251 n = self.handle.readinto(b) # type: ignore[attr-defined] 

252 self.progress.advance(self.task, advance=n) 

253 return n 

254 

255 def readline(self, size: int = -1) -> bytes: # type: ignore[override] 

256 line = self.handle.readline(size) 

257 self.progress.advance(self.task, advance=len(line)) 

258 return line 

259 

260 def readlines(self, hint: int = -1) -> List[bytes]: 

261 lines = self.handle.readlines(hint) 

262 self.progress.advance(self.task, advance=sum(map(len, lines))) 

263 return lines 

264 

265 def close(self) -> None: 

266 if self.close_handle: 

267 self.handle.close() 

268 self._closed = True 

269 

270 def seek(self, offset: int, whence: int = 0) -> int: 

271 pos = self.handle.seek(offset, whence) 

272 self.progress.update(self.task, completed=pos) 

273 return pos 

274 

275 def tell(self) -> int: 

276 return self.handle.tell() 

277 

278 def write(self, s: Any) -> int: 

279 raise UnsupportedOperation("write") 

280 

281 def writelines(self, lines: Iterable[Any]) -> None: 

282 raise UnsupportedOperation("writelines") 

283 

284 

285class _ReadContext(ContextManager[_I], Generic[_I]): 

286 """A utility class to handle a context for both a reader and a progress.""" 

287 

288 def __init__(self, progress: "Progress", reader: _I) -> None: 

289 self.progress = progress 

290 self.reader: _I = reader 

291 

292 def __enter__(self) -> _I: 

293 self.progress.start() 

294 return self.reader.__enter__() 

295 

296 def __exit__( 

297 self, 

298 exc_type: Optional[Type[BaseException]], 

299 exc_val: Optional[BaseException], 

300 exc_tb: Optional[TracebackType], 

301 ) -> None: 

302 self.progress.stop() 

303 self.reader.__exit__(exc_type, exc_val, exc_tb) 

304 

305 

306def wrap_file( 

307 file: BinaryIO, 

308 total: int, 

309 *, 

310 description: str = "Reading...", 

311 auto_refresh: bool = True, 

312 console: Optional[Console] = None, 

313 transient: bool = False, 

314 get_time: Optional[Callable[[], float]] = None, 

315 refresh_per_second: float = 10, 

316 style: StyleType = "bar.back", 

317 complete_style: StyleType = "bar.complete", 

318 finished_style: StyleType = "bar.finished", 

319 pulse_style: StyleType = "bar.pulse", 

320 disable: bool = False, 

321) -> ContextManager[BinaryIO]: 

322 """Read bytes from a file while tracking progress. 

323 

324 Args: 

325 file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. 

326 total (int): Total number of bytes to read. 

327 description (str, optional): Description of task show next to progress bar. Defaults to "Reading". 

328 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

329 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

330 console (Console, optional): Console to write to. Default creates internal Console instance. 

331 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

332 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

333 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

334 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

335 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

336 disable (bool, optional): Disable display of progress. 

337 Returns: 

338 ContextManager[BinaryIO]: A context manager yielding a progress reader. 

339 

340 """ 

341 

342 columns: List["ProgressColumn"] = ( 

343 [TextColumn("[progress.description]{task.description}")] if description else [] 

344 ) 

345 columns.extend( 

346 ( 

347 BarColumn( 

348 style=style, 

349 complete_style=complete_style, 

350 finished_style=finished_style, 

351 pulse_style=pulse_style, 

352 ), 

353 DownloadColumn(), 

354 TimeRemainingColumn(), 

355 ) 

356 ) 

357 progress = Progress( 

358 *columns, 

359 auto_refresh=auto_refresh, 

360 console=console, 

361 transient=transient, 

362 get_time=get_time, 

363 refresh_per_second=refresh_per_second or 10, 

364 disable=disable, 

365 ) 

366 

367 reader = progress.wrap_file(file, total=total, description=description) 

368 return _ReadContext(progress, reader) 

369 

370 

371@typing.overload 

372def open( 

373 file: Union[str, "PathLike[str]", bytes], 

374 mode: Union[Literal["rt"], Literal["r"]], 

375 buffering: int = -1, 

376 encoding: Optional[str] = None, 

377 errors: Optional[str] = None, 

378 newline: Optional[str] = None, 

379 *, 

380 total: Optional[int] = None, 

381 description: str = "Reading...", 

382 auto_refresh: bool = True, 

383 console: Optional[Console] = None, 

384 transient: bool = False, 

385 get_time: Optional[Callable[[], float]] = None, 

386 refresh_per_second: float = 10, 

387 style: StyleType = "bar.back", 

388 complete_style: StyleType = "bar.complete", 

389 finished_style: StyleType = "bar.finished", 

390 pulse_style: StyleType = "bar.pulse", 

391 disable: bool = False, 

392) -> ContextManager[TextIO]: 

393 pass 

394 

395 

396@typing.overload 

397def open( 

398 file: Union[str, "PathLike[str]", bytes], 

399 mode: Literal["rb"], 

400 buffering: int = -1, 

401 encoding: Optional[str] = None, 

402 errors: Optional[str] = None, 

403 newline: Optional[str] = None, 

404 *, 

405 total: Optional[int] = None, 

406 description: str = "Reading...", 

407 auto_refresh: bool = True, 

408 console: Optional[Console] = None, 

409 transient: bool = False, 

410 get_time: Optional[Callable[[], float]] = None, 

411 refresh_per_second: float = 10, 

412 style: StyleType = "bar.back", 

413 complete_style: StyleType = "bar.complete", 

414 finished_style: StyleType = "bar.finished", 

415 pulse_style: StyleType = "bar.pulse", 

416 disable: bool = False, 

417) -> ContextManager[BinaryIO]: 

418 pass 

419 

420 

421def open( 

422 file: Union[str, "PathLike[str]", bytes], 

423 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", 

424 buffering: int = -1, 

425 encoding: Optional[str] = None, 

426 errors: Optional[str] = None, 

427 newline: Optional[str] = None, 

428 *, 

429 total: Optional[int] = None, 

430 description: str = "Reading...", 

431 auto_refresh: bool = True, 

432 console: Optional[Console] = None, 

433 transient: bool = False, 

434 get_time: Optional[Callable[[], float]] = None, 

435 refresh_per_second: float = 10, 

436 style: StyleType = "bar.back", 

437 complete_style: StyleType = "bar.complete", 

438 finished_style: StyleType = "bar.finished", 

439 pulse_style: StyleType = "bar.pulse", 

440 disable: bool = False, 

441) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]: 

442 """Read bytes from a file while tracking progress. 

443 

444 Args: 

445 path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. 

446 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". 

447 buffering (int): The buffering strategy to use, see :func:`io.open`. 

448 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. 

449 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. 

450 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open` 

451 total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size. 

452 description (str, optional): Description of task show next to progress bar. Defaults to "Reading". 

453 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

454 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

455 console (Console, optional): Console to write to. Default creates internal Console instance. 

456 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

457 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

458 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

459 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

460 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

461 disable (bool, optional): Disable display of progress. 

462 encoding (str, optional): The encoding to use when reading in text mode. 

463 

464 Returns: 

465 ContextManager[BinaryIO]: A context manager yielding a progress reader. 

466 

467 """ 

468 

469 columns: List["ProgressColumn"] = ( 

470 [TextColumn("[progress.description]{task.description}")] if description else [] 

471 ) 

472 columns.extend( 

473 ( 

474 BarColumn( 

475 style=style, 

476 complete_style=complete_style, 

477 finished_style=finished_style, 

478 pulse_style=pulse_style, 

479 ), 

480 DownloadColumn(), 

481 TimeRemainingColumn(), 

482 ) 

483 ) 

484 progress = Progress( 

485 *columns, 

486 auto_refresh=auto_refresh, 

487 console=console, 

488 transient=transient, 

489 get_time=get_time, 

490 refresh_per_second=refresh_per_second or 10, 

491 disable=disable, 

492 ) 

493 

494 reader = progress.open( 

495 file, 

496 mode=mode, 

497 buffering=buffering, 

498 encoding=encoding, 

499 errors=errors, 

500 newline=newline, 

501 total=total, 

502 description=description, 

503 ) 

504 return _ReadContext(progress, reader) # type: ignore[return-value, type-var] 

505 

506 

507class ProgressColumn(ABC): 

508 """Base class for a widget to use in progress display.""" 

509 

510 max_refresh: Optional[float] = None 

511 

512 def __init__(self, table_column: Optional[Column] = None) -> None: 

513 self._table_column = table_column 

514 self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {} 

515 self._update_time: Optional[float] = None 

516 

517 def get_table_column(self) -> Column: 

518 """Get a table column, used to build tasks table.""" 

519 return self._table_column or Column() 

520 

521 def __call__(self, task: "Task") -> RenderableType: 

522 """Called by the Progress object to return a renderable for the given task. 

523 

524 Args: 

525 task (Task): An object containing information regarding the task. 

526 

527 Returns: 

528 RenderableType: Anything renderable (including str). 

529 """ 

530 current_time = task.get_time() 

531 if self.max_refresh is not None and not task.completed: 

532 try: 

533 timestamp, renderable = self._renderable_cache[task.id] 

534 except KeyError: 

535 pass 

536 else: 

537 if timestamp + self.max_refresh > current_time: 

538 return renderable 

539 

540 renderable = self.render(task) 

541 self._renderable_cache[task.id] = (current_time, renderable) 

542 return renderable 

543 

544 @abstractmethod 

545 def render(self, task: "Task") -> RenderableType: 

546 """Should return a renderable object.""" 

547 

548 

549class RenderableColumn(ProgressColumn): 

550 """A column to insert an arbitrary column. 

551 

552 Args: 

553 renderable (RenderableType, optional): Any renderable. Defaults to empty string. 

554 """ 

555 

556 def __init__( 

557 self, renderable: RenderableType = "", *, table_column: Optional[Column] = None 

558 ): 

559 self.renderable = renderable 

560 super().__init__(table_column=table_column) 

561 

562 def render(self, task: "Task") -> RenderableType: 

563 return self.renderable 

564 

565 

566class SpinnerColumn(ProgressColumn): 

567 """A column with a 'spinner' animation. 

568 

569 Args: 

570 spinner_name (str, optional): Name of spinner animation. Defaults to "dots". 

571 style (StyleType, optional): Style of spinner. Defaults to "progress.spinner". 

572 speed (float, optional): Speed factor of spinner. Defaults to 1.0. 

573 finished_text (TextType, optional): Text used when task is finished. Defaults to " ". 

574 """ 

575 

576 def __init__( 

577 self, 

578 spinner_name: str = "dots", 

579 style: Optional[StyleType] = "progress.spinner", 

580 speed: float = 1.0, 

581 finished_text: TextType = " ", 

582 table_column: Optional[Column] = None, 

583 ): 

584 self.spinner = Spinner(spinner_name, style=style, speed=speed) 

585 self.finished_text = ( 

586 Text.from_markup(finished_text) 

587 if isinstance(finished_text, str) 

588 else finished_text 

589 ) 

590 super().__init__(table_column=table_column) 

591 

592 def set_spinner( 

593 self, 

594 spinner_name: str, 

595 spinner_style: Optional[StyleType] = "progress.spinner", 

596 speed: float = 1.0, 

597 ) -> None: 

598 """Set a new spinner. 

599 

600 Args: 

601 spinner_name (str): Spinner name, see python -m rich.spinner. 

602 spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner". 

603 speed (float, optional): Speed factor of spinner. Defaults to 1.0. 

604 """ 

605 self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed) 

606 

607 def render(self, task: "Task") -> RenderableType: 

608 text = ( 

609 self.finished_text 

610 if task.finished 

611 else self.spinner.render(task.get_time()) 

612 ) 

613 return text 

614 

615 

616class TextColumn(ProgressColumn): 

617 """A column containing text.""" 

618 

619 def __init__( 

620 self, 

621 text_format: str, 

622 style: StyleType = "none", 

623 justify: JustifyMethod = "left", 

624 markup: bool = True, 

625 highlighter: Optional[Highlighter] = None, 

626 table_column: Optional[Column] = None, 

627 ) -> None: 

628 self.text_format = text_format 

629 self.justify: JustifyMethod = justify 

630 self.style = style 

631 self.markup = markup 

632 self.highlighter = highlighter 

633 super().__init__(table_column=table_column or Column(no_wrap=True)) 

634 

635 def render(self, task: "Task") -> Text: 

636 _text = self.text_format.format(task=task) 

637 if self.markup: 

638 text = Text.from_markup(_text, style=self.style, justify=self.justify) 

639 else: 

640 text = Text(_text, style=self.style, justify=self.justify) 

641 if self.highlighter: 

642 self.highlighter.highlight(text) 

643 return text 

644 

645 

646class BarColumn(ProgressColumn): 

647 """Renders a visual progress bar. 

648 

649 Args: 

650 bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40. 

651 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

652 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

653 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

654 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

655 """ 

656 

657 def __init__( 

658 self, 

659 bar_width: Optional[int] = 40, 

660 style: StyleType = "bar.back", 

661 complete_style: StyleType = "bar.complete", 

662 finished_style: StyleType = "bar.finished", 

663 pulse_style: StyleType = "bar.pulse", 

664 table_column: Optional[Column] = None, 

665 ) -> None: 

666 self.bar_width = bar_width 

667 self.style = style 

668 self.complete_style = complete_style 

669 self.finished_style = finished_style 

670 self.pulse_style = pulse_style 

671 super().__init__(table_column=table_column) 

672 

673 def render(self, task: "Task") -> ProgressBar: 

674 """Gets a progress bar widget for a task.""" 

675 return ProgressBar( 

676 total=max(0, task.total) if task.total is not None else None, 

677 completed=max(0, task.completed), 

678 width=None if self.bar_width is None else max(1, self.bar_width), 

679 pulse=not task.started, 

680 animation_time=task.get_time(), 

681 style=self.style, 

682 complete_style=self.complete_style, 

683 finished_style=self.finished_style, 

684 pulse_style=self.pulse_style, 

685 ) 

686 

687 

688class TimeElapsedColumn(ProgressColumn): 

689 """Renders time elapsed.""" 

690 

691 def render(self, task: "Task") -> Text: 

692 """Show time elapsed.""" 

693 elapsed = task.finished_time if task.finished else task.elapsed 

694 if elapsed is None: 

695 return Text("-:--:--", style="progress.elapsed") 

696 delta = timedelta(seconds=max(0, int(elapsed))) 

697 return Text(str(delta), style="progress.elapsed") 

698 

699 

700class TaskProgressColumn(TextColumn): 

701 """Show task progress as a percentage. 

702 

703 Args: 

704 text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%". 

705 text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "". 

706 style (StyleType, optional): Style of output. Defaults to "none". 

707 justify (JustifyMethod, optional): Text justification. Defaults to "left". 

708 markup (bool, optional): Enable markup. Defaults to True. 

709 highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None. 

710 table_column (Optional[Column], optional): Table Column to use. Defaults to None. 

711 show_speed (bool, optional): Show speed if total is unknown. Defaults to False. 

712 """ 

713 

714 def __init__( 

715 self, 

716 text_format: str = "[progress.percentage]{task.percentage:>3.0f}%", 

717 text_format_no_percentage: str = "", 

718 style: StyleType = "none", 

719 justify: JustifyMethod = "left", 

720 markup: bool = True, 

721 highlighter: Optional[Highlighter] = None, 

722 table_column: Optional[Column] = None, 

723 show_speed: bool = False, 

724 ) -> None: 

725 self.text_format_no_percentage = text_format_no_percentage 

726 self.show_speed = show_speed 

727 super().__init__( 

728 text_format=text_format, 

729 style=style, 

730 justify=justify, 

731 markup=markup, 

732 highlighter=highlighter, 

733 table_column=table_column, 

734 ) 

735 

736 @classmethod 

737 def render_speed(cls, speed: Optional[float]) -> Text: 

738 """Render the speed in iterations per second. 

739 

740 Args: 

741 task (Task): A Task object. 

742 

743 Returns: 

744 Text: Text object containing the task speed. 

745 """ 

746 if speed is None: 

747 return Text("", style="progress.percentage") 

748 unit, suffix = filesize.pick_unit_and_suffix( 

749 int(speed), 

750 ["", "×10³", "×10⁶", "×10⁹", "×10¹²"], 

751 1000, 

752 ) 

753 data_speed = speed / unit 

754 return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage") 

755 

756 def render(self, task: "Task") -> Text: 

757 if task.total is None and self.show_speed: 

758 return self.render_speed(task.finished_speed or task.speed) 

759 text_format = ( 

760 self.text_format_no_percentage if task.total is None else self.text_format 

761 ) 

762 _text = text_format.format(task=task) 

763 if self.markup: 

764 text = Text.from_markup(_text, style=self.style, justify=self.justify) 

765 else: 

766 text = Text(_text, style=self.style, justify=self.justify) 

767 if self.highlighter: 

768 self.highlighter.highlight(text) 

769 return text 

770 

771 

772class TimeRemainingColumn(ProgressColumn): 

773 """Renders estimated time remaining. 

774 

775 Args: 

776 compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False. 

777 elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False. 

778 """ 

779 

780 # Only refresh twice a second to prevent jitter 

781 max_refresh = 0.5 

782 

783 def __init__( 

784 self, 

785 compact: bool = False, 

786 elapsed_when_finished: bool = False, 

787 table_column: Optional[Column] = None, 

788 ): 

789 self.compact = compact 

790 self.elapsed_when_finished = elapsed_when_finished 

791 super().__init__(table_column=table_column) 

792 

793 def render(self, task: "Task") -> Text: 

794 """Show time remaining.""" 

795 if self.elapsed_when_finished and task.finished: 

796 task_time = task.finished_time 

797 style = "progress.elapsed" 

798 else: 

799 task_time = task.time_remaining 

800 style = "progress.remaining" 

801 

802 if task.total is None: 

803 return Text("", style=style) 

804 

805 if task_time is None: 

806 return Text("--:--" if self.compact else "-:--:--", style=style) 

807 

808 # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py 

809 minutes, seconds = divmod(int(task_time), 60) 

810 hours, minutes = divmod(minutes, 60) 

811 

812 if self.compact and not hours: 

813 formatted = f"{minutes:02d}:{seconds:02d}" 

814 else: 

815 formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}" 

816 

817 return Text(formatted, style=style) 

818 

819 

820class FileSizeColumn(ProgressColumn): 

821 """Renders completed filesize.""" 

822 

823 def render(self, task: "Task") -> Text: 

824 """Show data completed.""" 

825 data_size = filesize.decimal(int(task.completed)) 

826 return Text(data_size, style="progress.filesize") 

827 

828 

829class TotalFileSizeColumn(ProgressColumn): 

830 """Renders total filesize.""" 

831 

832 def render(self, task: "Task") -> Text: 

833 """Show data completed.""" 

834 data_size = filesize.decimal(int(task.total)) if task.total is not None else "" 

835 return Text(data_size, style="progress.filesize.total") 

836 

837 

838class MofNCompleteColumn(ProgressColumn): 

839 """Renders completed count/total, e.g. ' 10/1000'. 

840 

841 Best for bounded tasks with int quantities. 

842 

843 Space pads the completed count so that progress length does not change as task progresses 

844 past powers of 10. 

845 

846 Args: 

847 separator (str, optional): Text to separate completed and total values. Defaults to "/". 

848 """ 

849 

850 def __init__(self, separator: str = "/", table_column: Optional[Column] = None): 

851 self.separator = separator 

852 super().__init__(table_column=table_column) 

853 

854 def render(self, task: "Task") -> Text: 

855 """Show completed/total.""" 

856 completed = int(task.completed) 

857 total = int(task.total) if task.total is not None else "?" 

858 total_width = len(str(total)) 

859 return Text( 

860 f"{completed:{total_width}d}{self.separator}{total}", 

861 style="progress.download", 

862 ) 

863 

864 

865class DownloadColumn(ProgressColumn): 

866 """Renders file size downloaded and total, e.g. '0.5/2.3 GB'. 

867 

868 Args: 

869 binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False. 

870 """ 

871 

872 def __init__( 

873 self, binary_units: bool = False, table_column: Optional[Column] = None 

874 ) -> None: 

875 self.binary_units = binary_units 

876 super().__init__(table_column=table_column) 

877 

878 def render(self, task: "Task") -> Text: 

879 """Calculate common unit for completed and total.""" 

880 completed = int(task.completed) 

881 

882 unit_and_suffix_calculation_base = ( 

883 int(task.total) if task.total is not None else completed 

884 ) 

885 if self.binary_units: 

886 unit, suffix = filesize.pick_unit_and_suffix( 

887 unit_and_suffix_calculation_base, 

888 ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"], 

889 1024, 

890 ) 

891 else: 

892 unit, suffix = filesize.pick_unit_and_suffix( 

893 unit_and_suffix_calculation_base, 

894 ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"], 

895 1000, 

896 ) 

897 precision = 0 if unit == 1 else 1 

898 

899 completed_ratio = completed / unit 

900 completed_str = f"{completed_ratio:,.{precision}f}" 

901 

902 if task.total is not None: 

903 total = int(task.total) 

904 total_ratio = total / unit 

905 total_str = f"{total_ratio:,.{precision}f}" 

906 else: 

907 total_str = "?" 

908 

909 download_status = f"{completed_str}/{total_str} {suffix}" 

910 download_text = Text(download_status, style="progress.download") 

911 return download_text 

912 

913 

914class TransferSpeedColumn(ProgressColumn): 

915 """Renders human readable transfer speed.""" 

916 

917 def render(self, task: "Task") -> Text: 

918 """Show data transfer speed.""" 

919 speed = task.finished_speed or task.speed 

920 if speed is None: 

921 return Text("?", style="progress.data.speed") 

922 data_speed = filesize.decimal(int(speed)) 

923 return Text(f"{data_speed}/s", style="progress.data.speed") 

924 

925 

926class ProgressSample(NamedTuple): 

927 """Sample of progress for a given time.""" 

928 

929 timestamp: float 

930 """Timestamp of sample.""" 

931 completed: float 

932 """Number of steps completed.""" 

933 

934 

935@dataclass 

936class Task: 

937 """Information regarding a progress task. 

938 

939 This object should be considered read-only outside of the :class:`~Progress` class. 

940 

941 """ 

942 

943 id: TaskID 

944 """Task ID associated with this task (used in Progress methods).""" 

945 

946 description: str 

947 """str: Description of the task.""" 

948 

949 total: Optional[float] 

950 """Optional[float]: Total number of steps in this task.""" 

951 

952 completed: float 

953 """float: Number of steps completed""" 

954 

955 _get_time: GetTimeCallable 

956 """Callable to get the current time.""" 

957 

958 finished_time: Optional[float] = None 

959 """float: Time task was finished.""" 

960 

961 visible: bool = True 

962 """bool: Indicates if this task is visible in the progress display.""" 

963 

964 fields: Dict[str, Any] = field(default_factory=dict) 

965 """dict: Arbitrary fields passed in via Progress.update.""" 

966 

967 start_time: Optional[float] = field(default=None, init=False, repr=False) 

968 """Optional[float]: Time this task was started, or None if not started.""" 

969 

970 stop_time: Optional[float] = field(default=None, init=False, repr=False) 

971 """Optional[float]: Time this task was stopped, or None if not stopped.""" 

972 

973 finished_speed: Optional[float] = None 

974 """Optional[float]: The last speed for a finished task.""" 

975 

976 _progress: Deque[ProgressSample] = field( 

977 default_factory=lambda: deque(maxlen=1000), init=False, repr=False 

978 ) 

979 

980 _lock: RLock = field(repr=False, default_factory=RLock) 

981 """Thread lock.""" 

982 

983 def get_time(self) -> float: 

984 """float: Get the current time, in seconds.""" 

985 return self._get_time() 

986 

987 @property 

988 def started(self) -> bool: 

989 """bool: Check if the task as started.""" 

990 return self.start_time is not None 

991 

992 @property 

993 def remaining(self) -> Optional[float]: 

994 """Optional[float]: Get the number of steps remaining, if a non-None total was set.""" 

995 if self.total is None: 

996 return None 

997 return self.total - self.completed 

998 

999 @property 

1000 def elapsed(self) -> Optional[float]: 

1001 """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started.""" 

1002 if self.start_time is None: 

1003 return None 

1004 if self.stop_time is not None: 

1005 return self.stop_time - self.start_time 

1006 return self.get_time() - self.start_time 

1007 

1008 @property 

1009 def finished(self) -> bool: 

1010 """Check if the task has finished.""" 

1011 return self.finished_time is not None 

1012 

1013 @property 

1014 def percentage(self) -> float: 

1015 """float: Get progress of task as a percentage. If a None total was set, returns 0""" 

1016 if not self.total: 

1017 return 0.0 

1018 completed = (self.completed / self.total) * 100.0 

1019 completed = min(100.0, max(0.0, completed)) 

1020 return completed 

1021 

1022 @property 

1023 def speed(self) -> Optional[float]: 

1024 """Optional[float]: Get the estimated speed in steps per second.""" 

1025 if self.start_time is None: 

1026 return None 

1027 with self._lock: 

1028 progress = self._progress 

1029 if not progress: 

1030 return None 

1031 total_time = progress[-1].timestamp - progress[0].timestamp 

1032 if total_time == 0: 

1033 return None 

1034 iter_progress = iter(progress) 

1035 next(iter_progress) 

1036 total_completed = sum(sample.completed for sample in iter_progress) 

1037 speed = total_completed / total_time 

1038 return speed 

1039 

1040 @property 

1041 def time_remaining(self) -> Optional[float]: 

1042 """Optional[float]: Get estimated time to completion, or ``None`` if no data.""" 

1043 if self.finished: 

1044 return 0.0 

1045 speed = self.speed 

1046 if not speed: 

1047 return None 

1048 remaining = self.remaining 

1049 if remaining is None: 

1050 return None 

1051 estimate = ceil(remaining / speed) 

1052 return estimate 

1053 

1054 def _reset(self) -> None: 

1055 """Reset progress.""" 

1056 self._progress.clear() 

1057 self.finished_time = None 

1058 self.finished_speed = None 

1059 

1060 

1061class Progress(JupyterMixin): 

1062 """Renders an auto-updating progress bar(s). 

1063 

1064 Args: 

1065 console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout. 

1066 auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`. 

1067 refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None. 

1068 speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30. 

1069 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

1070 redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True. 

1071 redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True. 

1072 get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None. 

1073 disable (bool, optional): Disable progress display. Defaults to False 

1074 expand (bool, optional): Expand tasks table to fit width. Defaults to False. 

1075 """ 

1076 

1077 def __init__( 

1078 self, 

1079 *columns: Union[str, ProgressColumn], 

1080 console: Optional[Console] = None, 

1081 auto_refresh: bool = True, 

1082 refresh_per_second: float = 10, 

1083 speed_estimate_period: float = 30.0, 

1084 transient: bool = False, 

1085 redirect_stdout: bool = True, 

1086 redirect_stderr: bool = True, 

1087 get_time: Optional[GetTimeCallable] = None, 

1088 disable: bool = False, 

1089 expand: bool = False, 

1090 ) -> None: 

1091 assert refresh_per_second > 0, "refresh_per_second must be > 0" 

1092 self._lock = RLock() 

1093 self.columns = columns or self.get_default_columns() 

1094 self.speed_estimate_period = speed_estimate_period 

1095 

1096 self.disable = disable 

1097 self.expand = expand 

1098 self._tasks: Dict[TaskID, Task] = {} 

1099 self._task_index: TaskID = TaskID(0) 

1100 self.live = Live( 

1101 console=console or get_console(), 

1102 auto_refresh=auto_refresh, 

1103 refresh_per_second=refresh_per_second, 

1104 transient=transient, 

1105 redirect_stdout=redirect_stdout, 

1106 redirect_stderr=redirect_stderr, 

1107 get_renderable=self.get_renderable, 

1108 ) 

1109 self.get_time = get_time or self.console.get_time 

1110 self.print = self.console.print 

1111 self.log = self.console.log 

1112 

1113 @classmethod 

1114 def get_default_columns(cls) -> Tuple[ProgressColumn, ...]: 

1115 """Get the default columns used for a new Progress instance: 

1116 - a text column for the description (TextColumn) 

1117 - the bar itself (BarColumn) 

1118 - a text column showing completion percentage (TextColumn) 

1119 - an estimated-time-remaining column (TimeRemainingColumn) 

1120 If the Progress instance is created without passing a columns argument, 

1121 the default columns defined here will be used. 

1122 

1123 You can also create a Progress instance using custom columns before 

1124 and/or after the defaults, as in this example: 

1125 

1126 progress = Progress( 

1127 SpinnerColumn(), 

1128 *Progress.get_default_columns(), 

1129 "Elapsed:", 

1130 TimeElapsedColumn(), 

1131 ) 

1132 

1133 This code shows the creation of a Progress display, containing 

1134 a spinner to the left, the default columns, and a labeled elapsed 

1135 time column. 

1136 """ 

1137 return ( 

1138 TextColumn("[progress.description]{task.description}"), 

1139 BarColumn(), 

1140 TaskProgressColumn(), 

1141 TimeRemainingColumn(), 

1142 ) 

1143 

1144 @property 

1145 def console(self) -> Console: 

1146 return self.live.console 

1147 

1148 @property 

1149 def tasks(self) -> List[Task]: 

1150 """Get a list of Task instances.""" 

1151 with self._lock: 

1152 return list(self._tasks.values()) 

1153 

1154 @property 

1155 def task_ids(self) -> List[TaskID]: 

1156 """A list of task IDs.""" 

1157 with self._lock: 

1158 return list(self._tasks.keys()) 

1159 

1160 @property 

1161 def finished(self) -> bool: 

1162 """Check if all tasks have been completed.""" 

1163 with self._lock: 

1164 if not self._tasks: 

1165 return True 

1166 return all(task.finished for task in self._tasks.values()) 

1167 

1168 def start(self) -> None: 

1169 """Start the progress display.""" 

1170 if not self.disable: 

1171 self.live.start(refresh=True) 

1172 

1173 def stop(self) -> None: 

1174 """Stop the progress display.""" 

1175 self.live.stop() 

1176 if not self.console.is_interactive and not self.console.is_jupyter: 

1177 self.console.print() 

1178 

1179 def __enter__(self) -> Self: 

1180 self.start() 

1181 return self 

1182 

1183 def __exit__( 

1184 self, 

1185 exc_type: Optional[Type[BaseException]], 

1186 exc_val: Optional[BaseException], 

1187 exc_tb: Optional[TracebackType], 

1188 ) -> None: 

1189 self.stop() 

1190 

1191 def track( 

1192 self, 

1193 sequence: Iterable[ProgressType], 

1194 total: Optional[float] = None, 

1195 completed: int = 0, 

1196 task_id: Optional[TaskID] = None, 

1197 description: str = "Working...", 

1198 update_period: float = 0.1, 

1199 ) -> Iterable[ProgressType]: 

1200 """Track progress by iterating over a sequence. 

1201 

1202 You can also track progress of an iterable, which might require that you additionally specify ``total``. 

1203 

1204 Args: 

1205 sequence (Iterable[ProgressType]): Values you want to iterate over and track progress. 

1206 total: (float, optional): Total number of steps. Default is len(sequence). 

1207 completed (int, optional): Number of steps completed so far. Defaults to 0. 

1208 task_id: (TaskID): Task to track. Default is new task. 

1209 description: (str, optional): Description of task, if new task is created. 

1210 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. 

1211 

1212 Returns: 

1213 Iterable[ProgressType]: An iterable of values taken from the provided sequence. 

1214 """ 

1215 if total is None: 

1216 total = float(length_hint(sequence)) or None 

1217 

1218 if task_id is None: 

1219 task_id = self.add_task(description, total=total, completed=completed) 

1220 else: 

1221 self.update(task_id, total=total, completed=completed) 

1222 

1223 if self.live.auto_refresh: 

1224 with _TrackThread(self, task_id, update_period) as track_thread: 

1225 for value in sequence: 

1226 yield value 

1227 track_thread.completed += 1 

1228 else: 

1229 advance = self.advance 

1230 refresh = self.refresh 

1231 for value in sequence: 

1232 yield value 

1233 advance(task_id, 1) 

1234 refresh() 

1235 

1236 def wrap_file( 

1237 self, 

1238 file: BinaryIO, 

1239 total: Optional[int] = None, 

1240 *, 

1241 task_id: Optional[TaskID] = None, 

1242 description: str = "Reading...", 

1243 ) -> BinaryIO: 

1244 """Track progress file reading from a binary file. 

1245 

1246 Args: 

1247 file (BinaryIO): A file-like object opened in binary mode. 

1248 total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given. 

1249 task_id (TaskID): Task to track. Default is new task. 

1250 description (str, optional): Description of task, if new task is created. 

1251 

1252 Returns: 

1253 BinaryIO: A readable file-like object in binary mode. 

1254 

1255 Raises: 

1256 ValueError: When no total value can be extracted from the arguments or the task. 

1257 """ 

1258 # attempt to recover the total from the task 

1259 total_bytes: Optional[float] = None 

1260 if total is not None: 

1261 total_bytes = total 

1262 elif task_id is not None: 

1263 with self._lock: 

1264 total_bytes = self._tasks[task_id].total 

1265 if total_bytes is None: 

1266 raise ValueError( 

1267 f"unable to get the total number of bytes, please specify 'total'" 

1268 ) 

1269 

1270 # update total of task or create new task 

1271 if task_id is None: 

1272 task_id = self.add_task(description, total=total_bytes) 

1273 else: 

1274 self.update(task_id, total=total_bytes) 

1275 

1276 return _Reader(file, self, task_id, close_handle=False) 

1277 

1278 @typing.overload 

1279 def open( 

1280 self, 

1281 file: Union[str, "PathLike[str]", bytes], 

1282 mode: Literal["rb"], 

1283 buffering: int = -1, 

1284 encoding: Optional[str] = None, 

1285 errors: Optional[str] = None, 

1286 newline: Optional[str] = None, 

1287 *, 

1288 total: Optional[int] = None, 

1289 task_id: Optional[TaskID] = None, 

1290 description: str = "Reading...", 

1291 ) -> BinaryIO: 

1292 pass 

1293 

1294 @typing.overload 

1295 def open( 

1296 self, 

1297 file: Union[str, "PathLike[str]", bytes], 

1298 mode: Union[Literal["r"], Literal["rt"]], 

1299 buffering: int = -1, 

1300 encoding: Optional[str] = None, 

1301 errors: Optional[str] = None, 

1302 newline: Optional[str] = None, 

1303 *, 

1304 total: Optional[int] = None, 

1305 task_id: Optional[TaskID] = None, 

1306 description: str = "Reading...", 

1307 ) -> TextIO: 

1308 pass 

1309 

1310 def open( 

1311 self, 

1312 file: Union[str, "PathLike[str]", bytes], 

1313 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", 

1314 buffering: int = -1, 

1315 encoding: Optional[str] = None, 

1316 errors: Optional[str] = None, 

1317 newline: Optional[str] = None, 

1318 *, 

1319 total: Optional[int] = None, 

1320 task_id: Optional[TaskID] = None, 

1321 description: str = "Reading...", 

1322 ) -> Union[BinaryIO, TextIO]: 

1323 """Track progress while reading from a binary file. 

1324 

1325 Args: 

1326 path (Union[str, PathLike[str]]): The path to the file to read. 

1327 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". 

1328 buffering (int): The buffering strategy to use, see :func:`io.open`. 

1329 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. 

1330 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. 

1331 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`. 

1332 total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used. 

1333 task_id (TaskID): Task to track. Default is new task. 

1334 description (str, optional): Description of task, if new task is created. 

1335 

1336 Returns: 

1337 BinaryIO: A readable file-like object in binary mode. 

1338 

1339 Raises: 

1340 ValueError: When an invalid mode is given. 

1341 """ 

1342 # normalize the mode (always rb, rt) 

1343 _mode = "".join(sorted(mode, reverse=False)) 

1344 if _mode not in ("br", "rt", "r"): 

1345 raise ValueError(f"invalid mode {mode!r}") 

1346 

1347 # patch buffering to provide the same behaviour as the builtin `open` 

1348 line_buffering = buffering == 1 

1349 if _mode == "br" and buffering == 1: 

1350 warnings.warn( 

1351 "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used", 

1352 RuntimeWarning, 

1353 ) 

1354 buffering = -1 

1355 elif _mode in ("rt", "r"): 

1356 if buffering == 0: 

1357 raise ValueError("can't have unbuffered text I/O") 

1358 elif buffering == 1: 

1359 buffering = -1 

1360 

1361 # attempt to get the total with `os.stat` 

1362 if total is None: 

1363 total = stat(file).st_size 

1364 

1365 # update total of task or create new task 

1366 if task_id is None: 

1367 task_id = self.add_task(description, total=total) 

1368 else: 

1369 self.update(task_id, total=total) 

1370 

1371 # open the file in binary mode, 

1372 handle = io.open(file, "rb", buffering=buffering) 

1373 reader = _Reader(handle, self, task_id, close_handle=True) 

1374 

1375 # wrap the reader in a `TextIOWrapper` if text mode 

1376 if mode in ("r", "rt"): 

1377 return io.TextIOWrapper( 

1378 reader, 

1379 encoding=encoding, 

1380 errors=errors, 

1381 newline=newline, 

1382 line_buffering=line_buffering, 

1383 ) 

1384 

1385 return reader 

1386 

1387 def start_task(self, task_id: TaskID) -> None: 

1388 """Start a task. 

1389 

1390 Starts a task (used when calculating elapsed time). You may need to call this manually, 

1391 if you called ``add_task`` with ``start=False``. 

1392 

1393 Args: 

1394 task_id (TaskID): ID of task. 

1395 """ 

1396 with self._lock: 

1397 task = self._tasks[task_id] 

1398 if task.start_time is None: 

1399 task.start_time = self.get_time() 

1400 

1401 def stop_task(self, task_id: TaskID) -> None: 

1402 """Stop a task. 

1403 

1404 This will freeze the elapsed time on the task. 

1405 

1406 Args: 

1407 task_id (TaskID): ID of task. 

1408 """ 

1409 with self._lock: 

1410 task = self._tasks[task_id] 

1411 current_time = self.get_time() 

1412 if task.start_time is None: 

1413 task.start_time = current_time 

1414 task.stop_time = current_time 

1415 

1416 def update( 

1417 self, 

1418 task_id: TaskID, 

1419 *, 

1420 total: Optional[float] = None, 

1421 completed: Optional[float] = None, 

1422 advance: Optional[float] = None, 

1423 description: Optional[str] = None, 

1424 visible: Optional[bool] = None, 

1425 refresh: bool = False, 

1426 **fields: Any, 

1427 ) -> None: 

1428 """Update information associated with a task. 

1429 

1430 Args: 

1431 task_id (TaskID): Task id (returned by add_task). 

1432 total (float, optional): Updates task.total if not None. 

1433 completed (float, optional): Updates task.completed if not None. 

1434 advance (float, optional): Add a value to task.completed if not None. 

1435 description (str, optional): Change task description if not None. 

1436 visible (bool, optional): Set visible flag if not None. 

1437 refresh (bool): Force a refresh of progress information. Default is False. 

1438 **fields (Any): Additional data fields required for rendering. 

1439 """ 

1440 with self._lock: 

1441 task = self._tasks[task_id] 

1442 completed_start = task.completed 

1443 

1444 if total is not None and total != task.total: 

1445 task.total = total 

1446 task._reset() 

1447 if advance is not None: 

1448 task.completed += advance 

1449 if completed is not None: 

1450 task.completed = completed 

1451 if description is not None: 

1452 task.description = description 

1453 if visible is not None: 

1454 task.visible = visible 

1455 task.fields.update(fields) 

1456 update_completed = task.completed - completed_start 

1457 

1458 current_time = self.get_time() 

1459 old_sample_time = current_time - self.speed_estimate_period 

1460 _progress = task._progress 

1461 

1462 popleft = _progress.popleft 

1463 while _progress and _progress[0].timestamp < old_sample_time: 

1464 popleft() 

1465 if update_completed > 0: 

1466 _progress.append(ProgressSample(current_time, update_completed)) 

1467 if ( 

1468 task.total is not None 

1469 and task.completed >= task.total 

1470 and task.finished_time is None 

1471 ): 

1472 task.finished_time = task.elapsed 

1473 

1474 if refresh: 

1475 self.refresh() 

1476 

1477 def reset( 

1478 self, 

1479 task_id: TaskID, 

1480 *, 

1481 start: bool = True, 

1482 total: Optional[float] = None, 

1483 completed: int = 0, 

1484 visible: Optional[bool] = None, 

1485 description: Optional[str] = None, 

1486 **fields: Any, 

1487 ) -> None: 

1488 """Reset a task so completed is 0 and the clock is reset. 

1489 

1490 Args: 

1491 task_id (TaskID): ID of task. 

1492 start (bool, optional): Start the task after reset. Defaults to True. 

1493 total (float, optional): New total steps in task, or None to use current total. Defaults to None. 

1494 completed (int, optional): Number of steps completed. Defaults to 0. 

1495 visible (bool, optional): Enable display of the task. Defaults to True. 

1496 description (str, optional): Change task description if not None. Defaults to None. 

1497 **fields (str): Additional data fields required for rendering. 

1498 """ 

1499 current_time = self.get_time() 

1500 with self._lock: 

1501 task = self._tasks[task_id] 

1502 task._reset() 

1503 task.start_time = current_time if start else None 

1504 if total is not None: 

1505 task.total = total 

1506 task.completed = completed 

1507 if visible is not None: 

1508 task.visible = visible 

1509 if fields: 

1510 task.fields = fields 

1511 if description is not None: 

1512 task.description = description 

1513 task.finished_time = None 

1514 self.refresh() 

1515 

1516 def advance(self, task_id: TaskID, advance: float = 1) -> None: 

1517 """Advance task by a number of steps. 

1518 

1519 Args: 

1520 task_id (TaskID): ID of task. 

1521 advance (float): Number of steps to advance. Default is 1. 

1522 """ 

1523 current_time = self.get_time() 

1524 with self._lock: 

1525 task = self._tasks[task_id] 

1526 completed_start = task.completed 

1527 task.completed += advance 

1528 update_completed = task.completed - completed_start 

1529 old_sample_time = current_time - self.speed_estimate_period 

1530 _progress = task._progress 

1531 

1532 popleft = _progress.popleft 

1533 while _progress and _progress[0].timestamp < old_sample_time: 

1534 popleft() 

1535 while len(_progress) > 1000: 

1536 popleft() 

1537 _progress.append(ProgressSample(current_time, update_completed)) 

1538 if ( 

1539 task.total is not None 

1540 and task.completed >= task.total 

1541 and task.finished_time is None 

1542 ): 

1543 task.finished_time = task.elapsed 

1544 task.finished_speed = task.speed 

1545 

1546 def refresh(self) -> None: 

1547 """Refresh (render) the progress information.""" 

1548 if not self.disable and self.live.is_started: 

1549 self.live.refresh() 

1550 

1551 def get_renderable(self) -> RenderableType: 

1552 """Get a renderable for the progress display.""" 

1553 renderable = Group(*self.get_renderables()) 

1554 return renderable 

1555 

1556 def get_renderables(self) -> Iterable[RenderableType]: 

1557 """Get a number of renderables for the progress display.""" 

1558 table = self.make_tasks_table(self.tasks) 

1559 yield table 

1560 

1561 def make_tasks_table(self, tasks: Iterable[Task]) -> Table: 

1562 """Get a table to render the Progress display. 

1563 

1564 Args: 

1565 tasks (Iterable[Task]): An iterable of Task instances, one per row of the table. 

1566 

1567 Returns: 

1568 Table: A table instance. 

1569 """ 

1570 table_columns = ( 

1571 ( 

1572 Column(no_wrap=True) 

1573 if isinstance(_column, str) 

1574 else _column.get_table_column().copy() 

1575 ) 

1576 for _column in self.columns 

1577 ) 

1578 table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand) 

1579 

1580 for task in tasks: 

1581 if task.visible: 

1582 table.add_row( 

1583 *( 

1584 ( 

1585 column.format(task=task) 

1586 if isinstance(column, str) 

1587 else column(task) 

1588 ) 

1589 for column in self.columns 

1590 ) 

1591 ) 

1592 return table 

1593 

1594 def __rich__(self) -> RenderableType: 

1595 """Makes the Progress class itself renderable.""" 

1596 with self._lock: 

1597 return self.get_renderable() 

1598 

1599 def add_task( 

1600 self, 

1601 description: str, 

1602 start: bool = True, 

1603 total: Optional[float] = 100.0, 

1604 completed: int = 0, 

1605 visible: bool = True, 

1606 **fields: Any, 

1607 ) -> TaskID: 

1608 """Add a new 'task' to the Progress display. 

1609 

1610 Args: 

1611 description (str): A description of the task. 

1612 start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False, 

1613 you will need to call `start` manually. Defaults to True. 

1614 total (float, optional): Number of total steps in the progress if known. 

1615 Set to None to render a pulsing animation. Defaults to 100. 

1616 completed (int, optional): Number of steps completed so far. Defaults to 0. 

1617 visible (bool, optional): Enable display of the task. Defaults to True. 

1618 **fields (str): Additional data fields required for rendering. 

1619 

1620 Returns: 

1621 TaskID: An ID you can use when calling `update`. 

1622 """ 

1623 with self._lock: 

1624 task = Task( 

1625 self._task_index, 

1626 description, 

1627 total, 

1628 completed, 

1629 visible=visible, 

1630 fields=fields, 

1631 _get_time=self.get_time, 

1632 _lock=self._lock, 

1633 ) 

1634 self._tasks[self._task_index] = task 

1635 if start: 

1636 self.start_task(self._task_index) 

1637 new_task_index = self._task_index 

1638 self._task_index = TaskID(int(self._task_index) + 1) 

1639 self.refresh() 

1640 return new_task_index 

1641 

1642 def remove_task(self, task_id: TaskID) -> None: 

1643 """Delete a task if it exists. 

1644 

1645 Args: 

1646 task_id (TaskID): A task ID. 

1647 

1648 """ 

1649 with self._lock: 

1650 del self._tasks[task_id] 

1651 

1652 

1653if __name__ == "__main__": # pragma: no coverage 

1654 import random 

1655 import time 

1656 

1657 from .panel import Panel 

1658 from .rule import Rule 

1659 from .syntax import Syntax 

1660 from .table import Table 

1661 

1662 syntax = Syntax( 

1663 '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]: 

1664 """Iterate and generate a tuple with a flag for last value.""" 

1665 iter_values = iter(values) 

1666 try: 

1667 previous_value = next(iter_values) 

1668 except StopIteration: 

1669 return 

1670 for value in iter_values: 

1671 yield False, previous_value 

1672 previous_value = value 

1673 yield True, previous_value''', 

1674 "python", 

1675 line_numbers=True, 

1676 ) 

1677 

1678 table = Table("foo", "bar", "baz") 

1679 table.add_row("1", "2", "3") 

1680 

1681 progress_renderables = [ 

1682 "Text may be printed while the progress bars are rendering.", 

1683 Panel("In fact, [i]any[/i] renderable will work"), 

1684 "Such as [magenta]tables[/]...", 

1685 table, 

1686 "Pretty printed structures...", 

1687 {"type": "example", "text": "Pretty printed"}, 

1688 "Syntax...", 

1689 syntax, 

1690 Rule("Give it a try!"), 

1691 ] 

1692 

1693 from itertools import cycle 

1694 

1695 examples = cycle(progress_renderables) 

1696 

1697 console = Console(record=True) 

1698 

1699 with Progress( 

1700 SpinnerColumn(), 

1701 *Progress.get_default_columns(), 

1702 TimeElapsedColumn(), 

1703 console=console, 

1704 transient=False, 

1705 ) as progress: 

1706 task1 = progress.add_task("[red]Downloading", total=1000) 

1707 task2 = progress.add_task("[green]Processing", total=1000) 

1708 task3 = progress.add_task("[yellow]Thinking", total=None) 

1709 

1710 while not progress.finished: 

1711 progress.update(task1, advance=0.5) 

1712 progress.update(task2, advance=0.3) 

1713 time.sleep(0.01) 

1714 if random.randint(0, 100) < 1: 

1715 progress.log(next(examples))