1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import abc
21import enum
22import logging
23import sys
24from io import IOBase
25from logging import Handler, StreamHandler
26from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast
27
28import re2
29
30if TYPE_CHECKING:
31 from logging import Logger
32
# 7-bit C1 ANSI escape sequences: ESC (0x1B), one byte in '@'-'_' (0x40-0x5F), then any number of
# parameter bytes ('0'-'?'), any number of intermediate bytes (' '-'/') and exactly one final byte ('@'-'~').
# Compiled with ``re2`` rather than the stdlib ``re``; used by ``remove_escape_codes``.
# NOTE(review): because a final byte is always required, a two-byte escape other than CSI (e.g. ``ESC M``)
# also consumes the character(s) that follow it, and OSC/DCS strings are only partially removed.
# Confirm whether that matters for the log output this is applied to before changing the pattern.
ANSI_ESCAPE = re2.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
35
36
# Private: sentinel objects that a handler's ``set_context`` may return to control log propagation.
class SetContextPropagate(enum.Enum):
    """Sentinel objects for log propagation contexts.

    Each member's value is a distinct ``object()``, so members are only meaningful by identity.

    :meta private:
    """

    # If a `set_context` function wants to _keep_ propagation set on its logger it needs to return this
    # special value. Any other return value causes ``set_context`` to disable propagation on that logger.
    MAINTAIN_PROPAGATE = object()
    # Deprecated - don't use this one anymore! It is kept only for backward compatibility (it is also
    # reachable through the module-level ``__getattr__``), and ``set_context`` treats it like any
    # value other than MAINTAIN_PROPAGATE.
    DISABLE_PROPAGATE = object()
49
50
def __getattr__(name):
    """
    Resolve legacy module attributes lazily (PEP 562 module ``__getattr__``).

    ``DISABLE_PROPAGATE`` and its historical misspelling ``DISABLE_PROPOGATE`` both resolve to
    ``SetContextPropagate.DISABLE_PROPAGATE``. The sentinel is no longer needed, but callers may
    still import it directly from this module.

    :param name: the attribute name that was not found in the module namespace
    :raises AttributeError: for any other name
    """
    legacy_names = ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE")
    if name not in legacy_names:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    return SetContextPropagate.DISABLE_PROPAGATE
57
58
def remove_escape_codes(text: str) -> str:
    """
    Strip ANSI escape sequences (such as terminal "colors") from ``text``.

    Used to clean log messages before they are emitted.

    :param text: the string to clean
    :return: ``text`` with every match of ``ANSI_ESCAPE`` replaced by the empty string
    """
    cleaned = ANSI_ESCAPE.sub("", text)
    return cleaned
62
63
# Generic type variable used to annotate the class arguments (``logged_class`` / ``clazz``) of LoggingMixin helpers.
_T = TypeVar("_T")
65
66
class LoggingMixin:
    """Convenience super-class to have a logger configured with the class name."""

    # Cached logger. Instances store their own logger here on first access of ``log``. It may also be
    # assigned explicitly (on a class or an instance) to override the derived logger. Class-level lookups
    # through ``logger()`` never write to it, so subclasses and instances cannot inherit a wrongly-named
    # cached logger from an ancestor.
    _log: logging.Logger | None = None

    # Parent logger used by this class. It should match one of the loggers defined in the
    # `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
    # will prefix the `_logger_name` with a separating dot.
    _log_config_logger_name: Optional[str] = None  # noqa: UP007

    # Explicit logger name for this class. When ``None``, ``{module}.{ClassName}`` is used instead.
    _logger_name: Optional[str] = None  # noqa: UP007

    def __init__(self, context=None):
        """
        Initialise the mixin.

        :param context: optional context; when not ``None`` it is passed to ``set_context`` for this
            object's logger.
        """
        self._set_context(context)

    @staticmethod
    def _create_logger_name(
        logged_class: type[_T],
        log_config_logger_name: str | None = None,
        class_logger_name: str | None = None,
    ) -> str:
        """
        Generate a logger name for the given ``logged_class``.

        By default this returns ``class_logger_name``. When it is ``None``,
        ``{logged_class.__module__}.{logged_class.__name__}`` is used instead. When a non-empty
        ``log_config_logger_name`` is provided, it prefixes the name with a separating dot. If the class
        part is empty, ``log_config_logger_name`` alone is returned.

        :param logged_class: class whose module and name form the default logger name
        :param log_config_logger_name: optional parent logger name from the logging configuration
        :param class_logger_name: optional explicit name overriding the class-derived default
        :return: the resulting logger name (may be ``""``, which ``logging.getLogger`` maps to root)
        """
        logger_name: str = (
            class_logger_name
            if class_logger_name is not None
            else f"{logged_class.__module__}.{logged_class.__name__}"
        )

        if log_config_logger_name:
            return f"{log_config_logger_name}.{logger_name}" if logger_name else log_config_logger_name
        return logger_name

    @classmethod
    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
        """
        Return the logger for ``obj``, creating it when none is set.

        ``obj`` is either an instance (from ``log``) or a class (from ``logger``). A non-``None`` ``_log``
        visible on ``obj`` is returned unchanged. A newly created logger is cached only when ``obj`` is an
        instance. Caching it on a class would let every subclass, and every instance without its own
        logger, inherit this class's logger through normal attribute lookup. For classes,
        ``logging.getLogger`` already returns the same logger object for the same name.

        :param obj: instance or class the logger is requested for
        :param clazz: class used to derive the default logger name
        :return: the logger to use for ``obj``
        """
        cached = obj._log
        if cached is not None:
            return cached
        logger_name: str = cls._create_logger_name(
            logged_class=clazz,
            log_config_logger_name=obj._log_config_logger_name,
            class_logger_name=obj._logger_name,
        )
        log = logging.getLogger(logger_name)
        if not isinstance(obj, type):
            obj._log = log
        return log

    @classmethod
    def logger(cls) -> Logger:
        """Return the logger for this class (usable without an instance)."""
        return LoggingMixin._get_log(cls, cls)

    @property
    def log(self) -> Logger:
        """Return the logger for this instance, creating and caching it on first access."""
        return LoggingMixin._get_log(self, self.__class__)

    def _set_context(self, context):
        """Pass ``context`` to ``set_context`` for this object's logger, unless it is ``None``."""
        if context is not None:
            set_context(self.log, context)
128
129
class ExternalLoggingMixin:
    """
    Define a log handler based on an external service (e.g. ELK, StackDriver).

    Implementations describe how logs stored in the external service can be named and linked to.

    Note: this class does not use ``abc.ABCMeta``, so the ``abc.abstractmethod`` markers below document
    the expected interface but are not enforced when a subclass is instantiated.
    """

    @property
    @abc.abstractmethod
    def log_name(self) -> str:
        """Return the log name used in the external service."""

    @abc.abstractmethod
    def get_external_log_url(self, task_instance, try_number) -> str:
        """
        Return the URL for log visualization in the external service.

        :param task_instance: the task instance the log belongs to
        :param try_number: the try number whose log should be linked
        """

    @property
    @abc.abstractmethod
    def supports_external_link(self) -> bool:
        """Return whether the handler is able to produce external links."""
146
147
# We have to ignore typing errors here because Python I/O classes are a mess: they do not share the
# type hierarchy defined by ``typing.IO`` and violate the Liskov Substitution Principle.
# Deriving from IOBase is fine (and a good thing to do, since it provides a base implementation for
# IO-implementing classes), but such classes cannot be made to work with the IO generics
# (and apparently that was never intended).
# See more: https://github.com/python/typeshed/issues/6077
class StreamLogWriter(IOBase, IO[str]):  # type: ignore[misc]
    """
    Allows to redirect stdout and stderr to logger.

    Written text is buffered until a chunk ending in a newline arrives. The trailing whitespace of that
    chunk is stripped and the buffered text is emitted as one log record, with ANSI escape codes removed.

    :param logger: the logger records are written to (its ``log(level, msg)`` method is used)
    :param level: the log level used for every record, e.g. ``logging.INFO``
    """

    # No byte encoding is performed: text is handed straight to the logger.
    encoding: None = None

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        # Text received since the last emitted record.
        self._buffer = ""

    def close(self):
        """
        Provide close method, for compatibility with the io.IOBase interface.

        This is a no-op method.
        """

    @property
    def closed(self):
        """
        Return False to indicate that the stream is not closed.

        Streams will be open for the duration of Airflow's lifecycle.

        For compatibility with the io.IOBase interface.
        """
        return False

    def _propagate_log(self, message):
        """Propagate message removing escape codes."""
        self.logger.log(self.level, remove_escape_codes(message))

    def write(self, message):
        """
        Buffer ``message`` and log the buffered text once a newline-terminated chunk arrives.

        :param message: message to log
        :return: the number of characters accepted, as required by the ``io.TextIOBase.write`` contract
        """
        if not message.endswith("\n"):
            self._buffer += message
        else:
            # Drop the newline (and any other trailing whitespace of this chunk) before logging.
            self._buffer += message.rstrip()
            self.flush()
        return len(message)

    def flush(self):
        """Ensure all logging output has been flushed."""
        buf = self._buffer
        if buf:
            # Reset before logging so that text written during logging starts a fresh record.
            self._buffer = ""
            self._propagate_log(buf)

    def isatty(self):
        """
        Return False to indicate the fd is not connected to a tty(-like) device.

        For compatibility reasons.
        """
        return False
216
217
class RedirectStdHandler(StreamHandler):
    """
    StreamHandler that writes to whatever ``sys.stdout``/``sys.stderr`` currently is.

    A plain StreamHandler binds its stream object once, at construction time. This handler looks the
    stream up on every access instead, so later reassignments of ``sys.stdout``/``sys.stderr`` are
    honoured. The exception is when ``airflow.settings`` reports a Kubernetes executor pod or an
    executor container: then the stream captured at construction time is always used.

    :param stream: ``'stdout'`` or ``'stderr'`` as a plain string (without ``ext://``); any string not
        containing ``"stdout"`` selects stderr.
    """

    def __init__(self, stream):
        if not isinstance(stream, str):
            raise TypeError(
                "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
            )

        wants_stdout = "stdout" in stream
        self._use_stderr = not wants_stdout
        self._orig_stream = sys.stdout if wants_stdout else sys.stderr
        # Skip StreamHandler.__init__: it assigns ``self.stream``, which is a read-only property here.
        Handler.__init__(self)

    @property
    def stream(self):
        """Return the stream that records should be written to right now."""
        from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD

        # Inside executor pods/containers, keep using the stream captured at construction time.
        if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
            return self._orig_stream
        return sys.stderr if self._use_stderr else sys.stdout
254
255
def set_context(logger, value):
    """
    Walk up the logger hierarchy and pass ``value`` to every handler that defines ``set_context``.

    After a handler's ``set_context`` runs, propagation on that logger is switched off unless the
    handler returned ``SetContextPropagate.MAINTAIN_PROPAGATE``. The walk moves on to the parent logger
    only if the current logger was propagating before this call; otherwise it stops there.

    :param logger: logger to start from
    :param value: value to set
    """
    current = logger
    while current:
        was_propagating = current.propagate
        for handler in current.handlers:
            # Not every handler needs the context, so handlers without ``set_context`` are skipped.
            # ``hasattr`` + ``cast`` (rather than ``getattr``) keeps the call type-checked; the handler
            # only needs a ``set_context`` method, it does not have to be a real FileTaskHandler.
            if not hasattr(handler, "set_context"):
                continue
            from airflow.utils.log.file_task_handler import FileTaskHandler

            flag = cast(FileTaskHandler, handler).set_context(value)
            # Propagation is disabled once the logger is configured, unless the handler asks to keep it.
            if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
                current.propagate = False
        if was_propagating is not True:
            break
        # The logger propagated before we (possibly) turned it off, so keep passing the context upward.
        current = current.parent