Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

112 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import abc 

21import enum 

22import logging 

23import sys 

24from io import IOBase 

25from logging import Handler, StreamHandler 

26from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast 

27 

28import re2 

29 

30if TYPE_CHECKING: 

31 from logging import Logger 

32 

# 7-bit C1 ANSI escape sequences: ESC, one byte in `@-_`, then any number of
# parameter bytes (`0-?`), any number of intermediate bytes (space through `/`),
# and a single final byte (`@-~`).
ANSI_ESCAPE = re2.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")

35 

36 

# Private: sentinel objects
class SetContextPropagate(enum.Enum):
    """Sentinel objects for log propagation contexts.

    A handler's ``set_context`` method can return one of these members; the
    module-level ``set_context`` function compares the returned value against
    ``MAINTAIN_PROPAGATE`` to decide whether to switch off ``logger.propagate``.

    :meta private:
    """

    # If a `set_context` function wants to _keep_ propagation set on its logger it needs to return this
    # special value.
    MAINTAIN_PROPAGATE = object()
    # Don't use this one anymore! Any return value other than MAINTAIN_PROPAGATE already disables
    # propagation, so this member only remains for code that still refers to it.
    DISABLE_PROPAGATE = object()

49 

50 

def __getattr__(name):
    """Resolve legacy module attributes lazily.

    Both ``DISABLE_PROPAGATE`` and its historical misspelling ``DISABLE_PROPOGATE``
    map to ``SetContextPropagate.DISABLE_PROPAGATE``; any other name raises
    ``AttributeError``.
    """
    legacy_names = ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE")
    if name not in legacy_names:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    # Compat for spelling, on the off chance someone is using this directly.
    # It refers to an old object that isn't needed anymore.
    return SetContextPropagate.DISABLE_PROPAGATE

57 

58 

def remove_escape_codes(text: str) -> str:
    """Strip ANSI escape sequences (e.g. terminal colours) from a log message.

    :param text: the message that may contain escape sequences
    :return: ``text`` with every match of ``ANSI_ESCAPE`` removed
    """
    cleaned = ANSI_ESCAPE.sub("", text)
    return cleaned

62 

63 

# Type variable for the class object handed to LoggingMixin's logger-name helpers (`type[_T]`).
_T = TypeVar("_T")

65 

66 

class LoggingMixin:
    """Convenience super-class that exposes a lazily created logger named after the class."""

    # Cached logger instance; stays ``None`` until `log` or `logger()` is first used.
    _log: logging.Logger | None = None

    # Parent logger used by this class. It should match one of the loggers defined in the
    # `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
    # will prefix the `_logger_name` with a separating dot.
    _log_config_logger_name: Optional[str] = None  # noqa: UP007

    # Explicit logger name; when left as ``None`` the "<module>.<class name>" form is used instead.
    _logger_name: Optional[str] = None  # noqa: UP007

    def __init__(self, context=None):
        self._set_context(context)

    @staticmethod
    def _create_logger_name(
        logged_class: type[_T],
        log_config_logger_name: str | None = None,
        class_logger_name: str | None = None,
    ) -> str:
        """Build the logger name for ``logged_class``.

        The base name is ``class_logger_name`` when it is not ``None``, otherwise
        ``{class.__module__}.{class.__name__}``. A non-empty ``log_config_logger_name``
        is prepended with a separating dot; if the base name is empty, the
        ``log_config_logger_name`` is returned on its own.

        :param logged_class: class whose module and name provide the default logger name
        :param log_config_logger_name: optional parent logger name used as a prefix
        :param class_logger_name: optional explicit logger name overriding the default
        :return: the resulting logger name
        """
        if class_logger_name is None:
            base_name: str = f"{logged_class.__module__}.{logged_class.__name__}"
        else:
            base_name = class_logger_name

        if not log_config_logger_name:
            return base_name
        if not base_name:
            # Nothing to append, so the parent name is used as-is (no trailing dot).
            return log_config_logger_name
        return f"{log_config_logger_name}.{base_name}"

    @classmethod
    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
        """Return the logger cached on ``obj``, creating and caching it on first use."""
        cached = obj._log
        if cached is not None:
            return cached

        obj._log = logging.getLogger(
            cls._create_logger_name(
                logged_class=clazz,
                log_config_logger_name=obj._log_config_logger_name,
                class_logger_name=obj._logger_name,
            )
        )
        return obj._log

    @classmethod
    def logger(cls) -> Logger:
        """Return the logger for this class; it is cached on the class itself."""
        return LoggingMixin._get_log(cls, cls)

    @property
    def log(self) -> Logger:
        """Return the logger for this instance; it is cached on the instance."""
        return LoggingMixin._get_log(self, self.__class__)

    def _set_context(self, context):
        """Forward ``context`` to the handlers of this object's logger unless it is ``None``."""
        if context is None:
            return
        set_context(self.log, context)

128 

129 

class ExternalLoggingMixin:
    """Define a log handler based on an external service (e.g. ELK, StackDriver).

    Every member below is declared abstract and has no implementation here.
    """

    @property
    @abc.abstractmethod
    def log_name(self) -> str:
        """Return log name."""

    @abc.abstractmethod
    def get_external_log_url(self, task_instance, try_number) -> str:
        """Return the URL for log visualization in the external service.

        :param task_instance: task instance whose logs should be shown
        :param try_number: try number of that task instance
        """

    @property
    @abc.abstractmethod
    def supports_external_link(self) -> bool:
        """Return whether handler is able to support external links."""

146 

147 

148# We have to ignore typing errors here because Python I/O classes are a mess, and they do not 

149# have the same type hierarchy defined as the `typing.IO` - they violate Liskov Substitution Principle 

150# While it is ok to make your class derive from IOBase (and its good thing to do as they provide 

151# base implementation for IO-implementing classes, it's impossible to make them work with 

152# IO generics (and apparently it has not even been intended) 

153# See more: https://giters.com/python/typeshed/issues/6077 

class StreamLogWriter(IOBase, IO[str]):  # type: ignore[misc]
    """
    Allows to redirect stdout and stderr to logger.

    Written text is buffered until a chunk ending in a newline arrives; the buffered
    text is then emitted as a single log record with ANSI escape codes removed.

    :param logger: the logger that receives the messages
    :param level: the log level passed to ``logger.log``, e.g. ``logging.DEBUG`` or ``logging.WARNING``
    """

    encoding: None = None

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        # Text received so far that has not yet been terminated by a newline.
        self._buffer = ""

    def close(self):
        """
        Provide close method, for compatibility with the io.IOBase interface.

        This is a no-op method.
        """

    @property
    def closed(self):
        """
        Return False to indicate that the stream is not closed.

        Streams will be open for the duration of Airflow's lifecycle.

        For compatibility with the io.IOBase interface.
        """
        return False

    def _propagate_log(self, message):
        """Propagate message removing escape codes."""
        self.logger.log(self.level, remove_escape_codes(message))

    def write(self, message):
        """
        Buffer ``message`` and emit a log record once a newline-terminated chunk arrives.

        :param message: message to log
        :return: the number of characters accepted, as the ``IO[str].write`` contract requires
        """
        if not message.endswith("\n"):
            self._buffer += message
        else:
            # Trailing whitespace (including the newline) is dropped before logging.
            self._buffer += message.rstrip()
            self.flush()
        return len(message)

    def flush(self):
        """Ensure all logging output has been flushed."""
        buf = self._buffer
        if buf:
            # Clear the buffer before logging so the same text is never emitted twice.
            self._buffer = ""
            self._propagate_log(buf)

    def isatty(self):
        """
        Return False to indicate the fd is not connected to a tty(-like) device.

        For compatibility reasons.
        """
        return False

216 

217 

class RedirectStdHandler(StreamHandler):
    """
    StreamHandler variant that resolves its stream each time it is accessed.

    It behaves like a StreamHandler bound to sys.stderr/stdout, but looks up
    whatever sys.stderr/stdout currently is instead of the object that was
    installed when the handler was constructed. The exception is a task
    running in a kubernetes executor pod (or executor container), where the
    stream captured at construction time is used.
    """

    def __init__(self, stream):
        """Remember which standard stream to use.

        :param stream: a string naming the stream; any value containing ``"stdout"``
            selects stdout, everything else selects stderr
        :raises TypeError: if ``stream`` is not a string
        """
        if not isinstance(stream, str):
            raise TypeError(
                "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
            )

        targets_stdout = "stdout" in stream
        self._use_stderr = not targets_stdout
        self._orig_stream = sys.stdout if targets_stdout else sys.stderr
        # StreamHandler.__init__ tries to set self.stream, which is a read-only property here,
        # so only the base Handler initialiser is invoked.
        Handler.__init__(self)

    @property
    def stream(self):
        """Returns current stream."""
        from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD

        pinned_to_original = IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER
        if pinned_to_original:
            return self._orig_stream
        return sys.stderr if self._use_stderr else sys.stdout

254 

255 

def set_context(logger, value):
    """
    Walk up the logger hierarchy and hand ``value`` to every handler that accepts a context.

    For each logger visited, every handler exposing ``set_context`` is called with ``value``.
    Unless such a handler returns ``SetContextPropagate.MAINTAIN_PROPAGATE``, the logger's
    ``propagate`` flag is switched off. The walk continues to the parent only when the
    logger was propagating before its handlers were processed.

    :param logger: logger
    :param value: value to set
    """
    current = logger
    while current:
        propagated_before = current.propagate
        for handler in current.handlers:
            # Not all handlers need to have context passed in, so handlers without
            # a set_context attribute are simply skipped.
            if not hasattr(handler, "set_context"):
                continue

            # Don't use getattr so we have type checking. And we don't care if handler is actually a
            # FileTaskHandler, it just needs to have a set_context function!
            from airflow.utils.log.file_task_handler import FileTaskHandler

            outcome = cast(FileTaskHandler, handler).set_context(value)
            # By default we disable propagate once we have configured the logger, unless that handler
            # explicitly asks us to keep it on.
            if outcome is not SetContextPropagate.MAINTAIN_PROPAGATE:
                current.propagate = False

        if propagated_before is not True:
            break
        # If we were set to propagate before we turned it off, then keep passing set_context up
        current = current.parent