Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/logging_mixin.py: 59%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import abc
21import enum
22import logging
23import sys
24from io import IOBase
25from logging import Handler, StreamHandler
26from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast
28import re2
30if TYPE_CHECKING:
31 from logging import Logger
# 7-bit C1 ANSI escape sequences: an ESC byte (0x1B), one byte in the range "@" to "_",
# then any number of parameter bytes ("0" to "?"), any number of intermediate bytes
# (" " to "/"), and one final byte in the range "@" to "~".
ANSI_ESCAPE = re2.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
# Private: sentinel objects.
class SetContextPropagate(enum.Enum):
    """Sentinel objects for log propagation contexts.

    Each member wraps its own ``object()`` instance, so the members are meant to be
    compared by identity (``is`` / ``is not``).

    :meta private:
    """

    # If a `set_context` function wants to _keep_ propagation set on its logger it needs to return this
    # special value.
    MAINTAIN_PROPAGATE = object()
    # No longer needed; do not use this one in new code.
    DISABLE_PROPAGATE = object()
def __getattr__(name):
    """Resolve legacy module attributes on demand.

    Both the historical misspelling ``DISABLE_PROPOGATE`` and the correctly spelled
    ``DISABLE_PROPAGATE`` resolve to ``SetContextPropagate.DISABLE_PROPAGATE``, in case
    someone still accesses them on this module directly.

    :param name: the attribute name that was not found on the module
    :raises AttributeError: for any other name
    """
    legacy_names = ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE")
    if name not in legacy_names:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    # Compat shim: an old sentinel that is not needed anymore.
    return SetContextPropagate.DISABLE_PROPAGATE
def remove_escape_codes(text: str) -> str:
    """Return ``text`` with every ANSI escape sequence deleted.

    Used to strip "colors" from log messages.

    :param text: the string to clean
    :return: ``text`` with all matches of ``ANSI_ESCAPE`` replaced by the empty string
    """
    cleaned = ANSI_ESCAPE.sub("", text)
    return cleaned
# Type variable used to annotate the class objects passed to the LoggingMixin helpers
# (`type[_T]`).
_T = TypeVar("_T")
class LoggingMixin:
    """Mixin that gives a class a lazily created logger named after the class."""

    # Cached logger; filled in by `_get_log` on first access.
    _log: logging.Logger | None = None

    # Parent logger used by this class. It should match one of the loggers defined in the
    # `logging_config_class`. When set, it is prepended (with a separating dot) to the
    # logger name built from `_logger_name`.
    _log_config_logger_name: Optional[str] = None  # noqa: UP007

    # Explicit logger name. When left as None, "<module>.<class name>" is used instead.
    _logger_name: Optional[str] = None  # noqa: UP007

    def __init__(self, context=None):
        self._set_context(context)

    @staticmethod
    def _create_logger_name(
        logged_class: type[_T],
        log_config_logger_name: str | None = None,
        class_logger_name: str | None = None,
    ) -> str:
        """Build the logger name to use for ``logged_class``.

        The base name is ``class_logger_name`` when it is not None, otherwise
        ``{class.__module__}.{class.__name__}``. When ``log_config_logger_name`` is
        non-empty it is prepended to the base name with a separating dot; if the base
        name is empty, ``log_config_logger_name`` alone is returned.

        :param logged_class: class the logger is created for
        :param log_config_logger_name: optional parent logger name used as prefix
        :param class_logger_name: optional explicit logger name for the class
        :return: the resulting logger name
        """
        if class_logger_name is None:
            base_name: str = f"{logged_class.__module__}.{logged_class.__name__}"
        else:
            base_name = class_logger_name

        if not log_config_logger_name:
            return base_name
        if not base_name:
            # An empty explicit name means "use the parent logger itself".
            return log_config_logger_name
        return f"{log_config_logger_name}.{base_name}"

    @classmethod
    def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
        """Return the logger cached on ``obj``, creating and caching it first if needed.

        :param obj: object (instance or class) that carries the ``_log`` cache and the
            ``_log_config_logger_name`` / ``_logger_name`` settings
        :param clazz: class used to derive the default logger name
        """
        if obj._log is not None:
            return obj._log

        name: str = cls._create_logger_name(
            logged_class=clazz,
            log_config_logger_name=obj._log_config_logger_name,
            class_logger_name=obj._logger_name,
        )
        obj._log = logging.getLogger(name)
        return obj._log

    @classmethod
    def logger(cls) -> Logger:
        """Return the class-level logger (cached on the class itself)."""
        return LoggingMixin._get_log(cls, cls)

    @property
    def log(self) -> Logger:
        """Return the logger for this instance (cached on the instance)."""
        return LoggingMixin._get_log(self, self.__class__)

    def _set_context(self, context):
        # Nothing to configure when no context was supplied.
        if context is None:
            return
        set_context(self.log, context)
class ExternalLoggingMixin:
    """Define a log handler based on an external service (e.g. ELK, StackDriver).

    Declares the members such a handler is expected to provide: ``log_name``,
    ``get_external_log_url`` and ``supports_external_link``.
    """

    # NOTE(review): this class does not use ``abc.ABCMeta`` itself, so the
    # ``abstractmethod`` markers below are presumably only enforced when a subclass
    # brings in an ABC metaclass -- confirm the intent.

    @property
    @abc.abstractmethod
    def log_name(self) -> str:
        """Return log name."""

    @abc.abstractmethod
    def get_external_log_url(self, task_instance, try_number) -> str:
        """Return the URL for log visualization in the external service.

        :param task_instance: the task instance the URL is built for
            (presumably an Airflow TaskInstance -- verify against callers)
        :param try_number: the try number the URL is built for
        """

    @property
    @abc.abstractmethod
    def supports_external_link(self) -> bool:
        """Return whether handler is able to support external links."""
148# We have to ignore typing errors here because Python I/O classes are a mess, and they do not
149# have the same type hierarchy defined as the `typing.IO` - they violate Liskov Substitution Principle
150# While it is ok to make your class derive from IOBase (and its good thing to do as they provide
151# base implementation for IO-implementing classes, it's impossible to make them work with
152# IO generics (and apparently it has not even been intended)
153# See more: https://giters.com/python/typeshed/issues/6077
class StreamLogWriter(IOBase, IO[str]):  # type: ignore[misc]
    """
    Allows to redirect stdout and stderr to logger.

    Text written to this object is buffered until a chunk ending in a newline is
    written (or ``flush`` is called); the buffered text is then emitted as a single
    log record with escape codes removed.

    :param logger: the logger that receives the written text
    :param level: the logging level passed to ``logger.log`` for every emitted record,
        e.g. ``logging.INFO``
    """

    # No encoding is advertised for this stream.
    encoding: None = None

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        # Text received since the last flush that has not been logged yet.
        self._buffer = ""

    def close(self):
        """
        Provide close method, for compatibility with the io.IOBase interface.

        This is a no-op method.
        """

    @property
    def closed(self):
        """
        Return False to indicate that the stream is not closed.

        Streams will be open for the duration of Airflow's lifecycle.

        For compatibility with the io.IOBase interface.
        """
        return False

    def _propagate_log(self, message):
        """Propagate message removing escape codes."""
        self.logger.log(self.level, remove_escape_codes(message))

    def write(self, message):
        """
        Buffer ``message`` and log the buffer once a newline-terminated chunk arrives.

        :param message: message to log
        :return: the number of characters accepted, i.e. ``len(message)``, as required
            by the text I/O ``write`` contract
        """
        if not message.endswith("\n"):
            # Incomplete line: keep it until the rest of the line arrives.
            self._buffer += message
        else:
            # Complete line: drop trailing whitespace (including the newline) and emit.
            self._buffer += message.rstrip()
            self.flush()
        return len(message)

    def flush(self):
        """Ensure all logging output has been flushed."""
        buf = self._buffer
        if buf:
            # Reset the buffer before logging so the pending text is emitted only once.
            self._buffer = ""
            self._propagate_log(buf)

    def isatty(self):
        """
        Return False to indicate the fd is not connected to a tty(-like) device.

        For compatibility reasons.
        """
        return False
class RedirectStdHandler(StreamHandler):
    """
    StreamHandler variant that resolves sys.stderr/stdout at emit time.

    A regular StreamHandler keeps the stream object it was constructed with. This
    handler instead looks up whatever sys.stderr/stdout currently is each time the
    stream is needed, except when running in an executor container or a kubernetes
    executor pod, where the stream captured at construction time is used.
    """

    def __init__(self, stream):
        """
        Select which standard stream the handler writes to.

        :param stream: a string naming the stream; any value containing ``"stdout"``
            selects stdout, everything else selects stderr
        :raises TypeError: if ``stream`` is not a string
        """
        if not isinstance(stream, str):
            raise TypeError(
                "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
            )

        wants_stdout = "stdout" in stream
        self._use_stderr = not wants_stdout
        # Remember the stream object that was current when the handler was built.
        self._orig_stream = sys.stdout if wants_stdout else sys.stderr
        # Bypass StreamHandler.__init__, which tries to set self.stream.
        Handler.__init__(self)

    @property
    def stream(self):
        """Returns current stream."""
        from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD

        if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
            return self._orig_stream
        return sys.stderr if self._use_stderr else sys.stdout
def set_context(logger, value):
    """
    Set the context on every capable handler, walking up the tree of loggers.

    Starting at ``logger``, each handler that defines ``set_context`` is called with
    ``value``. The walk continues to the parent logger only while the logger visited
    was set to propagate before its handlers were configured.

    :param logger: logger
    :param value: value to set
    """
    current = logger
    while current:
        propagated_before = current.propagate
        for handler in current.handlers:
            # Not all handlers need the context, so handlers without a set_context
            # function are simply skipped.
            if not hasattr(handler, "set_context"):
                continue

            # Use cast rather than getattr so the call stays type checked. The handler does
            # not have to be a real FileTaskHandler, it only needs a set_context function.
            from airflow.utils.log.file_task_handler import FileTaskHandler

            result = cast(FileTaskHandler, handler).set_context(value)
            # Propagation is disabled once the logger has been configured, unless the
            # handler explicitly asks to keep it on.
            if result is not SetContextPropagate.MAINTAIN_PROPAGATE:
                current.propagate = False

        if propagated_before is not True:
            break
        # The logger propagated before we (possibly) turned it off, so keep passing
        # the context up the tree.
        current = current.parent