Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/utils/logging.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

168 statements  

1from __future__ import annotations 

2 

3import contextlib 

4import errno 

5import logging 

6import logging.handlers 

7import os 

8import sys 

9import threading 

10from collections.abc import Generator 

11from dataclasses import dataclass 

12from io import StringIO, TextIOWrapper 

13from logging import Filter 

14from typing import Any, ClassVar 

15 

16from pip._vendor.rich.console import ( 

17 Console, 

18 ConsoleOptions, 

19 ConsoleRenderable, 

20 RenderableType, 

21 RenderResult, 

22 RichCast, 

23) 

24from pip._vendor.rich.highlighter import NullHighlighter 

25from pip._vendor.rich.logging import RichHandler 

26from pip._vendor.rich.segment import Segment 

27from pip._vendor.rich.style import Style 

28 

29from pip._internal.utils._log import VERBOSE, getLogger 

30from pip._internal.utils.compat import WINDOWS 

31from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX 

32from pip._internal.utils.misc import StreamWrapper, ensure_dir 

33 

34_log_state = threading.local() 

35_stdout_console = None 

36_stderr_console = None 

37subprocess_logger = getLogger("pip.subprocessor") 

38 

39 

40class BrokenStdoutLoggingError(Exception): 

41 """ 

42 Raised if BrokenPipeError occurs for the stdout stream while logging. 

43 """ 

44 

45 

46def _is_broken_pipe_error(exc_class: type[BaseException], exc: BaseException) -> bool: 

47 if exc_class is BrokenPipeError: 

48 return True 

49 

50 # On Windows, a broken pipe can show up as EINVAL rather than EPIPE: 

51 # https://bugs.python.org/issue19612 

52 # https://bugs.python.org/issue30418 

53 if not WINDOWS: 

54 return False 

55 

56 return isinstance(exc, OSError) and exc.errno in (errno.EINVAL, errno.EPIPE) 

57 

58 

59@contextlib.contextmanager 

60def capture_logging() -> Generator[StringIO, None, None]: 

61 """Capture all pip logs in a buffer temporarily.""" 

62 # Patching sys.std(out|err) directly is not viable as the caller 

63 # may want to emit non-logging output (e.g. a rich spinner). To 

64 # avoid capturing that, temporarily patch the root logging handlers 

65 # to use new rich consoles that write to a StringIO. 

66 handlers = {} 

67 for handler in logging.getLogger().handlers: 

68 if isinstance(handler, RichPipStreamHandler): 

69 # Also store the handler's original console so it can be 

70 # restored on context exit. 

71 handlers[handler] = handler.console 

72 

73 fake_stream = StreamWrapper.from_stream(sys.stdout) 

74 if not handlers: 

75 yield fake_stream 

76 return 

77 

78 # HACK: grab no_color attribute from a random handler console since 

79 # it's a global option anyway. 

80 no_color = next(iter(handlers.values())).no_color 

81 fake_console = PipConsole(file=fake_stream, no_color=no_color, soft_wrap=True) 

82 try: 

83 for handler in handlers: 

84 handler.console = fake_console 

85 yield fake_stream 

86 finally: 

87 for handler, original_console in handlers.items(): 

88 handler.console = original_console 

89 

90 

91@contextlib.contextmanager 

92def indent_log(num: int = 2) -> Generator[None, None, None]: 

93 """ 

94 A context manager which will cause the log output to be indented for any 

95 log messages emitted inside it. 

96 """ 

97 # For thread-safety 

98 _log_state.indentation = get_indentation() 

99 _log_state.indentation += num 

100 try: 

101 yield 

102 finally: 

103 _log_state.indentation -= num 

104 

105 

106def get_indentation() -> int: 

107 return getattr(_log_state, "indentation", 0) 

108 

109 

110class IndentingFormatter(logging.Formatter): 

111 default_time_format = "%Y-%m-%dT%H:%M:%S" 

112 

113 def __init__( 

114 self, 

115 *args: Any, 

116 add_timestamp: bool = False, 

117 **kwargs: Any, 

118 ) -> None: 

119 """ 

120 A logging.Formatter that obeys the indent_log() context manager. 

121 

122 :param add_timestamp: A bool indicating output lines should be prefixed 

123 with their record's timestamp. 

124 """ 

125 self.add_timestamp = add_timestamp 

126 super().__init__(*args, **kwargs) 

127 

128 def get_message_start(self, formatted: str, levelno: int) -> str: 

129 """ 

130 Return the start of the formatted log message (not counting the 

131 prefix to add to each line). 

132 """ 

133 if levelno < logging.WARNING: 

134 return "" 

135 if formatted.startswith(DEPRECATION_MSG_PREFIX): 

136 # Then the message already has a prefix. We don't want it to 

137 # look like "WARNING: DEPRECATION: ...." 

138 return "" 

139 if levelno < logging.ERROR: 

140 return "WARNING: " 

141 

142 return "ERROR: " 

143 

144 def format(self, record: logging.LogRecord) -> str: 

145 """ 

146 Calls the standard formatter, but will indent all of the log message 

147 lines by our current indentation level. 

148 """ 

149 formatted = super().format(record) 

150 message_start = self.get_message_start(formatted, record.levelno) 

151 formatted = message_start + formatted 

152 

153 prefix = "" 

154 if self.add_timestamp: 

155 prefix = f"{self.formatTime(record)} " 

156 prefix += " " * get_indentation() 

157 formatted = "".join([prefix + line for line in formatted.splitlines(True)]) 

158 return formatted 

159 

160 

161@dataclass 

162class IndentedRenderable: 

163 renderable: RenderableType 

164 indent: int 

165 

166 def __rich_console__( 

167 self, console: Console, options: ConsoleOptions 

168 ) -> RenderResult: 

169 segments = console.render(self.renderable, options) 

170 lines = Segment.split_lines(segments) 

171 for line in lines: 

172 yield Segment(" " * self.indent) 

173 yield from line 

174 yield Segment("\n") 

175 

176 

177class PipConsole(Console): 

178 def on_broken_pipe(self) -> None: 

179 # Reraise the original exception, rich 13.8.0+ exits by default 

180 # instead, preventing our handler from firing. 

181 raise BrokenPipeError() from None 

182 

183 

184def get_console(*, stderr: bool = False) -> Console: 

185 if stderr: 

186 assert _stderr_console is not None, "stderr rich console is missing!" 

187 return _stderr_console 

188 else: 

189 assert _stdout_console is not None, "stdout rich console is missing!" 

190 return _stdout_console 

191 

192 

193class RichPipStreamHandler(RichHandler): 

194 KEYWORDS: ClassVar[list[str] | None] = [] 

195 

196 def __init__(self, console: Console) -> None: 

197 super().__init__( 

198 console=console, 

199 show_time=False, 

200 show_level=False, 

201 show_path=False, 

202 highlighter=NullHighlighter(), 

203 ) 

204 

205 # Our custom override on Rich's logger, to make things work as we need them to. 

206 def emit(self, record: logging.LogRecord) -> None: 

207 style: Style | None = None 

208 

209 # If we are given a diagnostic error to present, present it with indentation. 

210 if getattr(record, "rich", False): 

211 assert isinstance(record.args, tuple) 

212 (rich_renderable,) = record.args 

213 assert isinstance( 

214 rich_renderable, (ConsoleRenderable, RichCast, str) 

215 ), f"{rich_renderable} is not rich-console-renderable" 

216 

217 renderable: RenderableType = IndentedRenderable( 

218 rich_renderable, indent=get_indentation() 

219 ) 

220 else: 

221 message = self.format(record) 

222 renderable = self.render_message(record, message) 

223 if record.levelno is not None: 

224 if record.levelno >= logging.ERROR: 

225 style = Style(color="red") 

226 elif record.levelno >= logging.WARNING: 

227 style = Style(color="yellow") 

228 

229 try: 

230 self.console.print(renderable, overflow="ignore", crop=False, style=style) 

231 except Exception: 

232 self.handleError(record) 

233 

234 def handleError(self, record: logging.LogRecord) -> None: 

235 """Called when logging is unable to log some output.""" 

236 

237 exc_class, exc = sys.exc_info()[:2] 

238 # If a broken pipe occurred while calling write() or flush() on the 

239 # stdout stream in logging's Handler.emit(), then raise our special 

240 # exception so we can handle it in main() instead of logging the 

241 # broken pipe error and continuing. 

242 if ( 

243 exc_class 

244 and exc 

245 and self.console.file is sys.stdout 

246 and _is_broken_pipe_error(exc_class, exc) 

247 ): 

248 raise BrokenStdoutLoggingError() 

249 

250 return super().handleError(record) 

251 

252 

253class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler): 

254 def _open(self) -> TextIOWrapper: 

255 ensure_dir(os.path.dirname(self.baseFilename)) 

256 return super()._open() 

257 

258 

259class MaxLevelFilter(Filter): 

260 def __init__(self, level: int) -> None: 

261 self.level = level 

262 

263 def filter(self, record: logging.LogRecord) -> bool: 

264 return record.levelno < self.level 

265 

266 

267class ExcludeLoggerFilter(Filter): 

268 """ 

269 A logging Filter that excludes records from a logger (or its children). 

270 """ 

271 

272 def filter(self, record: logging.LogRecord) -> bool: 

273 # The base Filter class allows only records from a logger (or its 

274 # children). 

275 return not super().filter(record) 

276 

277 

278def setup_logging(verbosity: int, no_color: bool, user_log_file: str | None) -> int: 

279 """Configures and sets up all of the logging 

280 

281 Returns the requested logging level, as its integer value. 

282 """ 

283 

284 # Determine the level to be logging at. 

285 if verbosity >= 2: 

286 level_number = logging.DEBUG 

287 elif verbosity == 1: 

288 level_number = VERBOSE 

289 elif verbosity == -1: 

290 level_number = logging.WARNING 

291 elif verbosity == -2: 

292 level_number = logging.ERROR 

293 elif verbosity <= -3: 

294 level_number = logging.CRITICAL 

295 else: 

296 level_number = logging.INFO 

297 

298 level = logging.getLevelName(level_number) 

299 

300 # The "root" logger should match the "console" level *unless* we also need 

301 # to log to a user log file. 

302 include_user_log = user_log_file is not None 

303 if include_user_log: 

304 additional_log_file = user_log_file 

305 root_level = "DEBUG" 

306 else: 

307 additional_log_file = "/dev/null" 

308 root_level = level 

309 

310 # Disable any logging besides WARNING unless we have DEBUG level logging 

311 # enabled for vendored libraries. 

312 vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG" 

313 

314 # Shorthands for clarity 

315 handler_classes = { 

316 "stream": "pip._internal.utils.logging.RichPipStreamHandler", 

317 "file": "pip._internal.utils.logging.BetterRotatingFileHandler", 

318 } 

319 handlers = ["console", "console_errors", "console_subprocess"] + ( 

320 ["user_log"] if include_user_log else [] 

321 ) 

322 global _stdout_console, stderr_console 

323 _stdout_console = PipConsole(file=sys.stdout, no_color=no_color, soft_wrap=True) 

324 _stderr_console = PipConsole(file=sys.stderr, no_color=no_color, soft_wrap=True) 

325 

326 logging.config.dictConfig( 

327 { 

328 "version": 1, 

329 "disable_existing_loggers": False, 

330 "filters": { 

331 "exclude_warnings": { 

332 "()": "pip._internal.utils.logging.MaxLevelFilter", 

333 "level": logging.WARNING, 

334 }, 

335 "restrict_to_subprocess": { 

336 "()": "logging.Filter", 

337 "name": subprocess_logger.name, 

338 }, 

339 "exclude_subprocess": { 

340 "()": "pip._internal.utils.logging.ExcludeLoggerFilter", 

341 "name": subprocess_logger.name, 

342 }, 

343 }, 

344 "formatters": { 

345 "indent": { 

346 "()": IndentingFormatter, 

347 "format": "%(message)s", 

348 }, 

349 "indent_with_timestamp": { 

350 "()": IndentingFormatter, 

351 "format": "%(message)s", 

352 "add_timestamp": True, 

353 }, 

354 }, 

355 "handlers": { 

356 "console": { 

357 "level": level, 

358 "class": handler_classes["stream"], 

359 "console": _stdout_console, 

360 "filters": ["exclude_subprocess", "exclude_warnings"], 

361 "formatter": "indent", 

362 }, 

363 "console_errors": { 

364 "level": "WARNING", 

365 "class": handler_classes["stream"], 

366 "console": _stderr_console, 

367 "filters": ["exclude_subprocess"], 

368 "formatter": "indent", 

369 }, 

370 # A handler responsible for logging to the console messages 

371 # from the "subprocessor" logger. 

372 "console_subprocess": { 

373 "level": level, 

374 "class": handler_classes["stream"], 

375 "console": _stderr_console, 

376 "filters": ["restrict_to_subprocess"], 

377 "formatter": "indent", 

378 }, 

379 "user_log": { 

380 "level": "DEBUG", 

381 "class": handler_classes["file"], 

382 "filename": additional_log_file, 

383 "encoding": "utf-8", 

384 "delay": True, 

385 "formatter": "indent_with_timestamp", 

386 }, 

387 }, 

388 "root": { 

389 "level": root_level, 

390 "handlers": handlers, 

391 }, 

392 "loggers": {"pip._vendor": {"level": vendored_log_level}}, 

393 } 

394 ) 

395 

396 return level_number