Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/utils/logging.py: 37%

136 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import contextlib 

2import errno 

3import logging 

4import logging.handlers 

5import os 

6import sys 

7import threading 

8from dataclasses import dataclass 

9from io import TextIOWrapper 

10from logging import Filter 

11from typing import Any, ClassVar, Generator, List, Optional, TextIO, Type 

12 

13from pip._vendor.rich.console import ( 

14 Console, 

15 ConsoleOptions, 

16 ConsoleRenderable, 

17 RenderableType, 

18 RenderResult, 

19 RichCast, 

20) 

21from pip._vendor.rich.highlighter import NullHighlighter 

22from pip._vendor.rich.logging import RichHandler 

23from pip._vendor.rich.segment import Segment 

24from pip._vendor.rich.style import Style 

25 

26from pip._internal.utils._log import VERBOSE, getLogger 

27from pip._internal.utils.compat import WINDOWS 

28from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX 

29from pip._internal.utils.misc import ensure_dir 

30 

# Per-thread storage for logging state. indent_log() writes the current
# "indentation" level here and get_indentation() reads it back.
_log_state = threading.local()

# Logger whose name setup_logging() uses to build the subprocess filters.
subprocess_logger = getLogger("pip.subprocessor")

33 

34 

class BrokenStdoutLoggingError(Exception):
    """Signal that a BrokenPipeError occurred on stdout while logging."""

39 

40 

def _is_broken_pipe_error(exc_class: Type[BaseException], exc: BaseException) -> bool:
    """
    Report whether an exception represents a broken pipe.

    :param exc_class: The class of the exception being inspected.
    :param exc: The exception instance being inspected.
    :return: True when ``exc_class`` is exactly BrokenPipeError, or when
        WINDOWS is set and ``exc`` is an OSError whose errno is EINVAL or
        EPIPE; False otherwise.
    """
    if exc_class is BrokenPipeError:
        return True

    # On Windows, a broken pipe can show up as EINVAL rather than EPIPE:
    # https://bugs.python.org/issue19612
    # https://bugs.python.org/issue30418
    if WINDOWS and isinstance(exc, OSError):
        return exc.errno in (errno.EINVAL, errno.EPIPE)

    return False

52 

53 

@contextlib.contextmanager
def indent_log(num: int = 2) -> Generator[None, None, None]:
    """
    Context manager that indents log output emitted inside its body.

    :param num: How much to add to the current indentation level while the
        body runs; the same amount is subtracted again on exit.
    """
    # For thread-safety: go through get_indentation() so a thread that has
    # not stored an indentation yet starts from the default value.
    _log_state.indentation = get_indentation() + num
    try:
        yield
    finally:
        # Undo the increase even when the body raises.
        _log_state.indentation -= num

67 

68 

def get_indentation() -> int:
    """Return the indentation stored in ``_log_state``, or 0 if none is set."""
    try:
        return _log_state.indentation
    except AttributeError:
        # Nothing has been stored on the thread-local yet.
        return 0

71 

72 

class IndentingFormatter(logging.Formatter):
    """A logging.Formatter that obeys the indent_log() context manager."""

    default_time_format = "%Y-%m-%dT%H:%M:%S"

    def __init__(
        self,
        *args: Any,
        add_timestamp: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Build the formatter; extra arguments go to logging.Formatter.

        :param add_timestamp: A bool indicating output lines should be prefixed
            with their record's timestamp.
        """
        self.add_timestamp = add_timestamp
        super().__init__(*args, **kwargs)

    def get_message_start(self, formatted: str, levelno: int) -> str:
        """
        Return the severity label to put in front of the formatted message
        (this is separate from the per-line prefix added by format()).

        :param formatted: The already formatted log message.
        :param levelno: The numeric level of the record.
        :return: "" below WARNING or for deprecation messages, "WARNING: "
            below ERROR, and "ERROR: " otherwise.
        """
        below_warning = levelno < logging.WARNING
        # A deprecation message already has a prefix. We don't want it to
        # look like "WARNING: DEPRECATION: ...."
        if below_warning or formatted.startswith(DEPRECATION_MSG_PREFIX):
            return ""

        return "WARNING: " if levelno < logging.ERROR else "ERROR: "

    def format(self, record: logging.LogRecord) -> str:
        """
        Run the standard formatter, then prefix every line of the result
        with the optional timestamp and the current indentation.
        """
        text = super().format(record)
        text = self.get_message_start(text, record.levelno) + text

        timestamp = f"{self.formatTime(record)} " if self.add_timestamp else ""
        line_prefix = timestamp + " " * get_indentation()
        # splitlines(True) keeps the line endings, so joining restores the
        # original text with the prefix inserted at the start of each line.
        return "".join(line_prefix + line for line in text.splitlines(True))

122 

123 

@dataclass
class IndentedRenderable:
    """Wrap a renderable so every rendered line starts with ``indent`` spaces."""

    # The object handed to console.render().
    renderable: RenderableType
    # Number of space characters emitted before each rendered line.
    indent: int

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        """Yield the wrapped renderable's segments line by line, indented."""
        rendered = console.render(self.renderable, options)
        for line_segments in Segment.split_lines(rendered):
            yield Segment(" " * self.indent)
            yield from line_segments
            # Each line is terminated explicitly after its segments.
            yield Segment("\n")

138 

139 

class RichPipStreamHandler(RichHandler):
    """RichHandler subclass that prints log records through a Rich Console."""

    # NOTE(review): presumably empties RichHandler's keyword-highlighting
    # list -- confirm against the vendored Rich version.
    KEYWORDS: ClassVar[Optional[List[str]]] = []

    def __init__(self, stream: Optional[TextIO], no_color: bool) -> None:
        """
        :param stream: File object handed to the Console as its output file.
        :param no_color: Passed through to the Console's ``no_color`` option.
        """
        super().__init__(
            console=Console(file=stream, no_color=no_color, soft_wrap=True),
            show_time=False,
            show_level=False,
            show_path=False,
            highlighter=NullHighlighter(),
        )

    # Our custom override on Rich's logger, to make things work as we need them to.
    def emit(self, record: logging.LogRecord) -> None:
        """
        Print ``record`` to the console.

        Records logged as ``"[present-rich] %s"`` with exactly one positional
        argument are rendered directly (indented by the current indentation);
        all other records are formatted normally and coloured red for ERROR
        and above, yellow for WARNING and above.
        """
        style: Optional[Style] = None

        # ``record.args`` is not guaranteed to be a tuple: the logging module
        # stores a single dict argument as a mapping. Check the type instead
        # of asserting it, so such records are formatted normally rather than
        # raising AssertionError out of the logging call.
        args = record.args
        if (
            record.msg == "[present-rich] %s"
            and isinstance(args, tuple)
            and len(args) == 1
        ):
            # If we are given a diagnostic error to present, present it with
            # indentation.
            rich_renderable = args[0]
            assert isinstance(
                rich_renderable, (ConsoleRenderable, RichCast, str)
            ), f"{rich_renderable} is not rich-console-renderable"

            renderable: RenderableType = IndentedRenderable(
                rich_renderable, indent=get_indentation()
            )
        else:
            message = self.format(record)
            renderable = self.render_message(record, message)
            if record.levelno is not None:
                if record.levelno >= logging.ERROR:
                    style = Style(color="red")
                elif record.levelno >= logging.WARNING:
                    style = Style(color="yellow")

        try:
            self.console.print(renderable, overflow="ignore", crop=False, style=style)
        except Exception:
            # Route failures through handleError(), which may turn a broken
            # stdout pipe into BrokenStdoutLoggingError.
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """
        Called when logging is unable to log some output.

        :raises BrokenStdoutLoggingError: If the exception being handled is a
            broken pipe error and this handler's console writes to sys.stdout.
        """

        exc_class, exc = sys.exc_info()[:2]
        # If a broken pipe occurred while calling write() or flush() on the
        # stdout stream in logging's Handler.emit(), then raise our special
        # exception so we can handle it in main() instead of logging the
        # broken pipe error and continuing.
        if (
            exc_class
            and exc
            and self.console.file is sys.stdout
            and _is_broken_pipe_error(exc_class, exc)
        ):
            raise BrokenStdoutLoggingError()

        return super().handleError(record)

198 

199 

class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """RotatingFileHandler that calls ensure_dir() on the log directory first."""

    def _open(self) -> TextIOWrapper:
        """Open the log file after running ensure_dir() on its parent directory."""
        # presumably ensure_dir() creates the directory when it is missing --
        # verify against pip._internal.utils.misc.
        log_dir = os.path.dirname(self.baseFilename)
        ensure_dir(log_dir)
        return super()._open()

204 

205 

class MaxLevelFilter(Filter):
    """A logging Filter that only passes records below a given level."""

    def __init__(self, level: int) -> None:
        """
        :param level: Exclusive upper bound; records whose ``levelno`` is
            greater than or equal to it are rejected.
        """
        # Initialise the base Filter as well, so the attributes it defines
        # (``name`` and ``nlen``) exist on every instance.
        super().__init__()
        self.level = level

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True when ``record`` is strictly below the configured level."""
        return record.levelno < self.level

212 

213 

class ExcludeLoggerFilter(Filter):
    """
    A logging Filter that rejects records coming from a logger (or any of
    its children).
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for records from the named logger or its children."""
        # The base Filter class accepts only records from a logger (or its
        # children), so its verdict is inverted here.
        from_excluded_logger = super().filter(record)
        return not from_excluded_logger

224 

225 

def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str]) -> int:
    """Configures and sets up all of the logging

    :param verbosity: Requested verbosity. ``>= 2`` selects DEBUG, ``1``
        selects VERBOSE, ``0`` (and any other unlisted value) selects INFO,
        ``-1`` WARNING, ``-2`` ERROR and ``<= -3`` CRITICAL.
    :param no_color: Passed to every console handler as ``no_color``.
    :param user_log_file: Optional path of an additional log file. When given,
        the root logger runs at DEBUG and a ``user_log`` file handler is
        attached to it.

    Returns the requested logging level, as its integer value.
    """
    # ``logging.config`` is a submodule that ``import logging`` does not load
    # on its own; import it here so this function does not depend on another
    # module having imported it first.
    from logging.config import dictConfig

    # Determine the level to be logging at.
    if verbosity >= 2:
        level_number = logging.DEBUG
    elif verbosity == 1:
        level_number = VERBOSE
    elif verbosity == -1:
        level_number = logging.WARNING
    elif verbosity == -2:
        level_number = logging.ERROR
    elif verbosity <= -3:
        level_number = logging.CRITICAL
    else:
        level_number = logging.INFO

    level = logging.getLevelName(level_number)

    # The "root" logger should match the "console" level *unless* we also need
    # to log to a user log file.
    include_user_log = user_log_file is not None
    if include_user_log:
        additional_log_file = user_log_file
        root_level = "DEBUG"
    else:
        # The "user_log" handler is always declared in the config below, so
        # it needs a filename even when it is not attached to the root logger.
        additional_log_file = "/dev/null"
        root_level = level

    # Disable any logging besides WARNING unless we have DEBUG level logging
    # enabled for vendored libraries.
    # NOTE(review): only the "INFO" and "ERROR" console levels map to
    # "WARNING" here; every other level (including "WARNING" and "CRITICAL")
    # yields "DEBUG" -- confirm that this is intended.
    vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"

    # Shorthands for clarity
    log_streams = {
        "stdout": "ext://sys.stdout",
        "stderr": "ext://sys.stderr",
    }
    handler_classes = {
        "stream": "pip._internal.utils.logging.RichPipStreamHandler",
        "file": "pip._internal.utils.logging.BetterRotatingFileHandler",
    }
    handlers = ["console", "console_errors", "console_subprocess"] + (
        ["user_log"] if include_user_log else []
    )

    dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "filters": {
                "exclude_warnings": {
                    "()": "pip._internal.utils.logging.MaxLevelFilter",
                    "level": logging.WARNING,
                },
                "restrict_to_subprocess": {
                    "()": "logging.Filter",
                    "name": subprocess_logger.name,
                },
                "exclude_subprocess": {
                    "()": "pip._internal.utils.logging.ExcludeLoggerFilter",
                    "name": subprocess_logger.name,
                },
            },
            "formatters": {
                "indent": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                },
                "indent_with_timestamp": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                    "add_timestamp": True,
                },
            },
            "handlers": {
                # Messages below WARNING from everything except the
                # subprocess logger go to stdout.
                "console": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stdout"],
                    "filters": ["exclude_subprocess", "exclude_warnings"],
                    "formatter": "indent",
                },
                # WARNING and above from everything except the subprocess
                # logger go to stderr.
                "console_errors": {
                    "level": "WARNING",
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stderr"],
                    "filters": ["exclude_subprocess"],
                    "formatter": "indent",
                },
                # A handler responsible for logging to the console messages
                # from the "subprocessor" logger.
                "console_subprocess": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "stream": log_streams["stderr"],
                    "no_color": no_color,
                    "filters": ["restrict_to_subprocess"],
                    "formatter": "indent",
                },
                # delay=True defers opening the file until it is first needed.
                "user_log": {
                    "level": "DEBUG",
                    "class": handler_classes["file"],
                    "filename": additional_log_file,
                    "encoding": "utf-8",
                    "delay": True,
                    "formatter": "indent_with_timestamp",
                },
            },
            "root": {
                "level": root_level,
                "handlers": handlers,
            },
            "loggers": {"pip._vendor": {"level": vendored_log_level}},
        }
    )

    return level_number