Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/utils/log/logging_mixin.py: 51%


134 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import abc
import enum
import logging
import os
import re
import sys
from io import TextIOBase, UnsupportedOperation
from logging import Handler, StreamHandler
from typing import IO, TYPE_CHECKING, Any, TypeVar, cast

import structlog

if TYPE_CHECKING:
    from airflow._shared.logging.types import Logger

# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")


# Private: sentinel objects
class SetContextPropagate(enum.Enum):
    """
    Sentinel objects for log propagation contexts.

    :meta private:
    """

    # If a `set_context` function wants to _keep_ propagation set on its logger it needs to return this
    # special value.
    MAINTAIN_PROPAGATE = object()
    # Don't use this one anymore!
    DISABLE_PROPAGATE = object()


def __getattr__(name):
    if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"):
        # Compat shim for the old spelling (including its typo), on the off chance someone is
        # using this directly; the old object isn't needed anymore.
        return SetContextPropagate.DISABLE_PROPAGATE
    raise AttributeError(f"module {__name__} has no attribute {name}")

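# A minimal usage sketch (illustrative, not part of the original module): thanks to the
# module-level __getattr__ above, the old (misspelled) attribute still resolves:
#
#   from airflow.utils.log import logging_mixin
#   assert logging_mixin.DISABLE_PROPOGATE is SetContextPropagate.DISABLE_PROPAGATE
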

def remove_escape_codes(text: str) -> str:
    """Remove ANSI escape codes from a string; used to remove "colors" from log messages."""
    return ANSI_ESCAPE.sub("", text)

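# A minimal usage sketch for remove_escape_codes (illustrative, not part of the original module):
#
#   >>> remove_escape_codes("\x1b[31merror\x1b[0m")
#   'error'
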

_T = TypeVar("_T")


class LoggingMixin:
    """Convenience super-class to have a logger configured with the class name."""

    _log: Logger | None = None

    # Parent logger used by this class. It should match one of the loggers defined in the
    # `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
    # will prefix the `_logger_name` with a separating dot.
    _log_config_logger_name: str | None = None

    _logger_name: str | None = None

    def __init__(self, context=None):
        self._set_context(context)
        super().__init__()

    @staticmethod
    def _create_logger_name(
        logged_class: type[_T],
        log_config_logger_name: str | None = None,
        class_logger_name: str | None = None,
    ) -> str:
        """
        Generate a logger name for the given `logged_class`.

        By default, this function returns the `class_logger_name` as the logger name. If it is not provided,
        `{class.__module__}.{class.__name__}` is returned instead. When a `log_config_logger_name` is
        provided, it will prefix the logger name with a separating dot.
        """
        logger_name: str = (
            class_logger_name
            if class_logger_name is not None
            else f"{logged_class.__module__}.{logged_class.__name__}"
        )

        if log_config_logger_name:
            return f"{log_config_logger_name}.{logger_name}" if logger_name else log_config_logger_name
        return logger_name

    @classmethod
    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
        if obj._log is None:
            logger_name: str = cls._create_logger_name(
                logged_class=clazz,
                log_config_logger_name=obj._log_config_logger_name,
                class_logger_name=obj._logger_name,
            )
            obj._log = structlog.get_logger(logger_name)
        return obj._log

    @classmethod
    def logger(cls) -> Logger:
        """Return a logger."""
        return LoggingMixin._get_log(cls, cls)

    @property
    def log(self) -> Logger:
        """Return a logger."""
        return LoggingMixin._get_log(self, self.__class__)

    def _set_context(self, context): ...

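# A minimal usage sketch (illustrative, not part of the original module): mixing LoggingMixin into a
# class exposes a `log` property whose logger is named after the class, optionally prefixed by
# `_log_config_logger_name`:
#
#   class MyHook(LoggingMixin):
#       _log_config_logger_name = "airflow.task.hooks"
#
#       def run(self):
#           # Logger name resolves to "airflow.task.hooks.<module>.MyHook"
#           self.log.info("running")
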

class ExternalLoggingMixin(metaclass=abc.ABCMeta):
    """Define a log handler based on an external service (e.g. ELK, StackDriver)."""

    @property
    @abc.abstractmethod
    def log_name(self) -> str:
        """Return log name."""

    @abc.abstractmethod
    def get_external_log_url(self, task_instance, try_number) -> str:
        """Return the URL for log visualization in the external service."""

    @property
    @abc.abstractmethod
    def supports_external_link(self) -> bool:
        """Return whether handler is able to support external links."""

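# A minimal sketch of a concrete handler (illustrative; the class and URL are hypothetical, not part
# of the original module): an external-service handler combines a logging.Handler with
# ExternalLoggingMixin and implements the three abstract members:
#
#   class MyServiceTaskHandler(logging.Handler, ExternalLoggingMixin):
#       @property
#       def log_name(self) -> str:
#           return "MyService"
#
#       def get_external_log_url(self, task_instance, try_number) -> str:
#           return f"https://logs.example.com/{task_instance.task_id}/{try_number}"
#
#       @property
#       def supports_external_link(self) -> bool:
#           return True
#
#       def emit(self, record):
#           pass  # ship `record` to the external service
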

# We have to ignore typing errors here because Python I/O classes are a mess, and they do not
# share the type hierarchy defined by `typing.IO` - they violate the Liskov Substitution Principle.
# While it is ok to make your class derive from TextIOBase (and it's a good thing to do, as it provides
# a base implementation for IO-implementing classes), it's impossible to make them work with the
# IO generics (and apparently it has not even been intended).
# See more: https://giters.com/python/typeshed/issues/6077
class StreamLogWriter(TextIOBase, IO[str]):
    """
    Allow redirecting stdout and stderr to a logger.

    :param logger: The logging.Logger instance to write to
    :param level: The log level to write at, e.g. logging.DEBUG, logging.WARNING
    """

    encoding = "undefined"

    @property
    def mode(self):
        return "w"

    @property
    def name(self):
        return f"<logger: {self.logger.name}>"

    def writable(self):
        return True

    def readable(self):
        return False

    def seekable(self):
        return False

    def fileno(self):
        raise UnsupportedOperation("fileno")

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        self._buffer = ""

    def close(self):
        """
        Provide close method, for compatibility with the io.IOBase interface.

        This is a no-op method.
        """

    @property
    def closed(self):
        """
        Return False to indicate that the stream is not closed.

        Streams will be open for the duration of Airflow's lifecycle.

        For compatibility with the io.IOBase interface.
        """
        return False

    def _propagate_log(self, message):
        """Propagate message, removing escape codes."""
        self.logger.log(self.level, remove_escape_codes(message))

    def write(self, message):
        """
        Do whatever it takes to actually log the specified logging record.

        :param message: message to log
        """
        if message.endswith("\n"):
            message = message.rstrip()
            self._buffer += message
            self.flush()
        else:
            self._buffer += message

        return len(message)

    def flush(self):
        """Ensure all logging output has been flushed."""
        buf = self._buffer
        if buf:
            self._buffer = ""
            self._propagate_log(buf)

    def isatty(self):
        """
        Return False to indicate the fd is not connected to a tty(-like) device.

        For compatibility reasons.
        """
        return False

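# A minimal usage sketch (illustrative, not part of the original module): redirect print() output to a
# logger by swapping sys.stdout for a StreamLogWriter, then restore the original stream:
#
#   task_logger = logging.getLogger("airflow.task")
#   original_stdout = sys.stdout
#   sys.stdout = StreamLogWriter(task_logger, logging.INFO)
#   try:
#       print("hello")  # emitted as an INFO record on "airflow.task"
#   finally:
#       sys.stdout = original_stdout
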

class RedirectStdHandler(StreamHandler):
    """
    Custom StreamHandler that uses the current sys.stderr/stdout as the stream for logging.

    This class is like a StreamHandler using sys.stderr/stdout, but uses
    whatever sys.stderr/stdout is currently set to rather than the value of
    sys.stderr/stdout at handler construction time, except when running a
    task in a kubernetes executor pod.
    """

    def __init__(self, stream):
        if not isinstance(stream, str):
            raise TypeError(
                "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
            )

        self._use_stderr = True
        if "stdout" in stream:
            self._use_stderr = False
            self._orig_stream = sys.stdout
        else:
            self._orig_stream = sys.stderr
        # StreamHandler tries to set self.stream
        Handler.__init__(self)

    @property
    def stream(self):
        """Return the current stream."""
        # Executors can set this env var to configure logging correctly for
        # containerized executors.
        IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
        # Will be True if running in a kubernetes executor pod.
        IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))

        if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
            return self._orig_stream
        if self._use_stderr:
            return sys.stderr

        return sys.stdout

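# A minimal configuration sketch (illustrative, not part of the original module): the handler is meant
# to be referenced from a dictConfig-style logging config, with the stream given as a plain string:
#
#   "handlers": {
#       "console": {
#           "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
#           "formatter": "airflow",
#           "stream": "sys.stdout",
#       },
#   },
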

def set_context(logger, value):
    """
    Walk the tree of loggers and try to set the context for each handler.

    :param logger: logger
    :param value: value to set
    """
    if not isinstance(logger, logging.Logger):
        # This function doesn't make sense for structlog-based handlers
        return

    while logger:
        orig_propagate = logger.propagate
        for handler in logger.handlers:
            # Not all handlers need to have context passed in so we ignore
            # the error when handlers do not have set_context defined.

            # Don't use getattr so we have type checking. And we don't care if handler is actually a
            # FileTaskHandler, it just needs to have a set_context function!
            if hasattr(handler, "set_context"):
                from airflow.utils.log.file_task_handler import FileTaskHandler  # noqa: TC001

                flag = cast("FileTaskHandler", handler).set_context(value)
                # By default we disable propagate once we have configured the logger, unless that handler
                # explicitly asks us to keep it on.
                if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
                    logger.propagate = False
        if orig_propagate is True:
            # If we were set to propagate before we turned it off, then keep passing set_context up
            logger = logger.parent
        else:
            break
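# A minimal usage sketch (illustrative, not part of the original module): handlers that define
# set_context (e.g. FileTaskHandler) are typically passed a task instance so they can route logs to
# the right place; `ti` is assumed to be a TaskInstance already in scope:
#
#   set_context(logging.getLogger("airflow.task"), ti)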