Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/log/logging_mixin.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

131 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import abc 

21import enum 

22import logging 

23import re 

24import sys 

25from io import TextIOBase, UnsupportedOperation 

26from logging import Handler, StreamHandler 

27from typing import IO, TYPE_CHECKING, Any, TypeVar, cast 

28 

29import structlog 

30 

31if TYPE_CHECKING: 

32 from airflow._shared.logging.types import Logger 

33 

# Matches 7-bit ANSI escape sequences introduced by ESC (0x1B). Two shapes are recognised:
#   * a two-byte escape: ESC followed by a single byte in "@".."_" other than "[";
#   * a CSI sequence: ESC "[", then parameter bytes "0".."?", then intermediate bytes " ".."/",
#     then one final byte "@".."~" (the form used for terminal colours).
# The shapes are separate alternatives so that a two-byte escape does not also consume the
# printable character that happens to follow it.
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

36 

37 

# Private: sentinel values exchanged between handler ``set_context`` implementations and their caller.
class SetContextPropagate(enum.Enum):
    """
    Sentinel values describing what should happen to log propagation.

    :meta private:
    """

    # A ``set_context`` function that wants propagation to stay enabled on its logger must return
    # this member.
    MAINTAIN_PROPAGATE = object()
    # Obsolete member; do not use it in new code.
    DISABLE_PROPAGATE = object()

51 

52 

def __getattr__(name):
    """
    Resolve legacy module attributes on demand.

    Both the misspelled ``DISABLE_PROPOGATE`` and ``DISABLE_PROPAGATE`` resolve to
    ``SetContextPropagate.DISABLE_PROPAGATE`` so that code which referenced them directly on
    this module keeps working. Any other name raises ``AttributeError``.
    """
    legacy_names = ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE")
    if name not in legacy_names:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    # Compatibility alias for an old object that is no longer needed.
    return SetContextPropagate.DISABLE_PROPAGATE

59 

60 

def remove_escape_codes(text: str) -> str:
    """Return ``text`` with every ``ANSI_ESCAPE`` match deleted, e.g. to drop "colors" from log messages."""
    cleaned = re.sub(ANSI_ESCAPE, "", text)
    return cleaned

64 

65 

_T = TypeVar("_T")


class LoggingMixin:
    """Mixin that equips a class with a lazily created logger named after that class."""

    # Cached logger; filled in by ``_get_log`` the first time a logger is requested.
    _log: Logger | None = None

    # Parent logger used by this class. It should match one of the loggers defined in the
    # `logging_config_class`. When set, it is prepended (with a separating dot) to the name that
    # would otherwise be used for the logger.
    _log_config_logger_name: str | None = None

    # Explicit logger name; when left as ``None`` the ``module.ClassName`` of the class is used.
    _logger_name: str | None = None

    def __init__(self, context=None):
        """Pass ``context`` to ``_set_context`` and then continue the cooperative ``__init__`` chain."""
        self._set_context(context)
        super().__init__()

    @staticmethod
    def _create_logger_name(
        logged_class: type[_T],
        log_config_logger_name: str | None = None,
        class_logger_name: str | None = None,
    ) -> str:
        """
        Build the logger name for ``logged_class``.

        The base name is ``class_logger_name`` when it is not ``None``; otherwise it is
        ``{class.__module__}.{class.__name__}``. When ``log_config_logger_name`` is truthy it is
        prepended to the base name with a separating dot, or returned on its own if the base name
        is empty.

        :param logged_class: class the logger is created for
        :param log_config_logger_name: optional parent logger name used as a prefix
        :param class_logger_name: optional explicit name replacing the module/class-derived one
        :return: the resulting logger name
        """
        if class_logger_name is None:
            base_name: str = f"{logged_class.__module__}.{logged_class.__name__}"
        else:
            base_name = class_logger_name

        if not log_config_logger_name:
            return base_name
        if not base_name:
            # An explicitly empty class logger name means "use the parent logger as is".
            return log_config_logger_name
        return f"{log_config_logger_name}.{base_name}"

    @classmethod
    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
        """
        Return the logger cached on ``obj``, creating and caching it on first use.

        :param obj: object (instance or class) whose ``_log`` attribute holds the cache
        :param clazz: class used to derive the logger name
        """
        if obj._log is not None:
            return obj._log

        # NOTE(review): when ``obj`` is a class (the ``logger()`` path) the logger is stored on
        # that class, so subclasses without their own ``_log`` will see it through normal
        # attribute lookup -- confirm this sharing is intended.
        name: str = cls._create_logger_name(
            logged_class=clazz,
            log_config_logger_name=obj._log_config_logger_name,
            class_logger_name=obj._logger_name,
        )
        obj._log = structlog.get_logger(name)
        return obj._log

    @classmethod
    def logger(cls) -> Logger:
        """Return the logger cached on the class itself."""
        return LoggingMixin._get_log(cls, cls)

    @property
    def log(self) -> Logger:
        """Return the logger cached on this instance."""
        return LoggingMixin._get_log(self, self.__class__)

    def _set_context(self, context):
        """Hook called from ``__init__`` with its ``context`` argument; this implementation does nothing."""

130 

131 

class ExternalLoggingMixin(metaclass=abc.ABCMeta):
    """Abstract interface for a log handler backed by an external service (e.g. ELK, StackDriver)."""

    @property
    @abc.abstractmethod
    def log_name(self) -> str:
        """Name of the log, as a string."""

    @abc.abstractmethod
    def get_external_log_url(self, task_instance, try_number) -> str:
        """
        Return the URL at which the external service visualizes the log.

        :param task_instance: task instance the log belongs to
        :param try_number: try number of that task instance
        """

    @property
    @abc.abstractmethod
    def supports_external_link(self) -> bool:
        """Whether the handler is able to support external links."""

148 

149 

# Typing note: Python's concrete I/O classes are a mess with respect to typing -- they do not share
# the type hierarchy declared by `typing.IO` and so violate the Liskov Substitution Principle.
# Deriving from TextIOBase is still a good thing to do (it provides base implementations for
# IO-implementing classes), but it is impossible to make it agree with the IO generics (and
# apparently that was never even intended).
# See more: https://giters.com/python/typeshed/issues/6077
class StreamLogWriter(TextIOBase, IO[str]):
    """
    Allows to redirect stdout and stderr to logger.

    Text is accumulated in an internal buffer and emitted as one log call whenever a chunk
    ending in a newline is written or ``flush`` is called.

    :param logger: The logging.Logger instance to write to
    :param level: The log level method to write to, ie. logging.DEBUG, logging.WARNING
    """

    encoding = "undefined"

    @property
    def mode(self):
        """Return ``"w"``: the stream is write-only."""
        return "w"

    @property
    def name(self):
        """Return a display name built from the wrapped logger's ``name``."""
        return f"<logger: {self.logger.name}>"

    def writable(self):
        """Return True: the stream accepts writes."""
        return True

    def readable(self):
        """Return False: the stream cannot be read from."""
        return False

    def seekable(self):
        """Return False: the stream does not support seeking."""
        return False

    def fileno(self):
        """
        Always fail, because no file descriptor backs this stream.

        :raises UnsupportedOperation: on every call
        """
        raise UnsupportedOperation("fileno")

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        # Text written so far that has not been passed to the logger yet.
        self._buffer = ""

    def close(self):
        """
        Provide close method, for compatibility with the io.IOBase interface.

        This is a no-op method.
        """

    @property
    def closed(self):
        """
        Return False to indicate that the stream is not closed.

        Streams will be open for the duration of Airflow's lifecycle.

        For compatibility with the io.IOBase interface.
        """
        return False

    def _propagate_log(self, message):
        """Propagate message removing escape codes."""
        self.logger.log(self.level, remove_escape_codes(message))

    def write(self, message):
        """
        Buffer ``message`` and log the buffered text once a chunk ends with a newline.

        Trailing whitespace of a newline-terminated chunk is stripped before it is buffered.

        :param message: message to log
        :return: number of characters of ``message`` that were consumed, i.e. ``len(message)``
        """
        ends_line = message.endswith("\n")
        # Report the length of what the caller handed in, not the stripped remainder: returning a
        # smaller number (0 for a bare "\n") would tell callers the write was only partial.
        consumed = len(message)
        if ends_line:
            self._buffer += message.rstrip()
            self.flush()
        else:
            self._buffer += message

        return consumed

    def flush(self):
        """Ensure all logging output has been flushed."""
        pending = self._buffer
        if pending:
            # Reset the buffer before logging so the text is not kept if the log call raises.
            self._buffer = ""
            self._propagate_log(pending)

    def isatty(self):
        """
        Return False to indicate the fd is not connected to a tty(-like) device.

        For compatibility reasons.
        """
        return False

242 

243 

class RedirectStdHandler(StreamHandler):
    """
    StreamHandler variant that looks up sys.stderr/stdout each time its stream is needed.

    A regular StreamHandler keeps the stream object it was constructed with. This handler
    instead returns whatever sys.stderr/stdout currently is, unless ``airflow.settings`` reports
    ``IS_K8S_EXECUTOR_POD`` or ``IS_EXECUTOR_CONTAINER``; in that case the stream captured at
    construction time is used.
    """

    def __init__(self, stream):
        """
        Select the standard stream by name.

        :param stream: a string; stdout is selected when it contains ``"stdout"``, stderr otherwise
        :raises TypeError: if ``stream`` is not a string
        """
        if not isinstance(stream, str):
            raise TypeError(
                "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
            )

        self._use_stderr = "stdout" not in stream
        # Remember the stream that was current when the handler was built.
        self._orig_stream = sys.stderr if self._use_stderr else sys.stdout
        # StreamHandler.__init__ tries to set self.stream, which is a read-only property here,
        # so initialise through the base Handler directly.
        Handler.__init__(self)

    @property
    def stream(self):
        """Returns current stream."""
        from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD

        if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
            return self._orig_stream
        return sys.stderr if self._use_stderr else sys.stdout

280 

281 

def set_context(logger, value):
    """
    Walk up the logger hierarchy and hand ``value`` to every handler that accepts a context.

    Starting at ``logger``, each handler that has a ``set_context`` attribute is called with
    ``value``. Unless that call returns ``SetContextPropagate.MAINTAIN_PROPAGATE``, propagation is
    switched off on the logger owning the handler. The walk continues to the parent only when the
    logger had ``propagate`` set to ``True`` before its handlers were processed.

    :param logger: logger to start from; anything that is not a ``logging.Logger`` is ignored
    :param value: value to set
    """
    if not isinstance(logger, logging.Logger):
        # This fn doesn't make sense for structlog based handlers
        return

    current = logger
    while current:
        propagated_before = current.propagate
        for handler in current.handlers:
            # Not all handlers need to have context passed in, so handlers without a
            # set_context attribute are simply skipped.
            if not hasattr(handler, "set_context"):
                continue

            # Don't use getattr so we have type checking. And we don't care if handler is actually a
            # FileTaskHandler, it just needs to have a set_context function!
            from airflow.utils.log.file_task_handler import FileTaskHandler  # noqa: TC001

            outcome = cast("FileTaskHandler", handler).set_context(value)
            # By default we disable propagate once we have configured the logger, unless that handler
            # explicitly asks us to keep it on.
            if outcome is not SetContextPropagate.MAINTAIN_PROPAGATE:
                current.propagate = False

        if propagated_before is not True:
            break
        # If we were set to propagate before we turned it off, then keep passing set_context up
        current = current.parent