Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/cli/progress_bars.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

63 statements  

1from __future__ import annotations 

2 

3import functools 

4import sys 

5from collections.abc import Generator, Iterable, Iterator 

6from typing import TYPE_CHECKING, Callable, Literal, TypeVar 

7 

8from pip._vendor.rich.progress import ( 

9 BarColumn, 

10 DownloadColumn, 

11 FileSizeColumn, 

12 MofNCompleteColumn, 

13 Progress, 

14 ProgressColumn, 

15 SpinnerColumn, 

16 TextColumn, 

17 TimeElapsedColumn, 

18 TimeRemainingColumn, 

19 TransferSpeedColumn, 

20) 

21 

22from pip._internal.cli.spinners import RateLimiter 

23from pip._internal.utils.logging import get_console, get_indentation 

24 

25if TYPE_CHECKING: 

26 from pip._internal.req.req_install import InstallRequirement 

27 

28T = TypeVar("T") 

29ProgressRenderer = Callable[[Iterable[T]], Iterator[T]] 

30BarType = Literal["on", "off", "raw"] 

31 

32 

33def _rich_download_progress_bar( 

34 iterable: Iterable[bytes], 

35 *, 

36 bar_type: BarType, 

37 size: int | None, 

38 initial_progress: int | None = None, 

39) -> Generator[bytes, None, None]: 

40 assert bar_type == "on", "This should only be used in the default mode." 

41 

42 if not size: 

43 total = float("inf") 

44 columns: tuple[ProgressColumn, ...] = ( 

45 TextColumn("[progress.description]{task.description}"), 

46 SpinnerColumn("line", speed=1.5), 

47 FileSizeColumn(), 

48 TransferSpeedColumn(), 

49 TimeElapsedColumn(), 

50 ) 

51 else: 

52 total = size 

53 columns = ( 

54 TextColumn("[progress.description]{task.description}"), 

55 BarColumn(), 

56 DownloadColumn(), 

57 TransferSpeedColumn(), 

58 TextColumn("{task.fields[time_description]}"), 

59 TimeRemainingColumn(elapsed_when_finished=True), 

60 ) 

61 

62 progress = Progress(*columns, refresh_per_second=5) 

63 task_id = progress.add_task( 

64 " " * (get_indentation() + 2), total=total, time_description="eta" 

65 ) 

66 if initial_progress is not None: 

67 progress.update(task_id, advance=initial_progress) 

68 with progress: 

69 for chunk in iterable: 

70 yield chunk 

71 progress.update(task_id, advance=len(chunk)) 

72 progress.update(task_id, time_description="") 

73 

74 

75def _rich_install_progress_bar( 

76 iterable: Iterable[InstallRequirement], *, total: int 

77) -> Iterator[InstallRequirement]: 

78 columns = ( 

79 TextColumn("{task.fields[indent]}"), 

80 BarColumn(), 

81 MofNCompleteColumn(), 

82 TextColumn("{task.description}"), 

83 ) 

84 console = get_console() 

85 

86 bar = Progress(*columns, refresh_per_second=6, console=console, transient=True) 

87 # Hiding the progress bar at initialization forces a refresh cycle to occur 

88 # until the bar appears, avoiding very short flashes. 

89 task = bar.add_task("", total=total, indent=" " * get_indentation(), visible=False) 

90 with bar: 

91 for req in iterable: 

92 bar.update(task, description=rf"\[{req.name}]", visible=True) 

93 yield req 

94 bar.advance(task) 

95 

96 

97def _raw_progress_bar( 

98 iterable: Iterable[bytes], 

99 *, 

100 size: int | None, 

101 initial_progress: int | None = None, 

102) -> Generator[bytes, None, None]: 

103 def write_progress(current: int, total: int) -> None: 

104 sys.stdout.write(f"Progress {current} of {total}\n") 

105 sys.stdout.flush() 

106 

107 current = initial_progress or 0 

108 total = size or 0 

109 rate_limiter = RateLimiter(0.25) 

110 

111 write_progress(current, total) 

112 for chunk in iterable: 

113 current += len(chunk) 

114 if rate_limiter.ready() or current == total: 

115 write_progress(current, total) 

116 rate_limiter.reset() 

117 yield chunk 

118 

119 

120def get_download_progress_renderer( 

121 *, bar_type: BarType, size: int | None = None, initial_progress: int | None = None 

122) -> ProgressRenderer[bytes]: 

123 """Get an object that can be used to render the download progress. 

124 

125 Returns a callable, that takes an iterable to "wrap". 

126 """ 

127 if bar_type == "on": 

128 return functools.partial( 

129 _rich_download_progress_bar, 

130 bar_type=bar_type, 

131 size=size, 

132 initial_progress=initial_progress, 

133 ) 

134 elif bar_type == "raw": 

135 return functools.partial( 

136 _raw_progress_bar, 

137 size=size, 

138 initial_progress=initial_progress, 

139 ) 

140 else: 

141 return iter # no-op, when passed an iterator 

142 

143 

144def get_install_progress_renderer( 

145 *, bar_type: BarType, total: int 

146) -> ProgressRenderer[InstallRequirement]: 

147 """Get an object that can be used to render the install progress. 

148 Returns a callable, that takes an iterable to "wrap". 

149 """ 

150 if bar_type == "on": 

151 return functools.partial(_rich_install_progress_bar, total=total) 

152 else: 

153 return iter