1from __future__ import annotations
2
3import contextlib
4import errno
5import logging
6import logging.handlers
7import os
8import sys
9import threading
10from collections.abc import Generator
11from dataclasses import dataclass
12from io import StringIO, TextIOWrapper
13from logging import Filter
14from typing import Any, ClassVar
15
16from pip._vendor.rich.console import (
17 Console,
18 ConsoleOptions,
19 ConsoleRenderable,
20 RenderableType,
21 RenderResult,
22 RichCast,
23)
24from pip._vendor.rich.highlighter import NullHighlighter
25from pip._vendor.rich.logging import RichHandler
26from pip._vendor.rich.segment import Segment
27from pip._vendor.rich.style import Style
28
29from pip._internal.utils._log import VERBOSE, getLogger
30from pip._internal.utils.compat import WINDOWS
31from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
32from pip._internal.utils.misc import StreamWrapper, ensure_dir
33
# Per-thread logging state; indent_log() keeps the current indentation on it
# and get_indentation() reads it back.
_log_state = threading.local()

# Logger for subprocess output. setup_logging() uses its name in filters so
# that its records go to the "console_subprocess" handler only.
subprocess_logger = getLogger("pip.subprocessor")

# Rich consoles handed out by get_console(), which asserts that the requested
# one is no longer None.
_stdout_console = None
_stderr_console = None
38
39
class BrokenStdoutLoggingError(Exception):
    """Signal that writing a log record to stdout hit a broken pipe.

    Raised by RichPipStreamHandler.handleError() in place of the original
    BrokenPipeError (or its Windows equivalent).
    """
44
45
46def _is_broken_pipe_error(exc_class: type[BaseException], exc: BaseException) -> bool:
47 if exc_class is BrokenPipeError:
48 return True
49
50 # On Windows, a broken pipe can show up as EINVAL rather than EPIPE:
51 # https://bugs.python.org/issue19612
52 # https://bugs.python.org/issue30418
53 if not WINDOWS:
54 return False
55
56 return isinstance(exc, OSError) and exc.errno in (errno.EINVAL, errno.EPIPE)
57
58
@contextlib.contextmanager
def capture_logging() -> Generator[StringIO, None, None]:
    """Capture all pip logs in a buffer temporarily.

    While the context is active, every RichPipStreamHandler attached to the
    root logger prints to one shared console backed by the yielded stream.
    The handlers' own consoles are put back on exit.
    """
    # Swapping sys.stdout / sys.stderr is not an option: callers may still
    # want to produce non-logging output (e.g. a rich spinner). Only the
    # consoles of the root logger's rich handlers are replaced instead.
    # Each handler is mapped to its original console for restoration later.
    saved_consoles = {
        handler: handler.console
        for handler in logging.getLogger().handlers
        if isinstance(handler, RichPipStreamHandler)
    }

    fake_stream = StreamWrapper.from_stream(sys.stdout)
    if not saved_consoles:
        # Nothing to patch; hand back the (unused) buffer anyway.
        yield fake_stream
        return

    # HACK: no_color is a global option, so whichever handler console comes
    # first is as good a source for it as any other.
    first_console = next(iter(saved_consoles.values()))
    fake_console = PipConsole(
        file=fake_stream, no_color=first_console.no_color, soft_wrap=True
    )
    try:
        for handler in saved_consoles:
            handler.console = fake_console
        yield fake_stream
    finally:
        for handler, original_console in saved_consoles.items():
            handler.console = original_console
89
90
@contextlib.contextmanager
def indent_log(num: int = 2) -> Generator[None, None, None]:
    """
    Context manager that indents every log message emitted inside it.

    :param num: Number of spaces added to the current indentation for the
        duration of the context.
    """
    # Going through get_indentation() supplies the 0 default for a thread
    # that has not stored an indentation yet (thread-safety).
    _log_state.indentation = get_indentation() + num
    try:
        yield
    finally:
        _log_state.indentation -= num
104
105
def get_indentation() -> int:
    """Return the current thread's log indentation, or 0 if none is stored."""
    try:
        return _log_state.indentation
    except AttributeError:
        # This thread has never entered indent_log().
        return 0
108
109
class IndentingFormatter(logging.Formatter):
    """A logging.Formatter that obeys the indent_log() context manager."""

    default_time_format = "%Y-%m-%dT%H:%M:%S"

    def __init__(
        self,
        *args: Any,
        add_timestamp: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Build the formatter.

        :param add_timestamp: A bool indicating output lines should be prefixed
            with their record's timestamp.

        All other arguments are forwarded to logging.Formatter.
        """
        self.add_timestamp = add_timestamp
        super().__init__(*args, **kwargs)

    def get_message_start(self, formatted: str, levelno: int) -> str:
        """
        Return the level label to put in front of the formatted message
        (this is not the per-line prefix).

        :param formatted: The already formatted message.
        :param levelno: The numeric level of the record.
        :return: "" below WARNING or for deprecation messages, otherwise
            "WARNING: " below ERROR and "ERROR: " from ERROR upwards.
        """
        # Deprecation messages carry their own prefix; adding ours would
        # produce "WARNING: DEPRECATION: ....".
        if levelno < logging.WARNING or formatted.startswith(DEPRECATION_MSG_PREFIX):
            return ""
        return "WARNING: " if levelno < logging.ERROR else "ERROR: "

    def format(self, record: logging.LogRecord) -> str:
        """
        Format the record with the standard formatter, then prefix every line
        with the optional timestamp and the current indentation.
        """
        body = super().format(record)
        body = self.get_message_start(body, record.levelno) + body

        timestamp = f"{self.formatTime(record)} " if self.add_timestamp else ""
        line_prefix = timestamp + " " * get_indentation()
        # keepends=True preserves the original line terminators.
        return "".join(line_prefix + line for line in body.splitlines(keepends=True))
159
160
@dataclass
class IndentedRenderable:
    """Wrap a rich renderable so each rendered line starts with `indent` spaces."""

    renderable: RenderableType
    indent: int

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        """Yield the wrapped renderable's segments line by line, indented."""
        padding = " " * self.indent
        rendered = console.render(self.renderable, options)
        for line_segments in Segment.split_lines(rendered):
            yield Segment(padding)
            yield from line_segments
            yield Segment("\n")
175
176
class PipConsole(Console):
    """Console that raises on a broken pipe instead of using rich's default."""

    def on_broken_pipe(self) -> None:
        """Raise a fresh BrokenPipeError with exception chaining suppressed."""
        # rich 13.8.0+ exits by default here, which would keep our own
        # handler from ever firing; re-raise instead.
        raise BrokenPipeError from None
182
183
def get_console(*, stderr: bool = False) -> Console:
    """Return the module-level rich console for stdout, or for stderr.

    :param stderr: Select the stderr console instead of the stdout one.
    :raises AssertionError: If the selected console is still None.
    """
    if stderr:
        console, missing_message = _stderr_console, "stderr rich console is missing!"
    else:
        console, missing_message = _stdout_console, "stdout rich console is missing!"

    assert console is not None, missing_message
    return console
191
192
class RichPipStreamHandler(RichHandler):
    """RichHandler variant that prints pip's log records to a given console."""

    # NOTE(review): presumably an empty list turns off RichHandler's keyword
    # highlighting -- confirm against rich's documentation.
    KEYWORDS: ClassVar[list[str] | None] = []

    def __init__(self, console: Console) -> None:
        """
        :param console: The rich console this handler prints to.
        """
        # Time, level and path columns are switched off and no highlighting
        # is applied to the message.
        super().__init__(
            console=console,
            show_time=False,
            show_level=False,
            show_path=False,
            highlighter=NullHighlighter(),
        )

    # Our custom override on Rich's logger, to make things work as we need them to.
    def emit(self, record: logging.LogRecord) -> None:
        """Render the record and print it on this handler's console.

        Records with a truthy ``rich`` attribute must carry exactly one
        renderable in ``record.args``; it is printed at the current
        indentation. Any other record is formatted normally and coloured red
        (ERROR and above) or yellow (WARNING and above).
        """
        style: Style | None = None
        renderable: RenderableType

        if not getattr(record, "rich", False):
            text = self.format(record)
            renderable = self.render_message(record, text)
            levelno = record.levelno
            if levelno is not None:
                if levelno >= logging.ERROR:
                    style = Style(color="red")
                elif levelno >= logging.WARNING:
                    style = Style(color="yellow")
        else:
            # A diagnostic error to present: show it with indentation.
            assert isinstance(record.args, tuple)
            (payload,) = record.args
            assert isinstance(
                payload, (ConsoleRenderable, RichCast, str)
            ), f"{payload} is not rich-console-renderable"
            renderable = IndentedRenderable(payload, indent=get_indentation())

        try:
            self.console.print(renderable, overflow="ignore", crop=False, style=style)
        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """Called when logging is unable to log some output.

        :raises BrokenStdoutLoggingError: If the failure being handled is a
            broken pipe and this handler's console writes to sys.stdout.
        """
        exc_class, exc = sys.exc_info()[:2]

        # A broken pipe on the stdout stream is turned into our special
        # exception so that main() can deal with it, instead of the error
        # being logged and execution continuing.
        stdout_pipe_broken = (
            exc_class
            and exc
            and self.console.file is sys.stdout
            and _is_broken_pipe_error(exc_class, exc)
        )
        if stdout_pipe_broken:
            raise BrokenStdoutLoggingError()

        return super().handleError(record)
251
252
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """RotatingFileHandler that calls ensure_dir() on the log directory first."""

    def _open(self) -> TextIOWrapper:
        """Run ensure_dir() on the log file's directory, then open the file."""
        log_directory = os.path.dirname(self.baseFilename)
        ensure_dir(log_directory)
        return super()._open()
257
258
class MaxLevelFilter(Filter):
    """A logging Filter that only lets through records below a given level."""

    def __init__(self, level: int) -> None:
        """
        :param level: Exclusive upper bound; records at this level or above
            are rejected.
        """
        # Initialise the base Filter too, so that the attributes it defines
        # (``name`` / ``nlen``) exist on every instance.
        super().__init__()
        self.level = level

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record's level is strictly below ``self.level``."""
        return record.levelno < self.level
265
266
class ExcludeLoggerFilter(Filter):
    """
    A logging Filter that rejects records coming from the named logger or
    any of its children, and accepts everything else.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return the negation of the base Filter's decision for the record."""
        # The base class accepts exactly the records of the named logger
        # (or its children), which are the ones to drop here.
        from_named_logger = super().filter(record)
        return not from_named_logger
276
277
def setup_logging(verbosity: int, no_color: bool, user_log_file: str | None) -> int:
    """Configures and sets up all of the logging

    Creates the module-level stdout/stderr rich consoles and installs the
    handlers, filters and formatters through ``logging.config.dictConfig``.

    :param verbosity: Requested verbosity. ``>= 2`` selects DEBUG, ``1``
        VERBOSE, ``-1`` WARNING, ``-2`` ERROR, ``<= -3`` CRITICAL and any
        other value (e.g. ``0``) INFO.
    :param no_color: Passed as ``no_color`` to both rich consoles.
    :param user_log_file: Path of an additional log file that receives
        everything from DEBUG upwards with timestamps, or None for no file.
    :return: The requested logging level, as its integer value.
    """
    # ``import logging`` does not load the ``logging.config`` submodule, so
    # import it explicitly rather than rely on another module having done so.
    from logging.config import dictConfig

    # Both consoles are module globals read by get_console().
    global _stdout_console, _stderr_console

    # Determine the level to be logging at.
    if verbosity >= 2:
        level_number = logging.DEBUG
    elif verbosity == 1:
        level_number = VERBOSE
    elif verbosity == -1:
        level_number = logging.WARNING
    elif verbosity == -2:
        level_number = logging.ERROR
    elif verbosity <= -3:
        level_number = logging.CRITICAL
    else:
        level_number = logging.INFO

    level = logging.getLevelName(level_number)

    # The "root" logger should match the "console" level *unless* we also need
    # to log to a user log file.
    include_user_log = user_log_file is not None
    if include_user_log:
        additional_log_file = user_log_file
        root_level = "DEBUG"
    else:
        # The "user_log" handler is still declared below, but it is not
        # attached to the root logger and is created with delay=True.
        additional_log_file = "/dev/null"
        root_level = level

    # Loggers under "pip._vendor" are limited to WARNING when the console
    # level is INFO or ERROR; for every other console level they are set to
    # DEBUG.
    vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"

    # Shorthands for clarity
    handler_classes = {
        "stream": "pip._internal.utils.logging.RichPipStreamHandler",
        "file": "pip._internal.utils.logging.BetterRotatingFileHandler",
    }
    handlers = ["console", "console_errors", "console_subprocess"] + (
        ["user_log"] if include_user_log else []
    )
    _stdout_console = PipConsole(file=sys.stdout, no_color=no_color, soft_wrap=True)
    _stderr_console = PipConsole(file=sys.stderr, no_color=no_color, soft_wrap=True)

    dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "filters": {
                "exclude_warnings": {
                    "()": "pip._internal.utils.logging.MaxLevelFilter",
                    "level": logging.WARNING,
                },
                "restrict_to_subprocess": {
                    "()": "logging.Filter",
                    "name": subprocess_logger.name,
                },
                "exclude_subprocess": {
                    "()": "pip._internal.utils.logging.ExcludeLoggerFilter",
                    "name": subprocess_logger.name,
                },
            },
            "formatters": {
                "indent": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                },
                "indent_with_timestamp": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                    "add_timestamp": True,
                },
            },
            "handlers": {
                # Records below WARNING (subprocess output excluded) go to
                # stdout.
                "console": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "console": _stdout_console,
                    "filters": ["exclude_subprocess", "exclude_warnings"],
                    "formatter": "indent",
                },
                # WARNING and above (subprocess output excluded) go to
                # stderr.
                "console_errors": {
                    "level": "WARNING",
                    "class": handler_classes["stream"],
                    "console": _stderr_console,
                    "filters": ["exclude_subprocess"],
                    "formatter": "indent",
                },
                # A handler responsible for logging to the console messages
                # from the "subprocessor" logger.
                "console_subprocess": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "console": _stderr_console,
                    "filters": ["restrict_to_subprocess"],
                    "formatter": "indent",
                },
                "user_log": {
                    "level": "DEBUG",
                    "class": handler_classes["file"],
                    "filename": additional_log_file,
                    "encoding": "utf-8",
                    "delay": True,
                    "formatter": "indent_with_timestamp",
                },
            },
            "root": {
                "level": root_level,
                "handlers": handlers,
            },
            "loggers": {"pip._vendor": {"level": vendored_log_level}},
        }
    )

    return level_number