Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/log/logging_mixin.py: 60%
101 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import abc
21import enum
22import logging
23import re
24import sys
25from io import IOBase
26from logging import Handler, Logger, StreamHandler
27from typing import IO, Any, TypeVar, cast
29# 7-bit C1 ANSI escape sequences
30ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
33# Private: A sentinel objects
34class SetContextPropagate(enum.Enum):
35 """Sentinel objects for log propagation contexts.
37 :meta private:
38 """
40 # If a `set_context` function wants to _keep_ propagation set on it's logger it needs to return this
41 # special value.
42 MAINTAIN_PROPAGATE = object()
43 # Don't use this one anymore!
44 DISABLE_PROPAGATE = object()
47def __getattr__(name):
48 if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"):
49 # Compat for spelling on off chance someone is using this directly
50 # And old object that isn't needed anymore
51 return SetContextPropagate.DISABLE_PROPAGATE
52 raise AttributeError(f"module {__name__} has no attribute {name}")
55def remove_escape_codes(text: str) -> str:
56 """
57 Remove ANSI escapes codes from string. It's used to remove
58 "colors" from log messages.
59 """
60 return ANSI_ESCAPE.sub("", text)
63_T = TypeVar("_T")
66class LoggingMixin:
67 """Convenience super-class to have a logger configured with the class name."""
69 _log: logging.Logger | None = None
71 def __init__(self, context=None):
72 self._set_context(context)
74 @staticmethod
75 def _get_log(obj: Any, clazz: type[_T]) -> Logger:
76 if obj._log is None:
77 obj._log = logging.getLogger(f"{clazz.__module__}.{clazz.__name__}")
78 return obj._log
80 @classmethod
81 def logger(cls) -> Logger:
82 """Returns a logger."""
83 return LoggingMixin._get_log(cls, cls)
85 @property
86 def log(self) -> Logger:
87 """Returns a logger."""
88 return LoggingMixin._get_log(self, self.__class__)
90 def _set_context(self, context):
91 if context is not None:
92 set_context(self.log, context)
95class ExternalLoggingMixin:
96 """Define a log handler based on an external service (e.g. ELK, StackDriver)."""
98 @property
99 @abc.abstractmethod
100 def log_name(self) -> str:
101 """Return log name."""
103 @abc.abstractmethod
104 def get_external_log_url(self, task_instance, try_number) -> str:
105 """Return the URL for log visualization in the external service."""
107 @property
108 @abc.abstractmethod
109 def supports_external_link(self) -> bool:
110 """Return whether handler is able to support external links."""
113# We have to ignore typing errors here because Python I/O classes are a mess, and they do not
114# have the same type hierarchy defined as the `typing.IO` - they violate Liskov Substitution Principle
115# While it is ok to make your class derive from IOBase (and its good thing to do as they provide
116# base implementation for IO-implementing classes, it's impossible to make them work with
117# IO generics (and apparently it has not even been intended)
118# See more: https://giters.com/python/typeshed/issues/6077
119class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc]
120 """Allows to redirect stdout and stderr to logger."""
122 encoding: None = None
124 def __init__(self, logger, level):
125 """
126 :param log: The log level method to write to, ie. log.debug, log.warning
127 :return:
128 """
129 self.logger = logger
130 self.level = level
131 self._buffer = ""
133 def close(self):
134 """
135 Provide close method, for compatibility with the io.IOBase interface.
137 This is a no-op method.
138 """
140 @property
141 def closed(self):
142 """
143 Returns False to indicate that the stream is not closed, as it will be
144 open for the duration of Airflow's lifecycle.
146 For compatibility with the io.IOBase interface.
147 """
148 return False
150 def _propagate_log(self, message):
151 """Propagate message removing escape codes."""
152 self.logger.log(self.level, remove_escape_codes(message))
154 def write(self, message):
155 """
156 Do whatever it takes to actually log the specified logging record.
158 :param message: message to log
159 """
160 if not message.endswith("\n"):
161 self._buffer += message
162 else:
163 self._buffer += message.rstrip()
164 self.flush()
166 def flush(self):
167 """Ensure all logging output has been flushed."""
168 buf = self._buffer
169 if len(buf) > 0:
170 self._buffer = ""
171 self._propagate_log(buf)
173 def isatty(self):
174 """
175 Returns False to indicate the fd is not connected to a tty(-like) device.
176 For compatibility reasons.
177 """
178 return False
181class RedirectStdHandler(StreamHandler):
182 """
183 This class is like a StreamHandler using sys.stderr/stdout, but uses
184 whatever sys.stderr/stderr is currently set to rather than the value of
185 sys.stderr/stdout at handler construction time, except when running a
186 task in a kubernetes executor pod.
187 """
189 def __init__(self, stream):
190 if not isinstance(stream, str):
191 raise Exception(
192 "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
193 )
195 self._use_stderr = True
196 if "stdout" in stream:
197 self._use_stderr = False
198 self._orig_stream = sys.stdout
199 else:
200 self._orig_stream = sys.stderr
201 # StreamHandler tries to set self.stream
202 Handler.__init__(self)
204 @property
205 def stream(self):
206 """Returns current stream."""
207 from airflow.settings import IS_K8S_EXECUTOR_POD
209 if IS_K8S_EXECUTOR_POD:
210 return self._orig_stream
211 if self._use_stderr:
212 return sys.stderr
214 return sys.stdout
217def set_context(logger, value):
218 """
219 Walks the tree of loggers and tries to set the context for each handler.
221 :param logger: logger
222 :param value: value to set
223 """
224 while logger:
225 orig_propagate = logger.propagate
226 for handler in logger.handlers:
227 # Not all handlers need to have context passed in so we ignore
228 # the error when handlers do not have set_context defined.
230 # Don't use getatrr so we have type checking. And we don't care if handler is actually a
231 # FileTaskHandler, it just needs to have a set_context function!
232 if hasattr(handler, "set_context"):
233 from airflow.utils.log.file_task_handler import FileTaskHandler
235 flag = cast(FileTaskHandler, handler).set_context(value)
236 # By default we disable propagate once we have configured the logger, unless that handler
237 # explicitly asks us to keep it on.
238 if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
239 logger.propagate = False
240 if orig_propagate is True:
241 # If we were set to propagate before we turned if off, then keep passing set_context up
242 logger = logger.parent
243 else:
244 break