1from __future__ import annotations
2
3import logging
4import os
5import shlex
6import subprocess
7from collections.abc import Iterable, Mapping
8from typing import Any, Callable, Literal, Union
9
10from pip._vendor.rich.markup import escape
11
12from pip._internal.cli.spinners import SpinnerInterface, open_spinner
13from pip._internal.exceptions import InstallationSubprocessError
14from pip._internal.utils.logging import VERBOSE, subprocess_logger
15from pip._internal.utils.misc import HiddenText
16
# A command line expressed as a list whose items are either plain strings or
# HiddenText objects (arguments whose real value is redacted when displayed).
CommandArgs = list[Union[str, HiddenText]]
18
19
def make_command(*args: str | HiddenText | CommandArgs) -> CommandArgs:
    """
    Build a flat CommandArgs list from the given arguments.

    Each positional argument is either a single item (str or HiddenText),
    which is added as-is, or a list of such items, whose elements are
    spliced into the result in order.
    """
    flattened: CommandArgs = []
    for item in args:
        # CommandArgs is a parameterized alias, which isinstance() does not
        # accept, so test against plain ``list`` instead. Anything that is
        # not a list is treated as a single str/HiddenText argument.
        flattened += item if isinstance(item, list) else [item]
    return flattened
35
36
def format_command_args(args: list[str] | CommandArgs) -> str:
    """
    Render command arguments as one shell-quoted, space-separated string
    suitable for display.
    """
    quoted_parts = []
    for arg in args:
        # A HiddenText argument is converted with str(), which gives its
        # redacted form. Every other argument is handed to shlex.quote()
        # untouched (no str() coercion is applied to it).
        display_text = str(arg) if isinstance(arg, HiddenText) else arg
        quoted_parts.append(shlex.quote(display_text))
    return " ".join(quoted_parts)
50
51
def reveal_command_args(args: list[str] | CommandArgs) -> list[str]:
    """
    Return a new list of the arguments with every HiddenText replaced by
    its ``secret`` attribute (the raw, unredacted value).
    """
    revealed: list[str] = []
    for arg in args:
        if isinstance(arg, HiddenText):
            revealed.append(arg.secret)
        else:
            # Plain arguments are passed through unchanged.
            revealed.append(arg)
    return revealed
57
58
def call_subprocess(
    cmd: list[str] | CommandArgs,
    show_stdout: bool = False,
    cwd: str | None = None,
    on_returncode: Literal["raise", "warn", "ignore"] = "raise",
    extra_ok_returncodes: Iterable[int] | None = None,
    extra_environ: Mapping[str, Any] | None = None,
    unset_environ: Iterable[str] | None = None,
    spinner: SpinnerInterface | None = None,
    log_failed_cmd: bool | None = True,
    stdout_only: bool | None = False,
    *,
    command_desc: str,
) -> str:
    """
    Run ``cmd`` as a subprocess, log its output, and return that output.

    Args:
      cmd: the command to run; HiddenText items are replaced by their
        underlying secret before being handed to subprocess.Popen().
      show_stdout: if true, use INFO to log the subprocess's stderr and
        stdout streams. Otherwise, use VERBOSE. Defaults to False.
      cwd: working directory passed to subprocess.Popen().
      on_returncode: what to do when the subprocess exits with a non-zero
        code that is not in extra_ok_returncodes: "raise" raises
        InstallationSubprocessError, "warn" logs a warning, "ignore" does
        nothing. Any other value raises ValueError, but only in that
        failure case.
      extra_ok_returncodes: an iterable of integer return codes that are
        acceptable, in addition to 0. Defaults to None, which means [].
      extra_environ: mapping merged into a copy of os.environ for the
        child process.
      unset_environ: an iterable of environment variable names to unset
        prior to calling subprocess.Popen().
      spinner: spinner to update while output is streamed; it is only
        used when the subprocess output is not being shown by the logger.
      log_failed_cmd: if false, failed commands are not logged, only raised.
      stdout_only: if true, return only stdout, else return both. When true,
        logging of both stdout and stderr occurs when the subprocess has
        terminated, else logging occurs as subprocess output is produced.
      command_desc: description of the command used in log messages and
        in the raised error.

    Returns:
      With stdout_only false, the combined stdout/stderr text, each line
      right-stripped and terminated with a newline. With stdout_only true,
      stdout exactly as returned by communicate().
    """
    ok_returncodes = [] if extra_ok_returncodes is None else extra_ok_returncodes
    names_to_unset = [] if unset_environ is None else unset_environ

    # How the child's output is handled (show_stdout=False is the default):
    #
    # - Unless stdout_only is set, stderr is folded into stdout so there is
    #   a single pipe to read, and every line is logged as it arrives.
    # - Lines are logged at VERBOSE level, or at INFO when show_stdout=True.
    #   The verbose() logger method is used rather than a lower level so the
    #   output also reaches the log file (aka user_log), if enabled.
    # - If the logger is not letting that level through, the spinner (when
    #   one was given) is used so the user can still see progress.
    # - On failure the captured output is attached to the raised error only
    #   if it was not already shown, so it is not printed twice.
    log_subprocess: Callable[..., None]
    if show_stdout:
        log_subprocess, used_level = subprocess_logger.info, logging.INFO
    else:
        log_subprocess, used_level = subprocess_logger.verbose, VERBOSE

    # True when the logger's effective level lets the subprocess output
    # through, i.e. the output will be visible.
    showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level

    # The spinner is only worthwhile when one exists and the output itself
    # is not being displayed.
    use_spinner = spinner is not None and not showing_subprocess

    log_subprocess("Running command %s", command_desc)

    # Child environment: a copy of ours, plus additions, minus removals.
    child_env = os.environ.copy()
    if extra_environ:
        child_env.update(extra_environ)
    for name in names_to_unset:
        child_env.pop(name, None)

    try:
        proc = subprocess.Popen(
            # HiddenText objects are converted to the underlying str here.
            reveal_command_args(cmd),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # Separate stderr pipe only when the caller wants stdout alone.
            stderr=subprocess.PIPE if stdout_only else subprocess.STDOUT,
            cwd=cwd,
            env=child_env,
            errors="backslashreplace",
        )
    except Exception as exc:
        # Launching failed: optionally report it, then let it propagate.
        if log_failed_cmd:
            subprocess_logger.critical(
                "Error %s while executing command %s",
                exc,
                command_desc,
            )
        raise

    all_output = []
    if stdout_only:
        # stdout and stderr are separate pipes here, and communicate() is
        # the only safe way to read both.
        out, err = proc.communicate()
        for stream_text in (out, err):
            # Log line by line to preserve pip log indenting.
            for text_line in stream_text.splitlines():
                log_subprocess(text_line)
            all_output.append(stream_text)
        output = out
    else:
        assert proc.stdout
        assert proc.stdin
        proc.stdin.close()
        # stdout and stderr share one pipe here; read until EOF (an empty
        # string from readline()).
        while raw_line := proc.stdout.readline():
            line = raw_line.rstrip()
            all_output.append(line + "\n")

            # Show the line immediately, then advance the spinner.
            log_subprocess(line)
            if use_spinner:
                assert spinner
                spinner.spin()
        try:
            proc.wait()
        finally:
            if proc.stdout:
                proc.stdout.close()
        output = "".join(all_output)

    # A failure is a non-zero return code that was not explicitly allowed.
    proc_had_error = proc.returncode and proc.returncode not in ok_returncodes
    if use_spinner:
        assert spinner
        spinner.finish("error" if proc_had_error else "done")

    if not proc_had_error:
        return output

    if on_returncode == "raise":
        error = InstallationSubprocessError(
            command_description=command_desc,
            exit_code=proc.returncode,
            # Do not repeat output that has already been displayed.
            output_lines=None if showing_subprocess else all_output,
        )
        if log_failed_cmd:
            subprocess_logger.error("%s", error, extra={"rich": True})
            subprocess_logger.verbose(
                "[bold magenta]full command[/]: [blue]%s[/]",
                escape(format_command_args(cmd)),
                extra={"markup": True},
            )
            subprocess_logger.verbose(
                "[bold magenta]cwd[/]: %s",
                escape(cwd or "[inherit]"),
                extra={"markup": True},
            )

        raise error

    if on_returncode == "warn":
        subprocess_logger.warning(
            'Command "%s" had error code %s in %s',
            command_desc,
            proc.returncode,
            cwd,
        )
    elif on_returncode != "ignore":
        raise ValueError(f"Invalid value: on_returncode={on_returncode!r}")
    return output
225
226
def runner_with_spinner_message(message: str) -> Callable[..., None]:
    """Build a subprocess runner that displays a spinner with ``message``.

    Intended for use with BuildBackendHookCaller: the returned callable
    has the API expected by BuildBackendHookCaller.subprocess_runner.
    """

    def runner(
        cmd: list[str],
        cwd: str | None = None,
        extra_environ: Mapping[str, Any] | None = None,
    ) -> None:
        # The same message labels the spinner and serves as the command
        # description passed to call_subprocess().
        with open_spinner(message) as active_spinner:
            call_subprocess(
                cmd,
                cwd=cwd,
                extra_environ=extra_environ,
                spinner=active_spinner,
                command_desc=message,
            )

    return runner