Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/log/logging_mixin.py: 64%
89 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import abc
21import enum
22import logging
23import re
24import sys
25from io import IOBase
26from logging import Handler, Logger, StreamHandler
27from typing import IO, cast
# 7-bit C1 ANSI escape sequences: ESC, then one byte in ``@``-``_``, then any number of
# parameter bytes (``0``-``?``), any number of intermediate bytes (space-``/``) and one
# final byte (``@``-``~``). Used by ``remove_escape_codes`` to strip colors from log text.
ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
# Private: sentinel objects
class SetContextPropagate(enum.Enum):
    """:meta private:"""

    # If a `set_context` function wants to _keep_ propagation set on its logger, it needs to
    # return this special value. The module-level `set_context` helper compares the handler's
    # return value against this member by identity.
    MAINTAIN_PROPAGATE = object()
    # Don't use this one anymore! It is only handed out by the module-level `__getattr__`
    # compatibility shim.
    DISABLE_PROPAGATE = object()
def __getattr__(name):
    """
    Resolve legacy module attributes that are no longer real module globals.

    Both the historical misspelling ``DISABLE_PROPOGATE`` and the corrected
    ``DISABLE_PROPAGATE`` resolve to ``SetContextPropagate.DISABLE_PROPAGATE``.

    :param name: attribute name being looked up on this module
    :return: the ``DISABLE_PROPAGATE`` sentinel for the two legacy names
    :raises AttributeError: for every other name
    """
    legacy_names = ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE")
    if name not in legacy_names:
        raise AttributeError(f"module {__name__} has no attribute {name}")
    # Compat for the old spelling, on the off chance someone is using it directly.
    # The object itself is not needed anymore.
    return SetContextPropagate.DISABLE_PROPAGATE
def remove_escape_codes(text: str) -> str:
    """
    Strip ANSI escape codes from a string.

    Used to take the "colors" out of log messages.

    :param text: text that may contain ANSI escape sequences
    :return: ``text`` with every match of ``ANSI_ESCAPE`` removed
    """
    return re.sub(ANSI_ESCAPE, "", text)
class LoggingMixin:
    """Convenience base class that gives a class a logger named after itself."""

    # Cache for the lazily created logger; stays None until ``log`` is first read.
    _log: logging.Logger | None = None

    def __init__(self, context=None):
        self._set_context(context)

    @property
    def log(self) -> Logger:
        """Return the logger, creating it as ``<module>.<class name>`` on first access."""
        existing = self._log
        if existing is not None:
            return existing
        klass = self.__class__
        self._log = logging.getLogger(".".join((klass.__module__, klass.__name__)))
        return self._log

    def _set_context(self, context):
        # Nothing to hand to the handlers when no context was given.
        if context is None:
            return
        set_context(self.log, context)
class ExternalLoggingMixin:
    """
    Define a log handler based on an external service (e.g. ELK, StackDriver).

    The members below are marked with ``abc.abstractmethod``. This class does not declare
    an ``ABCMeta`` metaclass itself, so that marking is only enforced when a subclass
    brings one in.
    """

    @property
    @abc.abstractmethod
    def log_name(self) -> str:
        """Return log name"""

    @abc.abstractmethod
    def get_external_log_url(self, task_instance, try_number) -> str:
        """
        Return the URL for log visualization in the external service.

        :param task_instance: presumably the task instance whose logs are wanted -- TODO confirm
        :param try_number: presumably the attempt number of that task instance -- TODO confirm
        """

    @property
    @abc.abstractmethod
    def supports_external_link(self) -> bool:
        """Return whether handler is able to support external links."""
# We have to ignore typing errors here because Python I/O classes are a mess: they do not
# have the same type hierarchy as `typing.IO`, so they violate the Liskov Substitution Principle.
# While it is OK to derive your class from IOBase (and it's a good thing to do, as it provides a
# base implementation for IO-implementing classes), it's impossible to make it work with the
# IO generics (and apparently that was never even intended).
# See more: https://giters.com/python/typeshed/issues/6077
class StreamLogWriter(IOBase, IO[str]):  # type: ignore[misc]
    """
    Allows to redirect stdout and stderr to logger.

    Text is collected in an internal buffer and emitted as one log record whenever a
    chunk ending in a newline is written, or when ``flush`` is called explicitly.
    """

    encoding: None = None

    def __init__(self, logger, level):
        """
        :param logger: logger that receives the redirected output
        :param level: level passed to ``logger.log`` for every emitted record
        """
        self.logger = logger
        self.level = level
        # Text written so far that has not yet been terminated by a newline.
        self._buffer = ""

    def close(self):
        """
        Provide close method, for compatibility with the io.IOBase interface.

        This is a no-op method.
        """

    @property
    def closed(self):
        """
        Returns False to indicate that the stream is not closed, as it will be
        open for the duration of Airflow's lifecycle.

        For compatibility with the io.IOBase interface.
        """
        return False

    def _propagate_log(self, message):
        """Propagate message removing escape codes."""
        self.logger.log(self.level, remove_escape_codes(message))

    def write(self, message):
        """
        Buffer ``message`` and log the buffer once a newline-terminated chunk arrives.

        :param message: message to log
        :return: number of characters accepted, as the text I/O ``write`` contract expects
        """
        if message.endswith("\n"):
            # Trailing whitespace (including the newline) is dropped before logging.
            self._buffer += message.rstrip()
            self.flush()
        else:
            self._buffer += message
        return len(message)

    def flush(self):
        """Ensure all logging output has been flushed"""
        pending = self._buffer
        if pending:
            # Clear the buffer before logging, so the text is not emitted twice.
            self._buffer = ""
            self._propagate_log(pending)

    def isatty(self):
        """
        Returns False to indicate the fd is not connected to a tty(-like) device.
        For compatibility reasons.
        """
        return False
class RedirectStdHandler(StreamHandler):
    """
    A StreamHandler-like handler bound to sys.stderr/stdout by name.

    It always uses whatever sys.stderr/stdout is currently set to, rather than the
    value of sys.stderr/stdout at handler construction time.
    """

    def __init__(self, stream):
        """
        :param stream: stream name as a string; a value containing ``"stdout"`` selects
            ``sys.stdout``, any other string selects ``sys.stderr``
        :raises Exception: if ``stream`` is not a string
        """
        if not isinstance(stream, str):
            raise Exception(
                "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
            )

        self._use_stderr = "stdout" not in stream

        # StreamHandler.__init__ tries to set self.stream, which is a read-only property
        # here, so initialise through the plain Handler instead.
        Handler.__init__(self)

    @property
    def stream(self):
        """Returns current stream."""
        return sys.stderr if self._use_stderr else sys.stdout
def set_context(logger, value):
    """
    Walk up the logger tree and hand ``value`` to every handler that accepts a context.

    Starting at ``logger``, each handler exposing ``set_context`` is called with ``value``.
    Unless a handler returns ``SetContextPropagate.MAINTAIN_PROPAGATE``, propagation is
    switched off on that logger. The walk continues to the parent only if the logger was
    propagating before its handlers were visited.

    :param logger: logger
    :param value: value to set
    """
    current = logger
    while current:
        was_propagating = current.propagate
        for handler in current.handlers:
            # Not all handlers need to have context passed in, so handlers without
            # a set_context function are simply skipped.
            if not hasattr(handler, "set_context"):
                continue

            # Don't use getattr so we have type checking. And we don't care if the handler is
            # actually a FileTaskHandler, it just needs to have a set_context function!
            from airflow.utils.log.file_task_handler import FileTaskHandler

            outcome = cast(FileTaskHandler, handler).set_context(value)
            # A handler may explicitly ask us to keep propagation on.
            if outcome is SetContextPropagate.MAINTAIN_PROPAGATE:
                continue
            # By default we disable propagate once we have configured the logger.
            current.propagate = False
        if was_propagating is not True:
            break
        # We were set to propagate before we turned it off, so keep passing set_context up.
        current = current.parent
222 break