Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/log.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import warnings
21from collections.abc import Callable
22from functools import cache
23from pathlib import Path
24from typing import TYPE_CHECKING, Any, BinaryIO, TextIO
26import structlog
27import structlog.processors
29# We have to import this here, as it is used in the type annotations at runtime even if it seems it is
30# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields.
31from pydantic import JsonValue # noqa: TC002
33if TYPE_CHECKING:
34 from structlog.typing import EventDict, FilteringBoundLogger, Processor
36 from airflow.logging_config import RemoteLogIO
37 from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI
40from airflow.sdk._shared.secrets_masker import redact
43class _WarningsInterceptor:
44 """A class to hold the reference to the original warnings.showwarning function."""
46 _original_showwarning: Callable | None = None
48 @staticmethod
49 def register(new_callable: Callable) -> None:
50 if _WarningsInterceptor._original_showwarning is None:
51 _WarningsInterceptor._original_showwarning = warnings.showwarning
52 warnings.showwarning = new_callable
54 @staticmethod
55 def reset() -> None:
56 if _WarningsInterceptor._original_showwarning is not None:
57 warnings.showwarning = _WarningsInterceptor._original_showwarning
58 _WarningsInterceptor._original_showwarning = None
60 @staticmethod
61 def emit_warning(*args: Any) -> None:
62 if _WarningsInterceptor._original_showwarning is not None:
63 _WarningsInterceptor._original_showwarning(*args)
def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """Structlog processor that redacts secrets from the event dict before rendering."""
    return redact(event_dict)  # type: ignore[return-value]
@cache
def logging_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    sending_to_supervisor: bool = False,
) -> tuple[Processor, ...]:
    """Build the ordered structlog processor chain for task logging.

    :param json_output: emit JSON lines instead of human-readable console output.
    :param log_format: stdlib-style format string forwarded to the shared builder.
    :param colors: enable ANSI colors for console output.
    :param sending_to_supervisor: when True the supervisor process masks secrets
        itself, so no in-process masking processor is added here.
    :return: the processor chain; cached per argument combination via ``@cache``.
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    extra_processors: tuple[Processor, ...] = ()

    # Mask secrets in-process unless the supervisor will do it downstream.
    if not sending_to_supervisor:
        extra_processors += (mask_logs,)

    # A configured remote log handler may contribute its own processors.
    # (Plain attribute access instead of getattr with a constant name — ruff B009.)
    if (remote := load_remote_log_handler()) and (remote_processors := remote.processors):
        extra_processors += remote_processors

    procs, _, final_writer = structlog_processors(
        json_output=json_output, log_format=log_format, colors=colors
    )
    # Extra processors must run before the final renderer/writer.
    return tuple(procs) + extra_processors + (final_writer,)
@cache
def configure_logging(
    json_output: bool = False,
    log_level: str = "DEFAULT",
    output: BinaryIO | TextIO | None = None,
    cache_logger_on_first_use: bool = True,
    sending_to_supervisor: bool = False,
    colored_console_log: bool | None = None,
):
    """Set up struct logging and stdlib logging config.

    :param json_output: emit JSON lines instead of console output.
    :param log_level: explicit level name; ``"DEFAULT"`` means "read from config".
    :param output: stream to write to; None lets the shared helper pick.
    :param cache_logger_on_first_use: structlog optimization flag, forwarded as-is.
    :param sending_to_supervisor: when True, secret masking is left to the supervisor.
    :param colored_console_log: force colors on/off; None means "read from config".
    """
    from airflow.sdk.configuration import conf

    if log_level == "DEFAULT":
        # "DEFAULT" means the caller did not pick a level: use the configured one.
        # (The previous intermediate `log_level = "INFO"` assignment was dead code —
        # it was unconditionally overwritten by this lookup.)
        log_level = conf.get("logging", "logging_level", fallback="INFO")

    # If colored_console_log is not explicitly set, read from configuration
    if colored_console_log is None:
        colored_console_log = conf.getboolean("logging", "colored_console_log", fallback=True)

    namespace_log_levels = conf.get("logging", "namespace_levels", fallback=None)

    # Alias the shared helper so it does not shadow this function's own name.
    from airflow.sdk._shared.logging import configure_logging as _shared_configure_logging
    from airflow.sdk._shared.logging import translate_config_values

    log_fmt, callsite_params = translate_config_values(
        log_format=conf.get("logging", "log_format"),
        callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
    )

    extra_processors: tuple[Processor, ...] = ()

    # Mask secrets in-process unless the supervisor will do it downstream.
    if not sending_to_supervisor:
        extra_processors += (mask_logs,)

    # A configured remote log handler may contribute its own processors (ruff B009:
    # plain attribute access instead of getattr with a constant name).
    if (remote := load_remote_log_handler()) and (remote_processors := remote.processors):
        extra_processors += remote_processors

    _shared_configure_logging(
        json_output=json_output,
        log_level=log_level,
        namespace_log_levels=namespace_log_levels,
        log_format=log_fmt,
        output=output,
        cache_logger_on_first_use=cache_logger_on_first_use,
        colors=colored_console_log,
        extra_processors=extra_processors,
        callsite_parameters=callsite_params,
    )

    # Route Python warnings through structlog so they land in task logs.
    _WarningsInterceptor.register(_showwarning)
def logger_at_level(name: str, level: int) -> Logger:
    """Create a new named logger filtered at the given level.

    :param name: logger name, passed to the structlog logger factory.
    :param level: stdlib logging level (e.g. ``logging.INFO``).
    """
    from airflow.sdk._shared.logging.structlog import LEVEL_TO_FILTERING_LOGGER

    return structlog.wrap_logger(
        None,
        wrapper_class=LEVEL_TO_FILTERING_LOGGER[level],
        # Must be a one-tuple: structlog unpacks logger_factory_args with ``*args``
        # into the logger factory. The previous ``(name)`` was just the bare string,
        # which would be unpacked character by character.
        logger_factory_args=(name,),
    )
def init_log_file(local_relative_path: str) -> Path:
    """
    Ensure log file and parent directories are created.

    Any directories that are missing are created with the right permission bits.
    """
    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf
    from airflow.sdk._shared.logging import init_log_file

    # Permission settings are stored as octal string literals (e.g. "0o664").
    file_mode = int(
        conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"), 8
    )
    folder_mode = int(
        conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
    )

    return init_log_file(
        conf.get("logging", "base_log_folder"),
        local_relative_path,
        new_folder_permissions=folder_mode,
        new_file_permissions=file_mode,
    )
def load_remote_log_handler() -> RemoteLogIO | None:
    """Return the configured remote task-log IO handler, or None when remote logging is off."""
    from airflow.logging_config import get_remote_task_log

    return get_remote_task_log()
def load_remote_conn_id() -> str | None:
    """Return the connection id used for remote logging, or None when none is configured."""
    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf

    explicit = conf.get("logging", "remote_log_conn_id", fallback=None)
    if explicit:
        return explicit

    # No explicit setting: fall back to the provider-supplied default, if any.
    from airflow.logging_config import get_default_remote_conn_id

    return get_default_remote_conn_id()
def relative_path_from_logger(logger) -> Path | None:
    """Return the logger's log-file path relative to the base log folder, or None.

    None is returned when the logger is falsy, has no ``_file`` attribute, or is
    writing to stdout rather than a real file.
    """
    if not logger:
        return None
    if not hasattr(logger, "_file"):
        logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
        return None

    handle = logger._file
    name = handle.name
    # Logging to stdout, or something odd about this logger, don't try to upload!
    if handle.fileno() == 1 or not isinstance(name, str):
        return None

    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency
    from airflow.configuration import conf

    return Path(name).relative_to(conf.get("logging", "base_log_folder"))
def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
    """Best-effort upload of the task instance's local log file to the remote handler."""
    raw_logger = getattr(logger, "_logger")

    handler = load_remote_log_handler()
    if not handler:
        # Remote logging is not configured: nothing to upload.
        return

    try:
        relative_path = relative_path_from_logger(raw_logger)
    except Exception:
        # Path resolution is best-effort; never fail the caller over it.
        return
    if not relative_path:
        return

    handler.upload(relative_path.as_posix(), ti)
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both task process and supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND supervisor's log output.
    Works safely in both sync and async contexts.
    """
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    # Always register the mask with the in-process masker first.
    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Try to tell supervisor (only if in task execution context)
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        comms = getattr(task_runner, "SUPERVISOR_COMMS", None)
        if comms:
            comms.send(MaskSecret(value=secret, name=name))
def reset_logging():
    """
    Convenience for testing. Not for production use.

    :meta private:
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    # Restore the original warnings hook and drop all cached processor chains so
    # the next configure_logging() call rebuilds everything from scratch.
    _WarningsInterceptor.reset()
    structlog_processors.cache_clear()
    logging_processors.cache_clear()
def _showwarning(
    message: Warning | str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: TextIO | None = None,
    line: str | None = None,
) -> Any:
    """
    Redirects warnings to structlog so they appear in task logs etc.

    Drop-in replacement for ``warnings.showwarning``: when an explicit *file* is
    given, the call is delegated to the original showwarning implementation;
    otherwise the warning is logged through a structlog logger named
    "py.warnings" at warning level, with callsite information attached.
    """
    if file is not None:
        # Caller asked for a specific stream — defer to the original hook.
        _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line)
        return

    from airflow.sdk._shared.logging.structlog import reconfigure_logger

    log = reconfigure_logger(
        structlog.get_logger("py.warnings").bind(), structlog.processors.CallsiteParameterAdder
    )

    log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)