Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/log.py: 32%
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import warnings
from collections.abc import Callable
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, TextIO

import structlog
import structlog.processors

# We have to import this here, as it is used in the type annotations at runtime even if it seems
# it is not used in the code. This is because Pydantic uses the type at runtime to validate the
# types of the fields.
from pydantic import JsonValue  # noqa: TC002

if TYPE_CHECKING:
    from structlog.typing import EventDict, FilteringBoundLogger, Processor

    from airflow.sdk._shared.logging.remote import RemoteLogIO
    from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI


from airflow.sdk._shared.secrets_masker import redact

class _ActiveLoggingConfig:
    """Internal class to track active logging configuration."""

    logging_config_loaded = False
    remote_task_log: RemoteLogIO | None = None
    default_remote_conn_id: str | None = None

    @classmethod
    def set(cls, remote_task_log: RemoteLogIO | None, default_remote_conn_id: str | None) -> None:
        """Set remote logging configuration."""
        cls.remote_task_log = remote_task_log
        cls.default_remote_conn_id = default_remote_conn_id
        cls.logging_config_loaded = True

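# Note (not part of the module): the class above acts as a process-wide cache.
# Its class attributes are mutated in place, so every caller observes the same
# remote logging configuration once _load_logging_config() has run.
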
class _WarningsInterceptor:
    """A class to hold the reference to the original warnings.showwarning function."""

    _original_showwarning: Callable | None = None

    @staticmethod
    def register(new_callable: Callable) -> None:
        # Capture the original hook only once, then install the replacement.
        if _WarningsInterceptor._original_showwarning is None:
            _WarningsInterceptor._original_showwarning = warnings.showwarning
        warnings.showwarning = new_callable

    @staticmethod
    def reset() -> None:
        if _WarningsInterceptor._original_showwarning is not None:
            warnings.showwarning = _WarningsInterceptor._original_showwarning
            _WarningsInterceptor._original_showwarning = None

    @staticmethod
    def emit_warning(*args: Any) -> None:
        if _WarningsInterceptor._original_showwarning is not None:
            _WarningsInterceptor._original_showwarning(*args)

def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """Structlog processor that redacts sensitive values from the event dict."""
    event_dict = redact(event_dict)  # type: ignore[assignment]
    return event_dict

@cache
def logging_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    sending_to_supervisor: bool = False,
) -> tuple[Processor, ...]:
    from airflow.sdk._shared.logging.structlog import structlog_processors

    extra_processors: tuple[Processor, ...] = ()

    mask_secrets = not sending_to_supervisor
    if mask_secrets:
        extra_processors += (mask_logs,)

    if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
        extra_processors += remote_processors

    procs, _, final_writer = structlog_processors(
        json_output=json_output, log_format=log_format, colors=colors
    )
    return tuple(procs) + extra_processors + (final_writer,)

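# Illustrative sketch (not part of the module): logging_processors is wrapped
# in functools.cache, so identical arguments yield the identical tuple object
# and the processor chain is assembled at most once per configuration.
#
#   chain_a = logging_processors(json_output=True)
#   chain_b = logging_processors(json_output=True)
#   assert chain_a is chain_b  # cached: same tuple instance
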
@cache
def configure_logging(
    json_output: bool = False,
    log_level: str = "DEFAULT",
    output: BinaryIO | TextIO | None = None,
    cache_logger_on_first_use: bool = True,
    sending_to_supervisor: bool = False,
    colored_console_log: bool | None = None,
):
    """Set up struct logging and stdlib logging config."""
    from airflow.sdk.configuration import conf

    if log_level == "DEFAULT":
        log_level = "INFO"
        log_level = conf.get("logging", "logging_level", fallback="INFO")

    # If colored_console_log is not explicitly set, read from configuration
    if colored_console_log is None:
        colored_console_log = conf.getboolean("logging", "colored_console_log", fallback=True)

    namespace_log_levels = conf.get("logging", "namespace_levels", fallback=None)

    from airflow.sdk._shared.logging import configure_logging, translate_config_values

    log_fmt, callsite_params = translate_config_values(
        log_format=conf.get("logging", "log_format"),
        callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
    )

    mask_secrets = not sending_to_supervisor
    extra_processors: tuple[Processor, ...] = ()

    if mask_secrets:
        extra_processors += (mask_logs,)

    if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
        extra_processors += remote_processors

    # Delegate to the shared implementation (the local import above shadows this function's name).
    configure_logging(
        json_output=json_output,
        log_level=log_level,
        namespace_log_levels=namespace_log_levels,
        log_format=log_fmt,
        output=output,
        cache_logger_on_first_use=cache_logger_on_first_use,
        colors=colored_console_log,
        extra_processors=extra_processors,
        callsite_parameters=callsite_params,
    )

    _WarningsInterceptor.register(_showwarning)

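# Illustrative sketch (not part of the module): typical one-time setup at
# process start. The keyword arguments are the real parameters of
# configure_logging above; the values shown are examples only.
#
#   configure_logging(json_output=True, log_level="DEBUG")
#   log = structlog.get_logger("airflow.task")
#   log.info("logging configured")
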
def logger_at_level(name: str, level: int) -> Logger:
    """Create a new logger at the given level."""
    from airflow.sdk._shared.logging.structlog import LEVEL_TO_FILTERING_LOGGER

    return structlog.wrap_logger(
        None, wrapper_class=LEVEL_TO_FILTERING_LOGGER[level], logger_factory_args=(name,)
    )

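# Illustrative sketch (not part of the module), assuming LEVEL_TO_FILTERING_LOGGER
# maps stdlib logging levels to structlog filtering wrapper classes, as its name
# suggests. Levels are stdlib integers, e.g. logging.WARNING == 30.
#
#   import logging
#   log = logger_at_level("airflow.example", logging.WARNING)
#   log.info("filtered out")   # below WARNING, dropped by the wrapper class
#   log.warning("emitted")     # at or above WARNING, passes through
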
def init_log_file(local_relative_path: str) -> Path:
    """
    Ensure log file and parent directories are created.

    Any directories that are missing are created with the right permission bits.
    """
    from airflow.sdk._shared.logging import init_log_file
    from airflow.sdk.configuration import conf

    new_file_permissions = int(
        conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"),
        8,
    )
    new_folder_permissions = int(
        conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
        8,
    )

    base_log_folder = conf.get("logging", "base_log_folder")

    return init_log_file(
        base_log_folder,
        local_relative_path,
        new_folder_permissions=new_folder_permissions,
        new_file_permissions=new_file_permissions,
    )

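# Illustrative sketch (not part of the module): the returned Path lives under
# [logging] base_log_folder; the relative layout below is a hypothetical
# example, not a path this module mandates.
#
#   path = init_log_file("dag_id=example/run_id=manual/task_id=hello/attempt=1.log")
#   with path.open("a") as fh:
#       fh.write("task started\n")
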
def _load_logging_config() -> None:
    """Load and cache the remote logging configuration from SDK config."""
    from airflow.sdk._shared.logging.remote import discover_remote_log_handler
    from airflow.sdk._shared.module_loading import import_string
    from airflow.sdk.configuration import conf

    fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
    logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)

    # Load remote logging configuration using shared discovery logic
    remote_task_log, default_remote_conn_id = discover_remote_log_handler(
        logging_class_path, fallback, import_string
    )
    _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)

def load_remote_log_handler() -> RemoteLogIO | None:
    """Return the configured remote log handler, loading the config on first use."""
    if not _ActiveLoggingConfig.logging_config_loaded:
        _load_logging_config()
    return _ActiveLoggingConfig.remote_task_log

def load_remote_conn_id() -> str | None:
    from airflow.sdk.configuration import conf

    if conn_id := conf.get("logging", "remote_log_conn_id", fallback=None):
        return conn_id

    if not _ActiveLoggingConfig.logging_config_loaded:
        _load_logging_config()
    return _ActiveLoggingConfig.default_remote_conn_id

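# Note (not part of the module): an explicitly configured
# [logging] remote_log_conn_id always wins; the default connection id
# discovered from the logging config class is only a fallback.
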
def relative_path_from_logger(logger) -> Path | None:
    if not logger:
        return None
    if not hasattr(logger, "_file"):
        logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
        return None

    fh = logger._file
    fname = fh.name

    if fh.fileno() == 1 or not isinstance(fname, str):
        # Logging to stdout, or something odd about this logger, don't try to upload!
        return None

    from airflow.sdk.configuration import conf

    base_log_folder = conf.get("logging", "base_log_folder")
    return Path(fname).relative_to(base_log_folder)

def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
    raw_logger = getattr(logger, "_logger")

    handler = load_remote_log_handler()
    if not handler:
        return

    try:
        relative_path = relative_path_from_logger(raw_logger)
    except Exception:
        return
    if not relative_path:
        return

    log_relative_path = relative_path.as_posix()
    handler.upload(log_relative_path, ti)

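# Note (not part of the module): upload_to_remote is deliberately best-effort.
# A missing remote handler, a logger without a real file, or any error while
# resolving the relative path results in a silent no-op instead of failing
# the task.
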
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both the task process and the supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND the supervisor's log output.
    Works safely in both sync and async contexts.
    """
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Try to tell the supervisor (only if in a task execution context)
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        if comms := getattr(task_runner, "SUPERVISOR_COMMS", None):
            comms.send(MaskSecret(value=secret, name=name))

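# Illustrative sketch (not part of the module), assuming configure_logging has
# already run so the mask_logs processor is active: masking a value fetched
# from a hypothetical secrets backend so it is redacted in later log output.
#
#   api_key = "s3cr3t-value"  # e.g. loaded from Vault or an environment variable
#   mask_secret(api_key, name="api_key")
#   structlog.get_logger("airflow.task").info("using key", key=api_key)
#   # -> the rendered event shows the masked placeholder instead of the secret
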
def reset_logging():
    """
    Convenience helper for testing. Not for production use.

    :meta private:
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    _WarningsInterceptor.reset()
    structlog_processors.cache_clear()
    logging_processors.cache_clear()

def _showwarning(
    message: Warning | str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: TextIO | None = None,
    line: str | None = None,
) -> Any:
    """
    Redirect warnings to structlog so they appear in task logs etc.

    Implementation of showwarning which redirects to logging. It first checks
    whether the file parameter is None. If a file is specified, it delegates to
    the original warnings implementation of showwarning. Otherwise, it logs the
    warning message, along with its category, filename, and line number, to a
    structlog logger named "py.warnings" at warning level.
    """
    if file is not None:
        _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line)
    else:
        from airflow.sdk._shared.logging.structlog import reconfigure_logger

        log = reconfigure_logger(
            structlog.get_logger("py.warnings").bind(), structlog.processors.CallsiteParameterAdder
        )

        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
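

# Illustrative sketch (not part of the module): once configure_logging has
# registered _showwarning, stdlib warnings surface as structured log events.
#
#   configure_logging()
#   warnings.warn("this option is deprecated")
#   # -> logged on the "py.warnings" logger with category/filename/lineno fields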