1from __future__ import annotations
2
3import functools
4import sys
5from collections.abc import Generator, Iterable, Iterator
6from typing import TYPE_CHECKING, Callable, Literal, TypeVar
7
8from pip._vendor.rich.progress import (
9 BarColumn,
10 DownloadColumn,
11 FileSizeColumn,
12 MofNCompleteColumn,
13 Progress,
14 ProgressColumn,
15 SpinnerColumn,
16 TextColumn,
17 TimeElapsedColumn,
18 TimeRemainingColumn,
19 TransferSpeedColumn,
20)
21
22from pip._internal.cli.spinners import RateLimiter
23from pip._internal.utils.logging import get_console, get_indentation
24
25if TYPE_CHECKING:
26 from pip._internal.req.req_install import InstallRequirement
27
# Element type that a progress renderer passes through unchanged.
T = TypeVar("T")
# A renderer is a callable that takes an iterable and returns an iterator
# over items of the same type.
ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
# Accepted progress bar modes. In this module "on" selects the rich bars and
# "raw" the plain-text download output; other values get a no-op renderer.
BarType = Literal["on", "off", "raw"]
31
32
33def _rich_download_progress_bar(
34 iterable: Iterable[bytes],
35 *,
36 bar_type: BarType,
37 size: int | None,
38 initial_progress: int | None = None,
39) -> Generator[bytes, None, None]:
40 assert bar_type == "on", "This should only be used in the default mode."
41
42 if not size:
43 total = float("inf")
44 columns: tuple[ProgressColumn, ...] = (
45 TextColumn("[progress.description]{task.description}"),
46 SpinnerColumn("line", speed=1.5),
47 FileSizeColumn(),
48 TransferSpeedColumn(),
49 TimeElapsedColumn(),
50 )
51 else:
52 total = size
53 columns = (
54 TextColumn("[progress.description]{task.description}"),
55 BarColumn(),
56 DownloadColumn(),
57 TransferSpeedColumn(),
58 TextColumn("{task.fields[time_description]}"),
59 TimeRemainingColumn(elapsed_when_finished=True),
60 )
61
62 progress = Progress(*columns, refresh_per_second=5)
63 task_id = progress.add_task(
64 " " * (get_indentation() + 2), total=total, time_description="eta"
65 )
66 if initial_progress is not None:
67 progress.update(task_id, advance=initial_progress)
68 with progress:
69 for chunk in iterable:
70 yield chunk
71 progress.update(task_id, advance=len(chunk))
72 progress.update(task_id, time_description="")
73
74
def _rich_install_progress_bar(
    iterable: Iterable[InstallRequirement], *, total: int
) -> Iterator[InstallRequirement]:
    """Yield each requirement from ``iterable`` while showing an install bar.

    The bar is transient and is drawn on the console returned by
    ``get_console()``. Before a requirement is yielded, its bracketed name
    becomes the task description; the task is advanced once the consumer
    asks for the next item.
    """
    display = Progress(
        TextColumn("{task.fields[indent]}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("{task.description}"),
        refresh_per_second=6,
        console=get_console(),
        transient=True,
    )
    # The task is created hidden so that a refresh cycle has to pass before
    # the bar shows up, which avoids very short flashes.
    task_id = display.add_task(
        "", total=total, indent=" " * get_indentation(), visible=False
    )
    with display:
        for requirement in iterable:
            label = rf"\[{requirement.name}]"
            display.update(task_id, description=label, visible=True)
            yield requirement
            display.advance(task_id)
95
96
def _raw_progress_bar(
    iterable: Iterable[bytes],
    *,
    size: int | None,
    initial_progress: int | None = None,
) -> Generator[bytes, None, None]:
    """Yield the chunks of ``iterable``, writing plain progress lines to stdout.

    Each line reads ``Progress <current> of <total>``. A falsy ``size`` or
    ``initial_progress`` is reported as ``0``. One line is written up front;
    after that a line is written when the rate limiter is ready or when the
    running byte count equals the total.
    """

    def emit(done: int, expected: int) -> None:
        # Flush right away so each line reaches stdout as it is written.
        sys.stdout.write(f"Progress {done} of {expected}\n")
        sys.stdout.flush()

    received = initial_progress if initial_progress else 0
    expected_total = size if size else 0
    limiter = RateLimiter(0.25)

    emit(received, expected_total)
    for data in iterable:
        received += len(data)
        # The count is updated (and possibly reported) before the chunk is
        # handed to the consumer.
        if limiter.ready() or received == expected_total:
            emit(received, expected_total)
            limiter.reset()
        yield data
118
119
def get_download_progress_renderer(
    *, bar_type: BarType, size: int | None = None, initial_progress: int | None = None
) -> ProgressRenderer[bytes]:
    """Build the callable used to render download progress.

    The returned callable takes the iterable of chunks to "wrap" and returns
    an iterator over the same chunks. ``"on"`` selects the rich bar,
    ``"raw"`` the plain-text output, and any other value the builtin
    ``iter``, which renders nothing.
    """
    if bar_type == "on":
        return functools.partial(
            _rich_download_progress_bar,
            bar_type=bar_type,
            size=size,
            initial_progress=initial_progress,
        )
    if bar_type == "raw":
        return functools.partial(
            _raw_progress_bar,
            size=size,
            initial_progress=initial_progress,
        )
    # No progress output requested: iter() is a no-op when passed an iterator.
    return iter
142
143
def get_install_progress_renderer(
    *, bar_type: BarType, total: int
) -> ProgressRenderer[InstallRequirement]:
    """Build the callable used to render install progress.

    The returned callable takes the iterable of requirements to "wrap".
    Only ``"on"`` produces a visible bar; every other mode gets the builtin
    ``iter``, which renders nothing.
    """
    if bar_type != "on":
        return iter
    return functools.partial(_rich_install_progress_bar, total=total)