Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rich/progress.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

626 statements  

1import io 

2import sys 

3import typing 

4import warnings 

5from abc import ABC, abstractmethod 

6from collections import deque 

7from dataclasses import dataclass, field 

8from datetime import timedelta 

9from io import RawIOBase, UnsupportedOperation 

10from math import ceil 

11from mmap import mmap 

12from operator import length_hint 

13from os import PathLike, stat 

14from threading import Event, RLock, Thread 

15from types import TracebackType 

16from typing import ( 

17 Any, 

18 BinaryIO, 

19 Callable, 

20 ContextManager, 

21 Deque, 

22 Dict, 

23 Generic, 

24 Iterable, 

25 List, 

26 NamedTuple, 

27 NewType, 

28 Optional, 

29 Sequence, 

30 TextIO, 

31 Tuple, 

32 Type, 

33 TypeVar, 

34 Union, 

35) 

36 

37if sys.version_info >= (3, 8): 

38 from typing import Literal 

39else: 

40 from typing_extensions import Literal # pragma: no cover 

41 

42if sys.version_info >= (3, 11): 

43 from typing import Self 

44else: 

45 from typing_extensions import Self # pragma: no cover 

46 

47from . import filesize, get_console 

48from .console import Console, Group, JustifyMethod, RenderableType 

49from .highlighter import Highlighter 

50from .jupyter import JupyterMixin 

51from .live import Live 

52from .progress_bar import ProgressBar 

53from .spinner import Spinner 

54from .style import StyleType 

55from .table import Column, Table 

56from .text import Text, TextType 

57 

58TaskID = NewType("TaskID", int) 

59 

60ProgressType = TypeVar("ProgressType") 

61 

62GetTimeCallable = Callable[[], float] 

63 

64 

65_I = typing.TypeVar("_I", TextIO, BinaryIO) 

66 

67 

68class _TrackThread(Thread): 

69 """A thread to periodically update progress.""" 

70 

71 def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float): 

72 self.progress = progress 

73 self.task_id = task_id 

74 self.update_period = update_period 

75 self.done = Event() 

76 

77 self.completed = 0 

78 super().__init__(daemon=True) 

79 

80 def run(self) -> None: 

81 task_id = self.task_id 

82 advance = self.progress.advance 

83 update_period = self.update_period 

84 last_completed = 0 

85 wait = self.done.wait 

86 while not wait(update_period) and self.progress.live.is_started: 

87 completed = self.completed 

88 if last_completed != completed: 

89 advance(task_id, completed - last_completed) 

90 last_completed = completed 

91 

92 self.progress.update(self.task_id, completed=self.completed, refresh=True) 

93 

94 def __enter__(self) -> "_TrackThread": 

95 self.start() 

96 return self 

97 

98 def __exit__( 

99 self, 

100 exc_type: Optional[Type[BaseException]], 

101 exc_val: Optional[BaseException], 

102 exc_tb: Optional[TracebackType], 

103 ) -> None: 

104 self.done.set() 

105 self.join() 

106 

107 

108def track( 

109 sequence: Union[Sequence[ProgressType], Iterable[ProgressType]], 

110 description: str = "Working...", 

111 total: Optional[float] = None, 

112 completed: int = 0, 

113 auto_refresh: bool = True, 

114 console: Optional[Console] = None, 

115 transient: bool = False, 

116 get_time: Optional[Callable[[], float]] = None, 

117 refresh_per_second: float = 10, 

118 style: StyleType = "bar.back", 

119 complete_style: StyleType = "bar.complete", 

120 finished_style: StyleType = "bar.finished", 

121 pulse_style: StyleType = "bar.pulse", 

122 update_period: float = 0.1, 

123 disable: bool = False, 

124 show_speed: bool = True, 

125) -> Iterable[ProgressType]: 

126 """Track progress by iterating over a sequence. 

127 

128 Args: 

129 sequence (Iterable[ProgressType]): A sequence (must support "len") you wish to iterate over. 

130 description (str, optional): Description of task show next to progress bar. Defaults to "Working". 

131 total: (float, optional): Total number of steps. Default is len(sequence). 

132 completed (int, optional): Number of steps completed so far. Defaults to 0. 

133 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

134 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

135 console (Console, optional): Console to write to. Default creates internal Console instance. 

136 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

137 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

138 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

139 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

140 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

141 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. 

142 disable (bool, optional): Disable display of progress. 

143 show_speed (bool, optional): Show speed if total isn't known. Defaults to True. 

144 Returns: 

145 Iterable[ProgressType]: An iterable of the values in the sequence. 

146 

147 """ 

148 

149 columns: List["ProgressColumn"] = ( 

150 [TextColumn("[progress.description]{task.description}")] if description else [] 

151 ) 

152 columns.extend( 

153 ( 

154 BarColumn( 

155 style=style, 

156 complete_style=complete_style, 

157 finished_style=finished_style, 

158 pulse_style=pulse_style, 

159 ), 

160 TaskProgressColumn(show_speed=show_speed), 

161 TimeRemainingColumn(elapsed_when_finished=True), 

162 ) 

163 ) 

164 progress = Progress( 

165 *columns, 

166 auto_refresh=auto_refresh, 

167 console=console, 

168 transient=transient, 

169 get_time=get_time, 

170 refresh_per_second=refresh_per_second or 10, 

171 disable=disable, 

172 ) 

173 

174 with progress: 

175 yield from progress.track( 

176 sequence, 

177 total=total, 

178 completed=completed, 

179 description=description, 

180 update_period=update_period, 

181 ) 

182 

183 

184class _Reader(RawIOBase, BinaryIO): 

185 """A reader that tracks progress while it's being read from.""" 

186 

187 def __init__( 

188 self, 

189 handle: BinaryIO, 

190 progress: "Progress", 

191 task: TaskID, 

192 close_handle: bool = True, 

193 ) -> None: 

194 self.handle = handle 

195 self.progress = progress 

196 self.task = task 

197 self.close_handle = close_handle 

198 self._closed = False 

199 

200 def __enter__(self) -> "_Reader": 

201 self.handle.__enter__() 

202 return self 

203 

204 def __exit__( 

205 self, 

206 exc_type: Optional[Type[BaseException]], 

207 exc_val: Optional[BaseException], 

208 exc_tb: Optional[TracebackType], 

209 ) -> None: 

210 self.close() 

211 

212 def __iter__(self) -> BinaryIO: 

213 return self 

214 

215 def __next__(self) -> bytes: 

216 line = next(self.handle) 

217 self.progress.advance(self.task, advance=len(line)) 

218 return line 

219 

220 @property 

221 def closed(self) -> bool: 

222 return self._closed 

223 

224 def fileno(self) -> int: 

225 return self.handle.fileno() 

226 

227 def isatty(self) -> bool: 

228 return self.handle.isatty() 

229 

230 @property 

231 def mode(self) -> str: 

232 return self.handle.mode 

233 

234 @property 

235 def name(self) -> str: 

236 return self.handle.name 

237 

238 def readable(self) -> bool: 

239 return self.handle.readable() 

240 

241 def seekable(self) -> bool: 

242 return self.handle.seekable() 

243 

244 def writable(self) -> bool: 

245 return False 

246 

247 def read(self, size: int = -1) -> bytes: 

248 block = self.handle.read(size) 

249 self.progress.advance(self.task, advance=len(block)) 

250 return block 

251 

252 def readinto(self, b: Union[bytearray, memoryview, mmap]): # type: ignore[no-untyped-def, override] 

253 n = self.handle.readinto(b) # type: ignore[attr-defined] 

254 self.progress.advance(self.task, advance=n) 

255 return n 

256 

257 def readline(self, size: int = -1) -> bytes: # type: ignore[override] 

258 line = self.handle.readline(size) 

259 self.progress.advance(self.task, advance=len(line)) 

260 return line 

261 

262 def readlines(self, hint: int = -1) -> List[bytes]: 

263 lines = self.handle.readlines(hint) 

264 self.progress.advance(self.task, advance=sum(map(len, lines))) 

265 return lines 

266 

267 def close(self) -> None: 

268 if self.close_handle: 

269 self.handle.close() 

270 self._closed = True 

271 

272 def seek(self, offset: int, whence: int = 0) -> int: 

273 pos = self.handle.seek(offset, whence) 

274 self.progress.update(self.task, completed=pos) 

275 return pos 

276 

277 def tell(self) -> int: 

278 return self.handle.tell() 

279 

280 def write(self, s: Any) -> int: 

281 raise UnsupportedOperation("write") 

282 

283 def writelines(self, lines: Iterable[Any]) -> None: 

284 raise UnsupportedOperation("writelines") 

285 

286 

287class _ReadContext(ContextManager[_I], Generic[_I]): 

288 """A utility class to handle a context for both a reader and a progress.""" 

289 

290 def __init__(self, progress: "Progress", reader: _I) -> None: 

291 self.progress = progress 

292 self.reader: _I = reader 

293 

294 def __enter__(self) -> _I: 

295 self.progress.start() 

296 return self.reader.__enter__() 

297 

298 def __exit__( 

299 self, 

300 exc_type: Optional[Type[BaseException]], 

301 exc_val: Optional[BaseException], 

302 exc_tb: Optional[TracebackType], 

303 ) -> None: 

304 self.progress.stop() 

305 self.reader.__exit__(exc_type, exc_val, exc_tb) 

306 

307 

308def wrap_file( 

309 file: BinaryIO, 

310 total: int, 

311 *, 

312 description: str = "Reading...", 

313 auto_refresh: bool = True, 

314 console: Optional[Console] = None, 

315 transient: bool = False, 

316 get_time: Optional[Callable[[], float]] = None, 

317 refresh_per_second: float = 10, 

318 style: StyleType = "bar.back", 

319 complete_style: StyleType = "bar.complete", 

320 finished_style: StyleType = "bar.finished", 

321 pulse_style: StyleType = "bar.pulse", 

322 disable: bool = False, 

323) -> ContextManager[BinaryIO]: 

324 """Read bytes from a file while tracking progress. 

325 

326 Args: 

327 file (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. 

328 total (int): Total number of bytes to read. 

329 description (str, optional): Description of task show next to progress bar. Defaults to "Reading". 

330 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

331 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

332 console (Console, optional): Console to write to. Default creates internal Console instance. 

333 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

334 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

335 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

336 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

337 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

338 disable (bool, optional): Disable display of progress. 

339 Returns: 

340 ContextManager[BinaryIO]: A context manager yielding a progress reader. 

341 

342 """ 

343 

344 columns: List["ProgressColumn"] = ( 

345 [TextColumn("[progress.description]{task.description}")] if description else [] 

346 ) 

347 columns.extend( 

348 ( 

349 BarColumn( 

350 style=style, 

351 complete_style=complete_style, 

352 finished_style=finished_style, 

353 pulse_style=pulse_style, 

354 ), 

355 DownloadColumn(), 

356 TimeRemainingColumn(), 

357 ) 

358 ) 

359 progress = Progress( 

360 *columns, 

361 auto_refresh=auto_refresh, 

362 console=console, 

363 transient=transient, 

364 get_time=get_time, 

365 refresh_per_second=refresh_per_second or 10, 

366 disable=disable, 

367 ) 

368 

369 reader = progress.wrap_file(file, total=total, description=description) 

370 return _ReadContext(progress, reader) 

371 

372 

373@typing.overload 

374def open( 

375 file: Union[str, "PathLike[str]", bytes], 

376 mode: Union[Literal["rt"], Literal["r"]], 

377 buffering: int = -1, 

378 encoding: Optional[str] = None, 

379 errors: Optional[str] = None, 

380 newline: Optional[str] = None, 

381 *, 

382 total: Optional[int] = None, 

383 description: str = "Reading...", 

384 auto_refresh: bool = True, 

385 console: Optional[Console] = None, 

386 transient: bool = False, 

387 get_time: Optional[Callable[[], float]] = None, 

388 refresh_per_second: float = 10, 

389 style: StyleType = "bar.back", 

390 complete_style: StyleType = "bar.complete", 

391 finished_style: StyleType = "bar.finished", 

392 pulse_style: StyleType = "bar.pulse", 

393 disable: bool = False, 

394) -> ContextManager[TextIO]: 

395 pass 

396 

397 

398@typing.overload 

399def open( 

400 file: Union[str, "PathLike[str]", bytes], 

401 mode: Literal["rb"], 

402 buffering: int = -1, 

403 encoding: Optional[str] = None, 

404 errors: Optional[str] = None, 

405 newline: Optional[str] = None, 

406 *, 

407 total: Optional[int] = None, 

408 description: str = "Reading...", 

409 auto_refresh: bool = True, 

410 console: Optional[Console] = None, 

411 transient: bool = False, 

412 get_time: Optional[Callable[[], float]] = None, 

413 refresh_per_second: float = 10, 

414 style: StyleType = "bar.back", 

415 complete_style: StyleType = "bar.complete", 

416 finished_style: StyleType = "bar.finished", 

417 pulse_style: StyleType = "bar.pulse", 

418 disable: bool = False, 

419) -> ContextManager[BinaryIO]: 

420 pass 

421 

422 

423def open( 

424 file: Union[str, "PathLike[str]", bytes], 

425 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", 

426 buffering: int = -1, 

427 encoding: Optional[str] = None, 

428 errors: Optional[str] = None, 

429 newline: Optional[str] = None, 

430 *, 

431 total: Optional[int] = None, 

432 description: str = "Reading...", 

433 auto_refresh: bool = True, 

434 console: Optional[Console] = None, 

435 transient: bool = False, 

436 get_time: Optional[Callable[[], float]] = None, 

437 refresh_per_second: float = 10, 

438 style: StyleType = "bar.back", 

439 complete_style: StyleType = "bar.complete", 

440 finished_style: StyleType = "bar.finished", 

441 pulse_style: StyleType = "bar.pulse", 

442 disable: bool = False, 

443) -> Union[ContextManager[BinaryIO], ContextManager[TextIO]]: 

444 """Read bytes from a file while tracking progress. 

445 

446 Args: 

447 path (Union[str, PathLike[str], BinaryIO]): The path to the file to read, or a file-like object in binary mode. 

448 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". 

449 buffering (int): The buffering strategy to use, see :func:`io.open`. 

450 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. 

451 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. 

452 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open` 

453 total: (int, optional): Total number of bytes to read. Must be provided if reading from a file handle. Default for a path is os.stat(file).st_size. 

454 description (str, optional): Description of task show next to progress bar. Defaults to "Reading". 

455 auto_refresh (bool, optional): Automatic refresh, disable to force a refresh after each iteration. Default is True. 

456 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

457 console (Console, optional): Console to write to. Default creates internal Console instance. 

458 refresh_per_second (float): Number of times per second to refresh the progress information. Defaults to 10. 

459 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

460 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

461 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

462 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

463 disable (bool, optional): Disable display of progress. 

464 encoding (str, optional): The encoding to use when reading in text mode. 

465 

466 Returns: 

467 ContextManager[BinaryIO]: A context manager yielding a progress reader. 

468 

469 """ 

470 

471 columns: List["ProgressColumn"] = ( 

472 [TextColumn("[progress.description]{task.description}")] if description else [] 

473 ) 

474 columns.extend( 

475 ( 

476 BarColumn( 

477 style=style, 

478 complete_style=complete_style, 

479 finished_style=finished_style, 

480 pulse_style=pulse_style, 

481 ), 

482 DownloadColumn(), 

483 TimeRemainingColumn(), 

484 ) 

485 ) 

486 progress = Progress( 

487 *columns, 

488 auto_refresh=auto_refresh, 

489 console=console, 

490 transient=transient, 

491 get_time=get_time, 

492 refresh_per_second=refresh_per_second or 10, 

493 disable=disable, 

494 ) 

495 

496 reader = progress.open( 

497 file, 

498 mode=mode, 

499 buffering=buffering, 

500 encoding=encoding, 

501 errors=errors, 

502 newline=newline, 

503 total=total, 

504 description=description, 

505 ) 

506 return _ReadContext(progress, reader) # type: ignore[return-value, type-var] 

507 

508 

509class ProgressColumn(ABC): 

510 """Base class for a widget to use in progress display.""" 

511 

512 max_refresh: Optional[float] = None 

513 

514 def __init__(self, table_column: Optional[Column] = None) -> None: 

515 self._table_column = table_column 

516 self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {} 

517 self._update_time: Optional[float] = None 

518 

519 def get_table_column(self) -> Column: 

520 """Get a table column, used to build tasks table.""" 

521 return self._table_column or Column() 

522 

523 def __call__(self, task: "Task") -> RenderableType: 

524 """Called by the Progress object to return a renderable for the given task. 

525 

526 Args: 

527 task (Task): An object containing information regarding the task. 

528 

529 Returns: 

530 RenderableType: Anything renderable (including str). 

531 """ 

532 current_time = task.get_time() 

533 if self.max_refresh is not None and not task.completed: 

534 try: 

535 timestamp, renderable = self._renderable_cache[task.id] 

536 except KeyError: 

537 pass 

538 else: 

539 if timestamp + self.max_refresh > current_time: 

540 return renderable 

541 

542 renderable = self.render(task) 

543 self._renderable_cache[task.id] = (current_time, renderable) 

544 return renderable 

545 

546 @abstractmethod 

547 def render(self, task: "Task") -> RenderableType: 

548 """Should return a renderable object.""" 

549 

550 

551class RenderableColumn(ProgressColumn): 

552 """A column to insert an arbitrary column. 

553 

554 Args: 

555 renderable (RenderableType, optional): Any renderable. Defaults to empty string. 

556 """ 

557 

558 def __init__( 

559 self, renderable: RenderableType = "", *, table_column: Optional[Column] = None 

560 ): 

561 self.renderable = renderable 

562 super().__init__(table_column=table_column) 

563 

564 def render(self, task: "Task") -> RenderableType: 

565 return self.renderable 

566 

567 

568class SpinnerColumn(ProgressColumn): 

569 """A column with a 'spinner' animation. 

570 

571 Args: 

572 spinner_name (str, optional): Name of spinner animation. Defaults to "dots". 

573 style (StyleType, optional): Style of spinner. Defaults to "progress.spinner". 

574 speed (float, optional): Speed factor of spinner. Defaults to 1.0. 

575 finished_text (TextType, optional): Text used when task is finished. Defaults to " ". 

576 """ 

577 

578 def __init__( 

579 self, 

580 spinner_name: str = "dots", 

581 style: Optional[StyleType] = "progress.spinner", 

582 speed: float = 1.0, 

583 finished_text: TextType = " ", 

584 table_column: Optional[Column] = None, 

585 ): 

586 self.spinner = Spinner(spinner_name, style=style, speed=speed) 

587 self.finished_text = ( 

588 Text.from_markup(finished_text) 

589 if isinstance(finished_text, str) 

590 else finished_text 

591 ) 

592 super().__init__(table_column=table_column) 

593 

594 def set_spinner( 

595 self, 

596 spinner_name: str, 

597 spinner_style: Optional[StyleType] = "progress.spinner", 

598 speed: float = 1.0, 

599 ) -> None: 

600 """Set a new spinner. 

601 

602 Args: 

603 spinner_name (str): Spinner name, see python -m rich.spinner. 

604 spinner_style (Optional[StyleType], optional): Spinner style. Defaults to "progress.spinner". 

605 speed (float, optional): Speed factor of spinner. Defaults to 1.0. 

606 """ 

607 self.spinner = Spinner(spinner_name, style=spinner_style, speed=speed) 

608 

609 def render(self, task: "Task") -> RenderableType: 

610 text = ( 

611 self.finished_text 

612 if task.finished 

613 else self.spinner.render(task.get_time()) 

614 ) 

615 return text 

616 

617 

618class TextColumn(ProgressColumn): 

619 """A column containing text.""" 

620 

621 def __init__( 

622 self, 

623 text_format: str, 

624 style: StyleType = "none", 

625 justify: JustifyMethod = "left", 

626 markup: bool = True, 

627 highlighter: Optional[Highlighter] = None, 

628 table_column: Optional[Column] = None, 

629 ) -> None: 

630 self.text_format = text_format 

631 self.justify: JustifyMethod = justify 

632 self.style = style 

633 self.markup = markup 

634 self.highlighter = highlighter 

635 super().__init__(table_column=table_column or Column(no_wrap=True)) 

636 

637 def render(self, task: "Task") -> Text: 

638 _text = self.text_format.format(task=task) 

639 if self.markup: 

640 text = Text.from_markup(_text, style=self.style, justify=self.justify) 

641 else: 

642 text = Text(_text, style=self.style, justify=self.justify) 

643 if self.highlighter: 

644 self.highlighter.highlight(text) 

645 return text 

646 

647 

648class BarColumn(ProgressColumn): 

649 """Renders a visual progress bar. 

650 

651 Args: 

652 bar_width (Optional[int], optional): Width of bar or None for full width. Defaults to 40. 

653 style (StyleType, optional): Style for the bar background. Defaults to "bar.back". 

654 complete_style (StyleType, optional): Style for the completed bar. Defaults to "bar.complete". 

655 finished_style (StyleType, optional): Style for a finished bar. Defaults to "bar.finished". 

656 pulse_style (StyleType, optional): Style for pulsing bars. Defaults to "bar.pulse". 

657 """ 

658 

659 def __init__( 

660 self, 

661 bar_width: Optional[int] = 40, 

662 style: StyleType = "bar.back", 

663 complete_style: StyleType = "bar.complete", 

664 finished_style: StyleType = "bar.finished", 

665 pulse_style: StyleType = "bar.pulse", 

666 table_column: Optional[Column] = None, 

667 ) -> None: 

668 self.bar_width = bar_width 

669 self.style = style 

670 self.complete_style = complete_style 

671 self.finished_style = finished_style 

672 self.pulse_style = pulse_style 

673 super().__init__(table_column=table_column) 

674 

675 def render(self, task: "Task") -> ProgressBar: 

676 """Gets a progress bar widget for a task.""" 

677 return ProgressBar( 

678 total=max(0, task.total) if task.total is not None else None, 

679 completed=max(0, task.completed), 

680 width=None if self.bar_width is None else max(1, self.bar_width), 

681 pulse=not task.started, 

682 animation_time=task.get_time(), 

683 style=self.style, 

684 complete_style=self.complete_style, 

685 finished_style=self.finished_style, 

686 pulse_style=self.pulse_style, 

687 ) 

688 

689 

690class TimeElapsedColumn(ProgressColumn): 

691 """Renders time elapsed.""" 

692 

693 def render(self, task: "Task") -> Text: 

694 """Show time elapsed.""" 

695 elapsed = task.finished_time if task.finished else task.elapsed 

696 if elapsed is None: 

697 return Text("-:--:--", style="progress.elapsed") 

698 delta = timedelta(seconds=max(0, int(elapsed))) 

699 return Text(str(delta), style="progress.elapsed") 

700 

701 

702class TaskProgressColumn(TextColumn): 

703 """Show task progress as a percentage. 

704 

705 Args: 

706 text_format (str, optional): Format for percentage display. Defaults to "[progress.percentage]{task.percentage:>3.0f}%". 

707 text_format_no_percentage (str, optional): Format if percentage is unknown. Defaults to "". 

708 style (StyleType, optional): Style of output. Defaults to "none". 

709 justify (JustifyMethod, optional): Text justification. Defaults to "left". 

710 markup (bool, optional): Enable markup. Defaults to True. 

711 highlighter (Optional[Highlighter], optional): Highlighter to apply to output. Defaults to None. 

712 table_column (Optional[Column], optional): Table Column to use. Defaults to None. 

713 show_speed (bool, optional): Show speed if total is unknown. Defaults to False. 

714 """ 

715 

716 def __init__( 

717 self, 

718 text_format: str = "[progress.percentage]{task.percentage:>3.0f}%", 

719 text_format_no_percentage: str = "", 

720 style: StyleType = "none", 

721 justify: JustifyMethod = "left", 

722 markup: bool = True, 

723 highlighter: Optional[Highlighter] = None, 

724 table_column: Optional[Column] = None, 

725 show_speed: bool = False, 

726 ) -> None: 

727 self.text_format_no_percentage = text_format_no_percentage 

728 self.show_speed = show_speed 

729 super().__init__( 

730 text_format=text_format, 

731 style=style, 

732 justify=justify, 

733 markup=markup, 

734 highlighter=highlighter, 

735 table_column=table_column, 

736 ) 

737 

738 @classmethod 

739 def render_speed(cls, speed: Optional[float]) -> Text: 

740 """Render the speed in iterations per second. 

741 

742 Args: 

743 task (Task): A Task object. 

744 

745 Returns: 

746 Text: Text object containing the task speed. 

747 """ 

748 if speed is None: 

749 return Text("", style="progress.percentage") 

750 unit, suffix = filesize.pick_unit_and_suffix( 

751 int(speed), 

752 ["", "×10³", "×10⁶", "×10⁹", "×10¹²"], 

753 1000, 

754 ) 

755 data_speed = speed / unit 

756 return Text(f"{data_speed:.1f}{suffix} it/s", style="progress.percentage") 

757 

758 def render(self, task: "Task") -> Text: 

759 if task.total is None and self.show_speed: 

760 return self.render_speed(task.finished_speed or task.speed) 

761 text_format = ( 

762 self.text_format_no_percentage if task.total is None else self.text_format 

763 ) 

764 _text = text_format.format(task=task) 

765 if self.markup: 

766 text = Text.from_markup(_text, style=self.style, justify=self.justify) 

767 else: 

768 text = Text(_text, style=self.style, justify=self.justify) 

769 if self.highlighter: 

770 self.highlighter.highlight(text) 

771 return text 

772 

773 

774class TimeRemainingColumn(ProgressColumn): 

775 """Renders estimated time remaining. 

776 

777 Args: 

778 compact (bool, optional): Render MM:SS when time remaining is less than an hour. Defaults to False. 

779 elapsed_when_finished (bool, optional): Render time elapsed when the task is finished. Defaults to False. 

780 """ 

781 

782 # Only refresh twice a second to prevent jitter 

783 max_refresh = 0.5 

784 

785 def __init__( 

786 self, 

787 compact: bool = False, 

788 elapsed_when_finished: bool = False, 

789 table_column: Optional[Column] = None, 

790 ): 

791 self.compact = compact 

792 self.elapsed_when_finished = elapsed_when_finished 

793 super().__init__(table_column=table_column) 

794 

795 def render(self, task: "Task") -> Text: 

796 """Show time remaining.""" 

797 if self.elapsed_when_finished and task.finished: 

798 task_time = task.finished_time 

799 style = "progress.elapsed" 

800 else: 

801 task_time = task.time_remaining 

802 style = "progress.remaining" 

803 

804 if task.total is None: 

805 return Text("", style=style) 

806 

807 if task_time is None: 

808 return Text("--:--" if self.compact else "-:--:--", style=style) 

809 

810 # Based on https://github.com/tqdm/tqdm/blob/master/tqdm/std.py 

811 minutes, seconds = divmod(int(task_time), 60) 

812 hours, minutes = divmod(minutes, 60) 

813 

814 if self.compact and not hours: 

815 formatted = f"{minutes:02d}:{seconds:02d}" 

816 else: 

817 formatted = f"{hours:d}:{minutes:02d}:{seconds:02d}" 

818 

819 return Text(formatted, style=style) 

820 

821 

822class FileSizeColumn(ProgressColumn): 

823 """Renders completed filesize.""" 

824 

825 def render(self, task: "Task") -> Text: 

826 """Show data completed.""" 

827 data_size = filesize.decimal(int(task.completed)) 

828 return Text(data_size, style="progress.filesize") 

829 

830 

831class TotalFileSizeColumn(ProgressColumn): 

832 """Renders total filesize.""" 

833 

834 def render(self, task: "Task") -> Text: 

835 """Show data completed.""" 

836 data_size = filesize.decimal(int(task.total)) if task.total is not None else "" 

837 return Text(data_size, style="progress.filesize.total") 

838 

839 

840class MofNCompleteColumn(ProgressColumn): 

841 """Renders completed count/total, e.g. ' 10/1000'. 

842 

843 Best for bounded tasks with int quantities. 

844 

845 Space pads the completed count so that progress length does not change as task progresses 

846 past powers of 10. 

847 

848 Args: 

849 separator (str, optional): Text to separate completed and total values. Defaults to "/". 

850 """ 

851 

852 def __init__(self, separator: str = "/", table_column: Optional[Column] = None): 

853 self.separator = separator 

854 super().__init__(table_column=table_column) 

855 

856 def render(self, task: "Task") -> Text: 

857 """Show completed/total.""" 

858 completed = int(task.completed) 

859 total = int(task.total) if task.total is not None else "?" 

860 total_width = len(str(total)) 

861 return Text( 

862 f"{completed:{total_width}d}{self.separator}{total}", 

863 style="progress.download", 

864 ) 

865 

866 

867class DownloadColumn(ProgressColumn): 

868 """Renders file size downloaded and total, e.g. '0.5/2.3 GB'. 

869 

870 Args: 

871 binary_units (bool, optional): Use binary units, KiB, MiB etc. Defaults to False. 

872 """ 

873 

874 def __init__( 

875 self, binary_units: bool = False, table_column: Optional[Column] = None 

876 ) -> None: 

877 self.binary_units = binary_units 

878 super().__init__(table_column=table_column) 

879 

880 def render(self, task: "Task") -> Text: 

881 """Calculate common unit for completed and total.""" 

882 completed = int(task.completed) 

883 

884 unit_and_suffix_calculation_base = ( 

885 int(task.total) if task.total is not None else completed 

886 ) 

887 if self.binary_units: 

888 unit, suffix = filesize.pick_unit_and_suffix( 

889 unit_and_suffix_calculation_base, 

890 ["bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"], 

891 1024, 

892 ) 

893 else: 

894 unit, suffix = filesize.pick_unit_and_suffix( 

895 unit_and_suffix_calculation_base, 

896 ["bytes", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"], 

897 1000, 

898 ) 

899 precision = 0 if unit == 1 else 1 

900 

901 completed_ratio = completed / unit 

902 completed_str = f"{completed_ratio:,.{precision}f}" 

903 

904 if task.total is not None: 

905 total = int(task.total) 

906 total_ratio = total / unit 

907 total_str = f"{total_ratio:,.{precision}f}" 

908 else: 

909 total_str = "?" 

910 

911 download_status = f"{completed_str}/{total_str} {suffix}" 

912 download_text = Text(download_status, style="progress.download") 

913 return download_text 

914 

915 

916class TransferSpeedColumn(ProgressColumn): 

917 """Renders human readable transfer speed.""" 

918 

919 def render(self, task: "Task") -> Text: 

920 """Show data transfer speed.""" 

921 speed = task.finished_speed or task.speed 

922 if speed is None: 

923 return Text("?", style="progress.data.speed") 

924 data_speed = filesize.decimal(int(speed)) 

925 return Text(f"{data_speed}/s", style="progress.data.speed") 

926 

927 

928class ProgressSample(NamedTuple): 

929 """Sample of progress for a given time.""" 

930 

931 timestamp: float 

932 """Timestamp of sample.""" 

933 completed: float 

934 """Number of steps completed.""" 

935 

936 

937@dataclass 

938class Task: 

939 """Information regarding a progress task. 

940 

941 This object should be considered read-only outside of the :class:`~Progress` class. 

942 

943 """ 

944 

945 id: TaskID 

946 """Task ID associated with this task (used in Progress methods).""" 

947 

948 description: str 

949 """str: Description of the task.""" 

950 

951 total: Optional[float] 

952 """Optional[float]: Total number of steps in this task.""" 

953 

954 completed: float 

955 """float: Number of steps completed""" 

956 

957 _get_time: GetTimeCallable 

958 """Callable to get the current time.""" 

959 

960 finished_time: Optional[float] = None 

961 """float: Time task was finished.""" 

962 

963 visible: bool = True 

964 """bool: Indicates if this task is visible in the progress display.""" 

965 

966 fields: Dict[str, Any] = field(default_factory=dict) 

967 """dict: Arbitrary fields passed in via Progress.update.""" 

968 

969 start_time: Optional[float] = field(default=None, init=False, repr=False) 

970 """Optional[float]: Time this task was started, or None if not started.""" 

971 

972 stop_time: Optional[float] = field(default=None, init=False, repr=False) 

973 """Optional[float]: Time this task was stopped, or None if not stopped.""" 

974 

975 finished_speed: Optional[float] = None 

976 """Optional[float]: The last speed for a finished task.""" 

977 

978 _progress: Deque[ProgressSample] = field( 

979 default_factory=lambda: deque(maxlen=1000), init=False, repr=False 

980 ) 

981 

982 _lock: RLock = field(repr=False, default_factory=RLock) 

983 """Thread lock.""" 

984 

985 def get_time(self) -> float: 

986 """float: Get the current time, in seconds.""" 

987 return self._get_time() 

988 

989 @property 

990 def started(self) -> bool: 

991 """bool: Check if the task as started.""" 

992 return self.start_time is not None 

993 

994 @property 

995 def remaining(self) -> Optional[float]: 

996 """Optional[float]: Get the number of steps remaining, if a non-None total was set.""" 

997 if self.total is None: 

998 return None 

999 return self.total - self.completed 

1000 

1001 @property 

1002 def elapsed(self) -> Optional[float]: 

1003 """Optional[float]: Time elapsed since task was started, or ``None`` if the task hasn't started.""" 

1004 if self.start_time is None: 

1005 return None 

1006 if self.stop_time is not None: 

1007 return self.stop_time - self.start_time 

1008 return self.get_time() - self.start_time 

1009 

1010 @property 

1011 def finished(self) -> bool: 

1012 """Check if the task has finished.""" 

1013 return self.finished_time is not None 

1014 

1015 @property 

1016 def percentage(self) -> float: 

1017 """float: Get progress of task as a percentage. If a None total was set, returns 0""" 

1018 if not self.total: 

1019 return 0.0 

1020 completed = (self.completed / self.total) * 100.0 

1021 completed = min(100.0, max(0.0, completed)) 

1022 return completed 

1023 

1024 @property 

1025 def speed(self) -> Optional[float]: 

1026 """Optional[float]: Get the estimated speed in steps per second.""" 

1027 if self.start_time is None: 

1028 return None 

1029 with self._lock: 

1030 progress = self._progress 

1031 if not progress: 

1032 return None 

1033 total_time = progress[-1].timestamp - progress[0].timestamp 

1034 if total_time == 0: 

1035 return None 

1036 iter_progress = iter(progress) 

1037 next(iter_progress) 

1038 total_completed = sum(sample.completed for sample in iter_progress) 

1039 speed = total_completed / total_time 

1040 return speed 

1041 

1042 @property 

1043 def time_remaining(self) -> Optional[float]: 

1044 """Optional[float]: Get estimated time to completion, or ``None`` if no data.""" 

1045 if self.finished: 

1046 return 0.0 

1047 speed = self.speed 

1048 if not speed: 

1049 return None 

1050 remaining = self.remaining 

1051 if remaining is None: 

1052 return None 

1053 estimate = ceil(remaining / speed) 

1054 return estimate 

1055 

1056 def _reset(self) -> None: 

1057 """Reset progress.""" 

1058 self._progress.clear() 

1059 self.finished_time = None 

1060 self.finished_speed = None 

1061 

1062 

1063class Progress(JupyterMixin): 

1064 """Renders an auto-updating progress bar(s). 

1065 

1066 Args: 

1067 console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout. 

1068 auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`. 

1069 refresh_per_second (Optional[float], optional): Number of times per second to refresh the progress information or None to use default (10). Defaults to None. 

1070 speed_estimate_period: (float, optional): Period (in seconds) used to calculate the speed estimate. Defaults to 30. 

1071 transient: (bool, optional): Clear the progress on exit. Defaults to False. 

1072 redirect_stdout: (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True. 

1073 redirect_stderr: (bool, optional): Enable redirection of stderr. Defaults to True. 

1074 get_time: (Callable, optional): A callable that gets the current time, or None to use Console.get_time. Defaults to None. 

1075 disable (bool, optional): Disable progress display. Defaults to False 

1076 expand (bool, optional): Expand tasks table to fit width. Defaults to False. 

1077 """ 

1078 

1079 def __init__( 

1080 self, 

1081 *columns: Union[str, ProgressColumn], 

1082 console: Optional[Console] = None, 

1083 auto_refresh: bool = True, 

1084 refresh_per_second: float = 10, 

1085 speed_estimate_period: float = 30.0, 

1086 transient: bool = False, 

1087 redirect_stdout: bool = True, 

1088 redirect_stderr: bool = True, 

1089 get_time: Optional[GetTimeCallable] = None, 

1090 disable: bool = False, 

1091 expand: bool = False, 

1092 ) -> None: 

1093 assert refresh_per_second > 0, "refresh_per_second must be > 0" 

1094 self._lock = RLock() 

1095 self.columns = columns or self.get_default_columns() 

1096 self.speed_estimate_period = speed_estimate_period 

1097 

1098 self.disable = disable 

1099 self.expand = expand 

1100 self._tasks: Dict[TaskID, Task] = {} 

1101 self._task_index: TaskID = TaskID(0) 

1102 self.live = Live( 

1103 console=console or get_console(), 

1104 auto_refresh=auto_refresh, 

1105 refresh_per_second=refresh_per_second, 

1106 transient=transient, 

1107 redirect_stdout=redirect_stdout, 

1108 redirect_stderr=redirect_stderr, 

1109 get_renderable=self.get_renderable, 

1110 ) 

1111 self.get_time = get_time or self.console.get_time 

1112 self.print = self.console.print 

1113 self.log = self.console.log 

1114 

1115 @classmethod 

1116 def get_default_columns(cls) -> Tuple[ProgressColumn, ...]: 

1117 """Get the default columns used for a new Progress instance: 

1118 - a text column for the description (TextColumn) 

1119 - the bar itself (BarColumn) 

1120 - a text column showing completion percentage (TextColumn) 

1121 - an estimated-time-remaining column (TimeRemainingColumn) 

1122 If the Progress instance is created without passing a columns argument, 

1123 the default columns defined here will be used. 

1124 

1125 You can also create a Progress instance using custom columns before 

1126 and/or after the defaults, as in this example: 

1127 

1128 progress = Progress( 

1129 SpinnerColumn(), 

1130 *Progress.get_default_columns(), 

1131 "Elapsed:", 

1132 TimeElapsedColumn(), 

1133 ) 

1134 

1135 This code shows the creation of a Progress display, containing 

1136 a spinner to the left, the default columns, and a labeled elapsed 

1137 time column. 

1138 """ 

1139 return ( 

1140 TextColumn("[progress.description]{task.description}"), 

1141 BarColumn(), 

1142 TaskProgressColumn(), 

1143 TimeRemainingColumn(), 

1144 ) 

1145 

1146 @property 

1147 def console(self) -> Console: 

1148 return self.live.console 

1149 

1150 @property 

1151 def tasks(self) -> List[Task]: 

1152 """Get a list of Task instances.""" 

1153 with self._lock: 

1154 return list(self._tasks.values()) 

1155 

1156 @property 

1157 def task_ids(self) -> List[TaskID]: 

1158 """A list of task IDs.""" 

1159 with self._lock: 

1160 return list(self._tasks.keys()) 

1161 

1162 @property 

1163 def finished(self) -> bool: 

1164 """Check if all tasks have been completed.""" 

1165 with self._lock: 

1166 if not self._tasks: 

1167 return True 

1168 return all(task.finished for task in self._tasks.values()) 

1169 

1170 def start(self) -> None: 

1171 """Start the progress display.""" 

1172 if not self.disable: 

1173 self.live.start(refresh=True) 

1174 

1175 def stop(self) -> None: 

1176 """Stop the progress display.""" 

1177 self.live.stop() 

1178 if not self.console.is_interactive and not self.console.is_jupyter: 

1179 self.console.print() 

1180 

1181 def __enter__(self) -> Self: 

1182 self.start() 

1183 return self 

1184 

1185 def __exit__( 

1186 self, 

1187 exc_type: Optional[Type[BaseException]], 

1188 exc_val: Optional[BaseException], 

1189 exc_tb: Optional[TracebackType], 

1190 ) -> None: 

1191 self.stop() 

1192 

1193 def track( 

1194 self, 

1195 sequence: Union[Iterable[ProgressType], Sequence[ProgressType]], 

1196 total: Optional[float] = None, 

1197 completed: int = 0, 

1198 task_id: Optional[TaskID] = None, 

1199 description: str = "Working...", 

1200 update_period: float = 0.1, 

1201 ) -> Iterable[ProgressType]: 

1202 """Track progress by iterating over a sequence. 

1203 

1204 Args: 

1205 sequence (Sequence[ProgressType]): A sequence of values you want to iterate over and track progress. 

1206 total: (float, optional): Total number of steps. Default is len(sequence). 

1207 completed (int, optional): Number of steps completed so far. Defaults to 0. 

1208 task_id: (TaskID): Task to track. Default is new task. 

1209 description: (str, optional): Description of task, if new task is created. 

1210 update_period (float, optional): Minimum time (in seconds) between calls to update(). Defaults to 0.1. 

1211 

1212 Returns: 

1213 Iterable[ProgressType]: An iterable of values taken from the provided sequence. 

1214 """ 

1215 if total is None: 

1216 total = float(length_hint(sequence)) or None 

1217 

1218 if task_id is None: 

1219 task_id = self.add_task(description, total=total, completed=completed) 

1220 else: 

1221 self.update(task_id, total=total, completed=completed) 

1222 

1223 if self.live.auto_refresh: 

1224 with _TrackThread(self, task_id, update_period) as track_thread: 

1225 for value in sequence: 

1226 yield value 

1227 track_thread.completed += 1 

1228 else: 

1229 advance = self.advance 

1230 refresh = self.refresh 

1231 for value in sequence: 

1232 yield value 

1233 advance(task_id, 1) 

1234 refresh() 

1235 

1236 def wrap_file( 

1237 self, 

1238 file: BinaryIO, 

1239 total: Optional[int] = None, 

1240 *, 

1241 task_id: Optional[TaskID] = None, 

1242 description: str = "Reading...", 

1243 ) -> BinaryIO: 

1244 """Track progress file reading from a binary file. 

1245 

1246 Args: 

1247 file (BinaryIO): A file-like object opened in binary mode. 

1248 total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given. 

1249 task_id (TaskID): Task to track. Default is new task. 

1250 description (str, optional): Description of task, if new task is created. 

1251 

1252 Returns: 

1253 BinaryIO: A readable file-like object in binary mode. 

1254 

1255 Raises: 

1256 ValueError: When no total value can be extracted from the arguments or the task. 

1257 """ 

1258 # attempt to recover the total from the task 

1259 total_bytes: Optional[float] = None 

1260 if total is not None: 

1261 total_bytes = total 

1262 elif task_id is not None: 

1263 with self._lock: 

1264 total_bytes = self._tasks[task_id].total 

1265 if total_bytes is None: 

1266 raise ValueError( 

1267 f"unable to get the total number of bytes, please specify 'total'" 

1268 ) 

1269 

1270 # update total of task or create new task 

1271 if task_id is None: 

1272 task_id = self.add_task(description, total=total_bytes) 

1273 else: 

1274 self.update(task_id, total=total_bytes) 

1275 

1276 return _Reader(file, self, task_id, close_handle=False) 

1277 

1278 @typing.overload 

1279 def open( 

1280 self, 

1281 file: Union[str, "PathLike[str]", bytes], 

1282 mode: Literal["rb"], 

1283 buffering: int = -1, 

1284 encoding: Optional[str] = None, 

1285 errors: Optional[str] = None, 

1286 newline: Optional[str] = None, 

1287 *, 

1288 total: Optional[int] = None, 

1289 task_id: Optional[TaskID] = None, 

1290 description: str = "Reading...", 

1291 ) -> BinaryIO: 

1292 pass 

1293 

1294 @typing.overload 

1295 def open( 

1296 self, 

1297 file: Union[str, "PathLike[str]", bytes], 

1298 mode: Union[Literal["r"], Literal["rt"]], 

1299 buffering: int = -1, 

1300 encoding: Optional[str] = None, 

1301 errors: Optional[str] = None, 

1302 newline: Optional[str] = None, 

1303 *, 

1304 total: Optional[int] = None, 

1305 task_id: Optional[TaskID] = None, 

1306 description: str = "Reading...", 

1307 ) -> TextIO: 

1308 pass 

1309 

1310 def open( 

1311 self, 

1312 file: Union[str, "PathLike[str]", bytes], 

1313 mode: Union[Literal["rb"], Literal["rt"], Literal["r"]] = "r", 

1314 buffering: int = -1, 

1315 encoding: Optional[str] = None, 

1316 errors: Optional[str] = None, 

1317 newline: Optional[str] = None, 

1318 *, 

1319 total: Optional[int] = None, 

1320 task_id: Optional[TaskID] = None, 

1321 description: str = "Reading...", 

1322 ) -> Union[BinaryIO, TextIO]: 

1323 """Track progress while reading from a binary file. 

1324 

1325 Args: 

1326 path (Union[str, PathLike[str]]): The path to the file to read. 

1327 mode (str): The mode to use to open the file. Only supports "r", "rb" or "rt". 

1328 buffering (int): The buffering strategy to use, see :func:`io.open`. 

1329 encoding (str, optional): The encoding to use when reading in text mode, see :func:`io.open`. 

1330 errors (str, optional): The error handling strategy for decoding errors, see :func:`io.open`. 

1331 newline (str, optional): The strategy for handling newlines in text mode, see :func:`io.open`. 

1332 total (int, optional): Total number of bytes to read. If none given, os.stat(path).st_size is used. 

1333 task_id (TaskID): Task to track. Default is new task. 

1334 description (str, optional): Description of task, if new task is created. 

1335 

1336 Returns: 

1337 BinaryIO: A readable file-like object in binary mode. 

1338 

1339 Raises: 

1340 ValueError: When an invalid mode is given. 

1341 """ 

1342 # normalize the mode (always rb, rt) 

1343 _mode = "".join(sorted(mode, reverse=False)) 

1344 if _mode not in ("br", "rt", "r"): 

1345 raise ValueError(f"invalid mode {mode!r}") 

1346 

1347 # patch buffering to provide the same behaviour as the builtin `open` 

1348 line_buffering = buffering == 1 

1349 if _mode == "br" and buffering == 1: 

1350 warnings.warn( 

1351 "line buffering (buffering=1) isn't supported in binary mode, the default buffer size will be used", 

1352 RuntimeWarning, 

1353 ) 

1354 buffering = -1 

1355 elif _mode in ("rt", "r"): 

1356 if buffering == 0: 

1357 raise ValueError("can't have unbuffered text I/O") 

1358 elif buffering == 1: 

1359 buffering = -1 

1360 

1361 # attempt to get the total with `os.stat` 

1362 if total is None: 

1363 total = stat(file).st_size 

1364 

1365 # update total of task or create new task 

1366 if task_id is None: 

1367 task_id = self.add_task(description, total=total) 

1368 else: 

1369 self.update(task_id, total=total) 

1370 

1371 # open the file in binary mode, 

1372 handle = io.open(file, "rb", buffering=buffering) 

1373 reader = _Reader(handle, self, task_id, close_handle=True) 

1374 

1375 # wrap the reader in a `TextIOWrapper` if text mode 

1376 if mode in ("r", "rt"): 

1377 return io.TextIOWrapper( 

1378 reader, 

1379 encoding=encoding, 

1380 errors=errors, 

1381 newline=newline, 

1382 line_buffering=line_buffering, 

1383 ) 

1384 

1385 return reader 

1386 

1387 def start_task(self, task_id: TaskID) -> None: 

1388 """Start a task. 

1389 

1390 Starts a task (used when calculating elapsed time). You may need to call this manually, 

1391 if you called ``add_task`` with ``start=False``. 

1392 

1393 Args: 

1394 task_id (TaskID): ID of task. 

1395 """ 

1396 with self._lock: 

1397 task = self._tasks[task_id] 

1398 if task.start_time is None: 

1399 task.start_time = self.get_time() 

1400 

1401 def stop_task(self, task_id: TaskID) -> None: 

1402 """Stop a task. 

1403 

1404 This will freeze the elapsed time on the task. 

1405 

1406 Args: 

1407 task_id (TaskID): ID of task. 

1408 """ 

1409 with self._lock: 

1410 task = self._tasks[task_id] 

1411 current_time = self.get_time() 

1412 if task.start_time is None: 

1413 task.start_time = current_time 

1414 task.stop_time = current_time 

1415 

1416 def update( 

1417 self, 

1418 task_id: TaskID, 

1419 *, 

1420 total: Optional[float] = None, 

1421 completed: Optional[float] = None, 

1422 advance: Optional[float] = None, 

1423 description: Optional[str] = None, 

1424 visible: Optional[bool] = None, 

1425 refresh: bool = False, 

1426 **fields: Any, 

1427 ) -> None: 

1428 """Update information associated with a task. 

1429 

1430 Args: 

1431 task_id (TaskID): Task id (returned by add_task). 

1432 total (float, optional): Updates task.total if not None. 

1433 completed (float, optional): Updates task.completed if not None. 

1434 advance (float, optional): Add a value to task.completed if not None. 

1435 description (str, optional): Change task description if not None. 

1436 visible (bool, optional): Set visible flag if not None. 

1437 refresh (bool): Force a refresh of progress information. Default is False. 

1438 **fields (Any): Additional data fields required for rendering. 

1439 """ 

1440 with self._lock: 

1441 task = self._tasks[task_id] 

1442 completed_start = task.completed 

1443 

1444 if total is not None and total != task.total: 

1445 task.total = total 

1446 task._reset() 

1447 if advance is not None: 

1448 task.completed += advance 

1449 if completed is not None: 

1450 task.completed = completed 

1451 if description is not None: 

1452 task.description = description 

1453 if visible is not None: 

1454 task.visible = visible 

1455 task.fields.update(fields) 

1456 update_completed = task.completed - completed_start 

1457 

1458 current_time = self.get_time() 

1459 old_sample_time = current_time - self.speed_estimate_period 

1460 _progress = task._progress 

1461 

1462 popleft = _progress.popleft 

1463 while _progress and _progress[0].timestamp < old_sample_time: 

1464 popleft() 

1465 if update_completed > 0: 

1466 _progress.append(ProgressSample(current_time, update_completed)) 

1467 if ( 

1468 task.total is not None 

1469 and task.completed >= task.total 

1470 and task.finished_time is None 

1471 ): 

1472 task.finished_time = task.elapsed 

1473 

1474 if refresh: 

1475 self.refresh() 

1476 

1477 def reset( 

1478 self, 

1479 task_id: TaskID, 

1480 *, 

1481 start: bool = True, 

1482 total: Optional[float] = None, 

1483 completed: int = 0, 

1484 visible: Optional[bool] = None, 

1485 description: Optional[str] = None, 

1486 **fields: Any, 

1487 ) -> None: 

1488 """Reset a task so completed is 0 and the clock is reset. 

1489 

1490 Args: 

1491 task_id (TaskID): ID of task. 

1492 start (bool, optional): Start the task after reset. Defaults to True. 

1493 total (float, optional): New total steps in task, or None to use current total. Defaults to None. 

1494 completed (int, optional): Number of steps completed. Defaults to 0. 

1495 visible (bool, optional): Enable display of the task. Defaults to True. 

1496 description (str, optional): Change task description if not None. Defaults to None. 

1497 **fields (str): Additional data fields required for rendering. 

1498 """ 

1499 current_time = self.get_time() 

1500 with self._lock: 

1501 task = self._tasks[task_id] 

1502 task._reset() 

1503 task.start_time = current_time if start else None 

1504 if total is not None: 

1505 task.total = total 

1506 task.completed = completed 

1507 if visible is not None: 

1508 task.visible = visible 

1509 if fields: 

1510 task.fields = fields 

1511 if description is not None: 

1512 task.description = description 

1513 task.finished_time = None 

1514 self.refresh() 

1515 

1516 def advance(self, task_id: TaskID, advance: float = 1) -> None: 

1517 """Advance task by a number of steps. 

1518 

1519 Args: 

1520 task_id (TaskID): ID of task. 

1521 advance (float): Number of steps to advance. Default is 1. 

1522 """ 

1523 current_time = self.get_time() 

1524 with self._lock: 

1525 task = self._tasks[task_id] 

1526 completed_start = task.completed 

1527 task.completed += advance 

1528 update_completed = task.completed - completed_start 

1529 old_sample_time = current_time - self.speed_estimate_period 

1530 _progress = task._progress 

1531 

1532 popleft = _progress.popleft 

1533 while _progress and _progress[0].timestamp < old_sample_time: 

1534 popleft() 

1535 while len(_progress) > 1000: 

1536 popleft() 

1537 _progress.append(ProgressSample(current_time, update_completed)) 

1538 if ( 

1539 task.total is not None 

1540 and task.completed >= task.total 

1541 and task.finished_time is None 

1542 ): 

1543 task.finished_time = task.elapsed 

1544 task.finished_speed = task.speed 

1545 

1546 def refresh(self) -> None: 

1547 """Refresh (render) the progress information.""" 

1548 if not self.disable and self.live.is_started: 

1549 self.live.refresh() 

1550 

1551 def get_renderable(self) -> RenderableType: 

1552 """Get a renderable for the progress display.""" 

1553 renderable = Group(*self.get_renderables()) 

1554 return renderable 

1555 

1556 def get_renderables(self) -> Iterable[RenderableType]: 

1557 """Get a number of renderables for the progress display.""" 

1558 table = self.make_tasks_table(self.tasks) 

1559 yield table 

1560 

1561 def make_tasks_table(self, tasks: Iterable[Task]) -> Table: 

1562 """Get a table to render the Progress display. 

1563 

1564 Args: 

1565 tasks (Iterable[Task]): An iterable of Task instances, one per row of the table. 

1566 

1567 Returns: 

1568 Table: A table instance. 

1569 """ 

1570 table_columns = ( 

1571 ( 

1572 Column(no_wrap=True) 

1573 if isinstance(_column, str) 

1574 else _column.get_table_column().copy() 

1575 ) 

1576 for _column in self.columns 

1577 ) 

1578 table = Table.grid(*table_columns, padding=(0, 1), expand=self.expand) 

1579 

1580 for task in tasks: 

1581 if task.visible: 

1582 table.add_row( 

1583 *( 

1584 ( 

1585 column.format(task=task) 

1586 if isinstance(column, str) 

1587 else column(task) 

1588 ) 

1589 for column in self.columns 

1590 ) 

1591 ) 

1592 return table 

1593 

1594 def __rich__(self) -> RenderableType: 

1595 """Makes the Progress class itself renderable.""" 

1596 with self._lock: 

1597 return self.get_renderable() 

1598 

1599 def add_task( 

1600 self, 

1601 description: str, 

1602 start: bool = True, 

1603 total: Optional[float] = 100.0, 

1604 completed: int = 0, 

1605 visible: bool = True, 

1606 **fields: Any, 

1607 ) -> TaskID: 

1608 """Add a new 'task' to the Progress display. 

1609 

1610 Args: 

1611 description (str): A description of the task. 

1612 start (bool, optional): Start the task immediately (to calculate elapsed time). If set to False, 

1613 you will need to call `start` manually. Defaults to True. 

1614 total (float, optional): Number of total steps in the progress if known. 

1615 Set to None to render a pulsing animation. Defaults to 100. 

1616 completed (int, optional): Number of steps completed so far. Defaults to 0. 

1617 visible (bool, optional): Enable display of the task. Defaults to True. 

1618 **fields (str): Additional data fields required for rendering. 

1619 

1620 Returns: 

1621 TaskID: An ID you can use when calling `update`. 

1622 """ 

1623 with self._lock: 

1624 task = Task( 

1625 self._task_index, 

1626 description, 

1627 total, 

1628 completed, 

1629 visible=visible, 

1630 fields=fields, 

1631 _get_time=self.get_time, 

1632 _lock=self._lock, 

1633 ) 

1634 self._tasks[self._task_index] = task 

1635 if start: 

1636 self.start_task(self._task_index) 

1637 new_task_index = self._task_index 

1638 self._task_index = TaskID(int(self._task_index) + 1) 

1639 self.refresh() 

1640 return new_task_index 

1641 

1642 def remove_task(self, task_id: TaskID) -> None: 

1643 """Delete a task if it exists. 

1644 

1645 Args: 

1646 task_id (TaskID): A task ID. 

1647 

1648 """ 

1649 with self._lock: 

1650 del self._tasks[task_id] 

1651 

1652 

1653if __name__ == "__main__": # pragma: no coverage 

1654 import random 

1655 import time 

1656 

1657 from .panel import Panel 

1658 from .rule import Rule 

1659 from .syntax import Syntax 

1660 from .table import Table 

1661 

1662 syntax = Syntax( 

1663 '''def loop_last(values: Iterable[T]) -> Iterable[Tuple[bool, T]]: 

1664 """Iterate and generate a tuple with a flag for last value.""" 

1665 iter_values = iter(values) 

1666 try: 

1667 previous_value = next(iter_values) 

1668 except StopIteration: 

1669 return 

1670 for value in iter_values: 

1671 yield False, previous_value 

1672 previous_value = value 

1673 yield True, previous_value''', 

1674 "python", 

1675 line_numbers=True, 

1676 ) 

1677 

1678 table = Table("foo", "bar", "baz") 

1679 table.add_row("1", "2", "3") 

1680 

1681 progress_renderables = [ 

1682 "Text may be printed while the progress bars are rendering.", 

1683 Panel("In fact, [i]any[/i] renderable will work"), 

1684 "Such as [magenta]tables[/]...", 

1685 table, 

1686 "Pretty printed structures...", 

1687 {"type": "example", "text": "Pretty printed"}, 

1688 "Syntax...", 

1689 syntax, 

1690 Rule("Give it a try!"), 

1691 ] 

1692 

1693 from itertools import cycle 

1694 

1695 examples = cycle(progress_renderables) 

1696 

1697 console = Console(record=True) 

1698 

1699 with Progress( 

1700 SpinnerColumn(), 

1701 *Progress.get_default_columns(), 

1702 TimeElapsedColumn(), 

1703 console=console, 

1704 transient=False, 

1705 ) as progress: 

1706 task1 = progress.add_task("[red]Downloading", total=1000) 

1707 task2 = progress.add_task("[green]Processing", total=1000) 

1708 task3 = progress.add_task("[yellow]Thinking", total=None) 

1709 

1710 while not progress.finished: 

1711 progress.update(task1, advance=0.5) 

1712 progress.update(task2, advance=0.3) 

1713 time.sleep(0.01) 

1714 if random.randint(0, 100) < 1: 

1715 progress.log(next(examples))