Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py: 64%

89 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import abc 

21import enum 

22import logging 

23import re 

24import sys 

25from io import IOBase 

26from logging import Handler, Logger, StreamHandler 

27from typing import IO, cast 

28 

29# 7-bit C1 ANSI escape sequences 

30ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]") 

31 

32 

33# Private: A sentinel objects 

34class SetContextPropagate(enum.Enum): 

35 """:meta private:""" 

36 

37 # If a `set_context` function wants to _keep_ propagation set on it's logger it needs to return this 

38 # special value. 

39 MAINTAIN_PROPAGATE = object() 

40 # Don't use this one anymore! 

41 DISABLE_PROPAGATE = object() 

42 

43 

44def __getattr__(name): 

45 if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"): 

46 # Compat for spelling on off chance someone is using this directly 

47 # And old object that isn't needed anymore 

48 return SetContextPropagate.DISABLE_PROPAGATE 

49 raise AttributeError(f"module {__name__} has no attribute {name}") 

50 

51 

52def remove_escape_codes(text: str) -> str: 

53 """ 

54 Remove ANSI escapes codes from string. It's used to remove 

55 "colors" from log messages. 

56 """ 

57 return ANSI_ESCAPE.sub("", text) 

58 

59 

60class LoggingMixin: 

61 """Convenience super-class to have a logger configured with the class name""" 

62 

63 _log: logging.Logger | None = None 

64 

65 def __init__(self, context=None): 

66 self._set_context(context) 

67 

68 @property 

69 def log(self) -> Logger: 

70 """Returns a logger.""" 

71 if self._log is None: 

72 self._log = logging.getLogger(self.__class__.__module__ + "." + self.__class__.__name__) 

73 return self._log 

74 

75 def _set_context(self, context): 

76 if context is not None: 

77 set_context(self.log, context) 

78 

79 

80class ExternalLoggingMixin: 

81 """Define a log handler based on an external service (e.g. ELK, StackDriver).""" 

82 

83 @property 

84 @abc.abstractmethod 

85 def log_name(self) -> str: 

86 """Return log name""" 

87 

88 @abc.abstractmethod 

89 def get_external_log_url(self, task_instance, try_number) -> str: 

90 """Return the URL for log visualization in the external service.""" 

91 

92 @property 

93 @abc.abstractmethod 

94 def supports_external_link(self) -> bool: 

95 """Return whether handler is able to support external links.""" 

96 

97 

98# We have to ignore typing errors here because Python I/O classes are a mess, and they do not 

99# have the same type hierarchy defined as the `typing.IO` - they violate Liskov Substitution Principle 

100# While it is ok to make your class derive from IOBase (and its good thing to do as they provide 

101# base implementation for IO-implementing classes, it's impossible to make them work with 

102# IO generics (and apparently it has not even been intended) 

103# See more: https://giters.com/python/typeshed/issues/6077 

104class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc] 

105 """Allows to redirect stdout and stderr to logger""" 

106 

107 encoding: None = None 

108 

109 def __init__(self, logger, level): 

110 """ 

111 :param log: The log level method to write to, ie. log.debug, log.warning 

112 :return: 

113 """ 

114 self.logger = logger 

115 self.level = level 

116 self._buffer = "" 

117 

118 def close(self): 

119 """ 

120 Provide close method, for compatibility with the io.IOBase interface. 

121 

122 This is a no-op method. 

123 """ 

124 

125 @property 

126 def closed(self): 

127 """ 

128 Returns False to indicate that the stream is not closed, as it will be 

129 open for the duration of Airflow's lifecycle. 

130 

131 For compatibility with the io.IOBase interface. 

132 """ 

133 return False 

134 

135 def _propagate_log(self, message): 

136 """Propagate message removing escape codes.""" 

137 self.logger.log(self.level, remove_escape_codes(message)) 

138 

139 def write(self, message): 

140 """ 

141 Do whatever it takes to actually log the specified logging record 

142 

143 :param message: message to log 

144 """ 

145 if not message.endswith("\n"): 

146 self._buffer += message 

147 else: 

148 self._buffer += message.rstrip() 

149 self.flush() 

150 

151 def flush(self): 

152 """Ensure all logging output has been flushed""" 

153 buf = self._buffer 

154 if len(buf) > 0: 

155 self._buffer = "" 

156 self._propagate_log(buf) 

157 

158 def isatty(self): 

159 """ 

160 Returns False to indicate the fd is not connected to a tty(-like) device. 

161 For compatibility reasons. 

162 """ 

163 return False 

164 

165 

166class RedirectStdHandler(StreamHandler): 

167 """ 

168 This class is like a StreamHandler using sys.stderr/stdout, but always uses 

169 whatever sys.stderr/stderr is currently set to rather than the value of 

170 sys.stderr/stdout at handler construction time. 

171 """ 

172 

173 def __init__(self, stream): 

174 if not isinstance(stream, str): 

175 raise Exception( 

176 "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'." 

177 ) 

178 

179 self._use_stderr = True 

180 if "stdout" in stream: 

181 self._use_stderr = False 

182 

183 # StreamHandler tries to set self.stream 

184 Handler.__init__(self) 

185 

186 @property 

187 def stream(self): 

188 """Returns current stream.""" 

189 if self._use_stderr: 

190 return sys.stderr 

191 

192 return sys.stdout 

193 

194 

195def set_context(logger, value): 

196 """ 

197 Walks the tree of loggers and tries to set the context for each handler 

198 

199 :param logger: logger 

200 :param value: value to set 

201 """ 

202 while logger: 

203 orig_propagate = logger.propagate 

204 for handler in logger.handlers: 

205 # Not all handlers need to have context passed in so we ignore 

206 # the error when handlers do not have set_context defined. 

207 

208 # Don't use getatrr so we have type checking. And we don't care if handler is actually a 

209 # FileTaskHandler, it just needs to have a set_context function! 

210 if hasattr(handler, "set_context"): 

211 from airflow.utils.log.file_task_handler import FileTaskHandler 

212 

213 flag = cast(FileTaskHandler, handler).set_context(value) 

214 # By default we disable propagate once we have configured the logger, unless that handler 

215 # explicitly asks us to keep it on. 

216 if flag is not SetContextPropagate.MAINTAIN_PROPAGATE: 

217 logger.propagate = False 

218 if orig_propagate is True: 

219 # If we were set to propagate before we turned if off, then keep passing set_context up 

220 logger = logger.parent 

221 else: 

222 break