Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py: 60%

101 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import abc 

21import enum 

22import logging 

23import re 

24import sys 

25from io import IOBase 

26from logging import Handler, Logger, StreamHandler 

27from typing import IO, Any, TypeVar, cast 

28 

29# 7-bit C1 ANSI escape sequences 

30ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]") 

31 

32 

33# Private: A sentinel objects 

34class SetContextPropagate(enum.Enum): 

35 """Sentinel objects for log propagation contexts. 

36 

37 :meta private: 

38 """ 

39 

40 # If a `set_context` function wants to _keep_ propagation set on it's logger it needs to return this 

41 # special value. 

42 MAINTAIN_PROPAGATE = object() 

43 # Don't use this one anymore! 

44 DISABLE_PROPAGATE = object() 

45 

46 

47def __getattr__(name): 

48 if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"): 

49 # Compat for spelling on off chance someone is using this directly 

50 # And old object that isn't needed anymore 

51 return SetContextPropagate.DISABLE_PROPAGATE 

52 raise AttributeError(f"module {__name__} has no attribute {name}") 

53 

54 

55def remove_escape_codes(text: str) -> str: 

56 """ 

57 Remove ANSI escapes codes from string. It's used to remove 

58 "colors" from log messages. 

59 """ 

60 return ANSI_ESCAPE.sub("", text) 

61 

62 

63_T = TypeVar("_T") 

64 

65 

66class LoggingMixin: 

67 """Convenience super-class to have a logger configured with the class name.""" 

68 

69 _log: logging.Logger | None = None 

70 

71 def __init__(self, context=None): 

72 self._set_context(context) 

73 

74 @staticmethod 

75 def _get_log(obj: Any, clazz: type[_T]) -> Logger: 

76 if obj._log is None: 

77 obj._log = logging.getLogger(f"{clazz.__module__}.{clazz.__name__}") 

78 return obj._log 

79 

80 @classmethod 

81 def logger(cls) -> Logger: 

82 """Returns a logger.""" 

83 return LoggingMixin._get_log(cls, cls) 

84 

85 @property 

86 def log(self) -> Logger: 

87 """Returns a logger.""" 

88 return LoggingMixin._get_log(self, self.__class__) 

89 

90 def _set_context(self, context): 

91 if context is not None: 

92 set_context(self.log, context) 

93 

94 

95class ExternalLoggingMixin: 

96 """Define a log handler based on an external service (e.g. ELK, StackDriver).""" 

97 

98 @property 

99 @abc.abstractmethod 

100 def log_name(self) -> str: 

101 """Return log name.""" 

102 

103 @abc.abstractmethod 

104 def get_external_log_url(self, task_instance, try_number) -> str: 

105 """Return the URL for log visualization in the external service.""" 

106 

107 @property 

108 @abc.abstractmethod 

109 def supports_external_link(self) -> bool: 

110 """Return whether handler is able to support external links.""" 

111 

112 

113# We have to ignore typing errors here because Python I/O classes are a mess, and they do not 

114# have the same type hierarchy defined as the `typing.IO` - they violate Liskov Substitution Principle 

115# While it is ok to make your class derive from IOBase (and its good thing to do as they provide 

116# base implementation for IO-implementing classes, it's impossible to make them work with 

117# IO generics (and apparently it has not even been intended) 

118# See more: https://giters.com/python/typeshed/issues/6077 

119class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc] 

120 """Allows to redirect stdout and stderr to logger.""" 

121 

122 encoding: None = None 

123 

124 def __init__(self, logger, level): 

125 """ 

126 :param log: The log level method to write to, ie. log.debug, log.warning 

127 :return: 

128 """ 

129 self.logger = logger 

130 self.level = level 

131 self._buffer = "" 

132 

133 def close(self): 

134 """ 

135 Provide close method, for compatibility with the io.IOBase interface. 

136 

137 This is a no-op method. 

138 """ 

139 

140 @property 

141 def closed(self): 

142 """ 

143 Returns False to indicate that the stream is not closed, as it will be 

144 open for the duration of Airflow's lifecycle. 

145 

146 For compatibility with the io.IOBase interface. 

147 """ 

148 return False 

149 

150 def _propagate_log(self, message): 

151 """Propagate message removing escape codes.""" 

152 self.logger.log(self.level, remove_escape_codes(message)) 

153 

154 def write(self, message): 

155 """ 

156 Do whatever it takes to actually log the specified logging record. 

157 

158 :param message: message to log 

159 """ 

160 if not message.endswith("\n"): 

161 self._buffer += message 

162 else: 

163 self._buffer += message.rstrip() 

164 self.flush() 

165 

166 def flush(self): 

167 """Ensure all logging output has been flushed.""" 

168 buf = self._buffer 

169 if len(buf) > 0: 

170 self._buffer = "" 

171 self._propagate_log(buf) 

172 

173 def isatty(self): 

174 """ 

175 Returns False to indicate the fd is not connected to a tty(-like) device. 

176 For compatibility reasons. 

177 """ 

178 return False 

179 

180 

181class RedirectStdHandler(StreamHandler): 

182 """ 

183 This class is like a StreamHandler using sys.stderr/stdout, but uses 

184 whatever sys.stderr/stderr is currently set to rather than the value of 

185 sys.stderr/stdout at handler construction time, except when running a 

186 task in a kubernetes executor pod. 

187 """ 

188 

189 def __init__(self, stream): 

190 if not isinstance(stream, str): 

191 raise Exception( 

192 "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'." 

193 ) 

194 

195 self._use_stderr = True 

196 if "stdout" in stream: 

197 self._use_stderr = False 

198 self._orig_stream = sys.stdout 

199 else: 

200 self._orig_stream = sys.stderr 

201 # StreamHandler tries to set self.stream 

202 Handler.__init__(self) 

203 

204 @property 

205 def stream(self): 

206 """Returns current stream.""" 

207 from airflow.settings import IS_K8S_EXECUTOR_POD 

208 

209 if IS_K8S_EXECUTOR_POD: 

210 return self._orig_stream 

211 if self._use_stderr: 

212 return sys.stderr 

213 

214 return sys.stdout 

215 

216 

217def set_context(logger, value): 

218 """ 

219 Walks the tree of loggers and tries to set the context for each handler. 

220 

221 :param logger: logger 

222 :param value: value to set 

223 """ 

224 while logger: 

225 orig_propagate = logger.propagate 

226 for handler in logger.handlers: 

227 # Not all handlers need to have context passed in so we ignore 

228 # the error when handlers do not have set_context defined. 

229 

230 # Don't use getatrr so we have type checking. And we don't care if handler is actually a 

231 # FileTaskHandler, it just needs to have a set_context function! 

232 if hasattr(handler, "set_context"): 

233 from airflow.utils.log.file_task_handler import FileTaskHandler 

234 

235 flag = cast(FileTaskHandler, handler).set_context(value) 

236 # By default we disable propagate once we have configured the logger, unless that handler 

237 # explicitly asks us to keep it on. 

238 if flag is not SetContextPropagate.MAINTAIN_PROPAGATE: 

239 logger.propagate = False 

240 if orig_propagate is True: 

241 # If we were set to propagate before we turned if off, then keep passing set_context up 

242 logger = logger.parent 

243 else: 

244 break