Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/logging.py: 37%
136 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1import contextlib
2import errno
3import logging
4import logging.handlers
5import os
6import sys
7import threading
8from dataclasses import dataclass
9from io import TextIOWrapper
10from logging import Filter
11from typing import Any, ClassVar, Generator, List, Optional, TextIO, Type
13from pip._vendor.rich.console import (
14 Console,
15 ConsoleOptions,
16 ConsoleRenderable,
17 RenderableType,
18 RenderResult,
19 RichCast,
20)
21from pip._vendor.rich.highlighter import NullHighlighter
22from pip._vendor.rich.logging import RichHandler
23from pip._vendor.rich.segment import Segment
24from pip._vendor.rich.style import Style
26from pip._internal.utils._log import VERBOSE, getLogger
27from pip._internal.utils.compat import WINDOWS
28from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX
29from pip._internal.utils.misc import ensure_dir
31_log_state = threading.local()
32subprocess_logger = getLogger("pip.subprocessor")
35class BrokenStdoutLoggingError(Exception):
36 """
37 Raised if BrokenPipeError occurs for the stdout stream while logging.
38 """
41def _is_broken_pipe_error(exc_class: Type[BaseException], exc: BaseException) -> bool:
42 if exc_class is BrokenPipeError:
43 return True
45 # On Windows, a broken pipe can show up as EINVAL rather than EPIPE:
46 # https://bugs.python.org/issue19612
47 # https://bugs.python.org/issue30418
48 if not WINDOWS:
49 return False
51 return isinstance(exc, OSError) and exc.errno in (errno.EINVAL, errno.EPIPE)
54@contextlib.contextmanager
55def indent_log(num: int = 2) -> Generator[None, None, None]:
56 """
57 A context manager which will cause the log output to be indented for any
58 log messages emitted inside it.
59 """
60 # For thread-safety
61 _log_state.indentation = get_indentation()
62 _log_state.indentation += num
63 try:
64 yield
65 finally:
66 _log_state.indentation -= num
69def get_indentation() -> int:
70 return getattr(_log_state, "indentation", 0)
73class IndentingFormatter(logging.Formatter):
74 default_time_format = "%Y-%m-%dT%H:%M:%S"
76 def __init__(
77 self,
78 *args: Any,
79 add_timestamp: bool = False,
80 **kwargs: Any,
81 ) -> None:
82 """
83 A logging.Formatter that obeys the indent_log() context manager.
85 :param add_timestamp: A bool indicating output lines should be prefixed
86 with their record's timestamp.
87 """
88 self.add_timestamp = add_timestamp
89 super().__init__(*args, **kwargs)
91 def get_message_start(self, formatted: str, levelno: int) -> str:
92 """
93 Return the start of the formatted log message (not counting the
94 prefix to add to each line).
95 """
96 if levelno < logging.WARNING:
97 return ""
98 if formatted.startswith(DEPRECATION_MSG_PREFIX):
99 # Then the message already has a prefix. We don't want it to
100 # look like "WARNING: DEPRECATION: ...."
101 return ""
102 if levelno < logging.ERROR:
103 return "WARNING: "
105 return "ERROR: "
107 def format(self, record: logging.LogRecord) -> str:
108 """
109 Calls the standard formatter, but will indent all of the log message
110 lines by our current indentation level.
111 """
112 formatted = super().format(record)
113 message_start = self.get_message_start(formatted, record.levelno)
114 formatted = message_start + formatted
116 prefix = ""
117 if self.add_timestamp:
118 prefix = f"{self.formatTime(record)} "
119 prefix += " " * get_indentation()
120 formatted = "".join([prefix + line for line in formatted.splitlines(True)])
121 return formatted
124@dataclass
125class IndentedRenderable:
126 renderable: RenderableType
127 indent: int
129 def __rich_console__(
130 self, console: Console, options: ConsoleOptions
131 ) -> RenderResult:
132 segments = console.render(self.renderable, options)
133 lines = Segment.split_lines(segments)
134 for line in lines:
135 yield Segment(" " * self.indent)
136 yield from line
137 yield Segment("\n")
140class RichPipStreamHandler(RichHandler):
141 KEYWORDS: ClassVar[Optional[List[str]]] = []
143 def __init__(self, stream: Optional[TextIO], no_color: bool) -> None:
144 super().__init__(
145 console=Console(file=stream, no_color=no_color, soft_wrap=True),
146 show_time=False,
147 show_level=False,
148 show_path=False,
149 highlighter=NullHighlighter(),
150 )
152 # Our custom override on Rich's logger, to make things work as we need them to.
153 def emit(self, record: logging.LogRecord) -> None:
154 style: Optional[Style] = None
156 # If we are given a diagnostic error to present, present it with indentation.
157 assert isinstance(record.args, tuple)
158 if record.msg == "[present-rich] %s" and len(record.args) == 1:
159 rich_renderable = record.args[0]
160 assert isinstance(
161 rich_renderable, (ConsoleRenderable, RichCast, str)
162 ), f"{rich_renderable} is not rich-console-renderable"
164 renderable: RenderableType = IndentedRenderable(
165 rich_renderable, indent=get_indentation()
166 )
167 else:
168 message = self.format(record)
169 renderable = self.render_message(record, message)
170 if record.levelno is not None:
171 if record.levelno >= logging.ERROR:
172 style = Style(color="red")
173 elif record.levelno >= logging.WARNING:
174 style = Style(color="yellow")
176 try:
177 self.console.print(renderable, overflow="ignore", crop=False, style=style)
178 except Exception:
179 self.handleError(record)
181 def handleError(self, record: logging.LogRecord) -> None:
182 """Called when logging is unable to log some output."""
184 exc_class, exc = sys.exc_info()[:2]
185 # If a broken pipe occurred while calling write() or flush() on the
186 # stdout stream in logging's Handler.emit(), then raise our special
187 # exception so we can handle it in main() instead of logging the
188 # broken pipe error and continuing.
189 if (
190 exc_class
191 and exc
192 and self.console.file is sys.stdout
193 and _is_broken_pipe_error(exc_class, exc)
194 ):
195 raise BrokenStdoutLoggingError()
197 return super().handleError(record)
200class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
201 def _open(self) -> TextIOWrapper:
202 ensure_dir(os.path.dirname(self.baseFilename))
203 return super()._open()
206class MaxLevelFilter(Filter):
207 def __init__(self, level: int) -> None:
208 self.level = level
210 def filter(self, record: logging.LogRecord) -> bool:
211 return record.levelno < self.level
214class ExcludeLoggerFilter(Filter):
216 """
217 A logging Filter that excludes records from a logger (or its children).
218 """
220 def filter(self, record: logging.LogRecord) -> bool:
221 # The base Filter class allows only records from a logger (or its
222 # children).
223 return not super().filter(record)
226def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str]) -> int:
227 """Configures and sets up all of the logging
229 Returns the requested logging level, as its integer value.
230 """
232 # Determine the level to be logging at.
233 if verbosity >= 2:
234 level_number = logging.DEBUG
235 elif verbosity == 1:
236 level_number = VERBOSE
237 elif verbosity == -1:
238 level_number = logging.WARNING
239 elif verbosity == -2:
240 level_number = logging.ERROR
241 elif verbosity <= -3:
242 level_number = logging.CRITICAL
243 else:
244 level_number = logging.INFO
246 level = logging.getLevelName(level_number)
248 # The "root" logger should match the "console" level *unless* we also need
249 # to log to a user log file.
250 include_user_log = user_log_file is not None
251 if include_user_log:
252 additional_log_file = user_log_file
253 root_level = "DEBUG"
254 else:
255 additional_log_file = "/dev/null"
256 root_level = level
258 # Disable any logging besides WARNING unless we have DEBUG level logging
259 # enabled for vendored libraries.
260 vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"
262 # Shorthands for clarity
263 log_streams = {
264 "stdout": "ext://sys.stdout",
265 "stderr": "ext://sys.stderr",
266 }
267 handler_classes = {
268 "stream": "pip._internal.utils.logging.RichPipStreamHandler",
269 "file": "pip._internal.utils.logging.BetterRotatingFileHandler",
270 }
271 handlers = ["console", "console_errors", "console_subprocess"] + (
272 ["user_log"] if include_user_log else []
273 )
275 logging.config.dictConfig(
276 {
277 "version": 1,
278 "disable_existing_loggers": False,
279 "filters": {
280 "exclude_warnings": {
281 "()": "pip._internal.utils.logging.MaxLevelFilter",
282 "level": logging.WARNING,
283 },
284 "restrict_to_subprocess": {
285 "()": "logging.Filter",
286 "name": subprocess_logger.name,
287 },
288 "exclude_subprocess": {
289 "()": "pip._internal.utils.logging.ExcludeLoggerFilter",
290 "name": subprocess_logger.name,
291 },
292 },
293 "formatters": {
294 "indent": {
295 "()": IndentingFormatter,
296 "format": "%(message)s",
297 },
298 "indent_with_timestamp": {
299 "()": IndentingFormatter,
300 "format": "%(message)s",
301 "add_timestamp": True,
302 },
303 },
304 "handlers": {
305 "console": {
306 "level": level,
307 "class": handler_classes["stream"],
308 "no_color": no_color,
309 "stream": log_streams["stdout"],
310 "filters": ["exclude_subprocess", "exclude_warnings"],
311 "formatter": "indent",
312 },
313 "console_errors": {
314 "level": "WARNING",
315 "class": handler_classes["stream"],
316 "no_color": no_color,
317 "stream": log_streams["stderr"],
318 "filters": ["exclude_subprocess"],
319 "formatter": "indent",
320 },
321 # A handler responsible for logging to the console messages
322 # from the "subprocessor" logger.
323 "console_subprocess": {
324 "level": level,
325 "class": handler_classes["stream"],
326 "stream": log_streams["stderr"],
327 "no_color": no_color,
328 "filters": ["restrict_to_subprocess"],
329 "formatter": "indent",
330 },
331 "user_log": {
332 "level": "DEBUG",
333 "class": handler_classes["file"],
334 "filename": additional_log_file,
335 "encoding": "utf-8",
336 "delay": True,
337 "formatter": "indent_with_timestamp",
338 },
339 },
340 "root": {
341 "level": root_level,
342 "handlers": handlers,
343 },
344 "loggers": {"pip._vendor": {"level": vendored_log_level}},
345 }
346 )
348 return level_number