#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import abc
import enum
import logging
import os
import re
import sys
from io import TextIOBase, UnsupportedOperation
from logging import Handler, StreamHandler
from typing import IO, TYPE_CHECKING, Any, TypeVar, cast

import structlog

if TYPE_CHECKING:
    from airflow._shared.logging.types import Logger

# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")


# Private: sentinel objects used to control log propagation
class SetContextPropagate(enum.Enum):
    """
    Sentinel objects for log propagation contexts.

    :meta private:
    """

    # If a `set_context` function wants to _keep_ propagation enabled on its logger, it needs to
    # return this special value.
    MAINTAIN_PROPAGATE = object()
    # Deprecated: no longer used; kept only for backwards compatibility.
    DISABLE_PROPAGATE = object()


def __getattr__(name):
    if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"):
        # Compat shim for the old (and misspelled) attribute name, on the off chance someone
        # imports it directly; the object itself is no longer needed.
        return SetContextPropagate.DISABLE_PROPAGATE
    raise AttributeError(f"module {__name__} has no attribute {name}")
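
# Illustrative sketch (not executed): a handler's set_context() may return the MAINTAIN_PROPAGATE
# sentinel to keep propagation enabled on its logger after the module-level set_context() function
# has configured it. The handler class below is hypothetical.
#
#     class MyTaskHandler(logging.Handler):
#         def set_context(self, ti):
#             ...  # configure the handler for the given task instance
#             return SetContextPropagate.MAINTAIN_PROPAGATE
#
#         def emit(self, record):
#             ...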


def remove_escape_codes(text: str) -> str:
    """Remove ANSI escape codes from string; used to remove "colors" from log messages."""
    return ANSI_ESCAPE.sub("", text)
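
# Illustrative example (not executed): stripping ANSI color codes from a captured log line.
#
#     colored = "\x1b[31mERROR\x1b[0m: something failed"
#     assert remove_escape_codes(colored) == "ERROR: something failed"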


_T = TypeVar("_T")


class LoggingMixin:
    """Convenience super-class to have a logger configured with the class name."""

    _log: Logger | None = None

    # Parent logger used by this class. It should match one of the loggers defined in the
    # `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
    # will prefix the `_logger_name` with a separating dot.
    _log_config_logger_name: str | None = None

    _logger_name: str | None = None

    def __init__(self, context=None):
        self._set_context(context)
        super().__init__()

    @staticmethod
    def _create_logger_name(
        logged_class: type[_T],
        log_config_logger_name: str | None = None,
        class_logger_name: str | None = None,
    ) -> str:
        """
        Generate a logger name for the given `logged_class`.

        By default, this function returns `class_logger_name` as the logger name. If it is not provided,
        `{class.__module__}.{class.__name__}` is used instead. When a `log_config_logger_name` is provided,
        it is prepended to the logger name, separated by a dot.
        """
        logger_name: str = (
            class_logger_name
            if class_logger_name is not None
            else f"{logged_class.__module__}.{logged_class.__name__}"
        )

        if log_config_logger_name:
            return f"{log_config_logger_name}.{logger_name}" if logger_name else log_config_logger_name
        return logger_name

    @classmethod
    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
        if obj._log is None:
            logger_name: str = cls._create_logger_name(
                logged_class=clazz,
                log_config_logger_name=obj._log_config_logger_name,
                class_logger_name=obj._logger_name,
            )
            obj._log = structlog.get_logger(logger_name)
        return obj._log

    @classmethod
    def logger(cls) -> Logger:
        """Return a logger."""
        return LoggingMixin._get_log(cls, cls)

    @property
    def log(self) -> Logger:
        """Return a logger."""
        return LoggingMixin._get_log(self, self.__class__)

    def _set_context(self, context): ...
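

# Illustrative sketch (not executed): subclassing LoggingMixin gives the class a lazily-created
# structlog logger named after it. The class and logger names below are hypothetical.
#
#     class MyHelper(LoggingMixin):
#         _log_config_logger_name = "airflow.task"
#
#         def do_work(self):
#             # The logger name resolves to "airflow.task.<module>.MyHelper"
#             self.log.info("doing work")
#
#     MyHelper.logger().info("the class-level logger uses the same name")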


class ExternalLoggingMixin(metaclass=abc.ABCMeta):
    """Define a log handler based on an external service (e.g. ELK, StackDriver)."""

    @property
    @abc.abstractmethod
    def log_name(self) -> str:
        """Return the log name."""

    @abc.abstractmethod
    def get_external_log_url(self, task_instance, try_number) -> str:
        """Return the URL for log visualization in the external service."""

    @property
    @abc.abstractmethod
    def supports_external_link(self) -> bool:
        """Return whether the handler supports external links."""
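

# Illustrative sketch (not executed): a task log handler that also exposes links to an external
# log service. The class name and URL below are hypothetical.
#
#     class MyExternalTaskHandler(logging.Handler, ExternalLoggingMixin):
#         @property
#         def log_name(self) -> str:
#             return "MyLogService"
#
#         def get_external_log_url(self, task_instance, try_number) -> str:
#             return f"https://logs.example.com/{task_instance.task_id}/{try_number}"
#
#         @property
#         def supports_external_link(self) -> bool:
#             return True
#
#         def emit(self, record):
#             ...  # ship the record to the external service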


# We have to ignore typing errors here because Python I/O classes are a mess and do not share the
# type hierarchy defined by `typing.IO` - they violate the Liskov Substitution Principle.
# While it is fine to derive from TextIOBase (and it's a good thing to do, as it provides a base
# implementation for IO-implementing classes), it's impossible to make such classes work with the
# IO generics (and apparently that was never even intended).
# See more: https://giters.com/python/typeshed/issues/6077
class StreamLogWriter(TextIOBase, IO[str]):
    """
    Allows redirecting stdout and stderr to a logger.

    :param logger: The logging.Logger instance to write to
    :param level: The log level to write at, e.g. logging.DEBUG, logging.WARNING
    """

    encoding = "undefined"

    @property
    def mode(self):
        return "w"

    @property
    def name(self):
        return f"<logger: {self.logger.name}>"

    def writable(self):
        return True

    def readable(self):
        return False

    def seekable(self):
        return False

    def fileno(self):
        raise UnsupportedOperation("fileno")

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        self._buffer = ""

    def close(self):
        """
        Provide a close method, for compatibility with the io.IOBase interface.

        This is a no-op method.
        """

    @property
    def closed(self):
        """
        Return False to indicate that the stream is not closed.

        Streams will be open for the duration of Airflow's lifecycle.

        For compatibility with the io.IOBase interface.
        """
        return False

    def _propagate_log(self, message):
        """Propagate the message, removing escape codes."""
        self.logger.log(self.level, remove_escape_codes(message))

    def write(self, message):
        """
        Write the message to the internal buffer, flushing it to the logger on a newline-terminated write.

        :param message: message to log
        """
        if message.endswith("\n"):
            message = message.rstrip()
            self._buffer += message
            self.flush()
        else:
            self._buffer += message

        return len(message)

    def flush(self):
        """Ensure all logging output has been flushed."""
        buf = self._buffer
        if buf:
            self._buffer = ""
            self._propagate_log(buf)

    def isatty(self):
        """
        Return False to indicate the fd is not connected to a tty(-like) device.

        For compatibility reasons.
        """
        return False
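

# Illustrative sketch (not executed): redirecting stdout to a logger. Writes are buffered until a
# newline-terminated write arrives, then emitted at the given level with ANSI escape codes removed.
#
#     import logging
#
#     task_logger = logging.getLogger("airflow.task")
#     sys.stdout = StreamLogWriter(task_logger, logging.INFO)
#     print("this line ends up in the task log")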


class RedirectStdHandler(StreamHandler):
    """
    Custom StreamHandler that uses the current sys.stderr/stdout as the stream for logging.

    This class is like a StreamHandler using sys.stderr/stdout, but uses
    whatever sys.stderr/stdout is currently set to rather than the value of
    sys.stderr/stdout at handler construction time, except when running a
    task in a Kubernetes executor pod.
    """

    def __init__(self, stream):
        if not isinstance(stream, str):
            raise TypeError(
                "Cannot use file-like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
            )

        self._use_stderr = True
        if "stdout" in stream:
            self._use_stderr = False
            self._orig_stream = sys.stdout
        else:
            self._orig_stream = sys.stderr
        # StreamHandler tries to set self.stream, so call Handler.__init__ directly instead
        Handler.__init__(self)

    @property
    def stream(self):
        """Return the current stream."""
        # Containerized executors set these environment variables so logging is configured
        # correctly inside the container; IS_K8S_EXECUTOR_POD is True when running a task in a
        # Kubernetes executor pod.
        IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
        IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))

        if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
            return self._orig_stream
        if self._use_stderr:
            return sys.stderr

        return sys.stdout
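

# Illustrative sketch (not executed): attaching a RedirectStdHandler. The handler takes the literal
# string "stdout" or "stderr" (no "ext://" prefix) and looks up the stream on every emit, so later
# reassignments of sys.stdout/sys.stderr are picked up automatically.
#
#     handler = RedirectStdHandler(stream="stdout")
#     handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s - %(message)s"))
#     logging.getLogger("airflow.task").addHandler(handler)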


def set_context(logger, value):
    """
    Walk the tree of loggers and try to set the context for each handler.

    :param logger: logger to configure
    :param value: context value to pass to each handler's set_context
    """
    if not isinstance(logger, logging.Logger):
        # This function doesn't make sense for structlog-based loggers
        return

    while logger:
        orig_propagate = logger.propagate
        for handler in logger.handlers:
            # Not all handlers need to have context passed in, so we skip handlers that do not
            # define set_context.

            # Don't use getattr so we have type checking. And we don't care if the handler is actually a
            # FileTaskHandler, it just needs to have a set_context function!
            if hasattr(handler, "set_context"):
                from airflow.utils.log.file_task_handler import FileTaskHandler  # noqa: TC001

                flag = cast("FileTaskHandler", handler).set_context(value)
                # By default we disable propagation once we have configured the logger, unless the
                # handler explicitly asks us to keep it on.
                if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
                    logger.propagate = False
        if orig_propagate is True:
            # If we were set to propagate before we turned it off, keep passing set_context up the tree
            logger = logger.parent
        else:
            break
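

# Illustrative sketch (not executed): configuring task handlers for a specific task instance before
# running it; `ti` is a hypothetical TaskInstance.
#
#     task_logger = logging.getLogger("airflow.task")
#     set_context(task_logger, ti)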