Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/log.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import warnings
21from functools import cache
22from pathlib import Path
23from typing import TYPE_CHECKING, Any, BinaryIO, TextIO
25import structlog
26import structlog.processors
28# We have to import this here, as it is used in the type annotations at runtime even if it seems it is
29# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields.
30from pydantic import JsonValue # noqa: TC002
32if TYPE_CHECKING:
33 from structlog.typing import EventDict, FilteringBoundLogger, Processor
35 from airflow.logging_config import RemoteLogIO
36 from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI
39__all__ = ["configure_logging", "reset_logging", "mask_secret"]
def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """Structlog processor that redacts known sensitive values from the event dict."""
    from airflow.sdk._shared.secrets_masker import redact

    return redact(event_dict)  # type: ignore[return-value]
@cache
def logging_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    sending_to_supervisor: bool = False,
) -> tuple[Processor, ...]:
    """
    Build the structlog processor chain for task logging.

    :param json_output: emit structured JSON instead of console-formatted lines.
    :param log_format: format string forwarded to the shared processor builder.
    :param colors: enable colored console output.
    :param sending_to_supervisor: when True, skip in-process secrets masking
        (the supervisor masks the stream itself).
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    extras: list[Processor] = []
    # Mask secrets locally unless the supervisor process does it for us.
    if not sending_to_supervisor:
        extras.append(mask_logs)

    # A configured remote log handler may contribute its own processors.
    remote = load_remote_log_handler()
    if remote and (remote_procs := getattr(remote, "processors")):
        extras.extend(remote_procs)

    base, _, final_writer = structlog_processors(
        json_output=json_output, log_format=log_format, colors=colors
    )
    # The final writer must stay last in the chain.
    return (*base, *extras, final_writer)
@cache
def configure_logging(
    json_output: bool = False,
    log_level: str = "DEFAULT",
    output: BinaryIO | TextIO | None = None,
    cache_logger_on_first_use: bool = True,
    sending_to_supervisor: bool = False,
    colored_console_log: bool | None = None,
):
    """
    Set up struct logging and stdlib logging config.

    :param json_output: emit structured JSON log lines instead of console output.
    :param log_level: ``"DEFAULT"`` means resolve from the ``[logging] logging_level``
        config option (falling back to ``INFO``).
    :param output: optional stream to send logs to.
    :param cache_logger_on_first_use: forwarded to structlog for performance.
    :param sending_to_supervisor: when True, skip in-process secrets masking
        (the supervisor masks the stream itself).
    :param colored_console_log: ``None`` means resolve from
        ``[logging] colored_console_log``.
    """
    from airflow.sdk.configuration import conf

    if log_level == "DEFAULT":
        # Fix: removed a dead store (`log_level = "INFO"`) that was immediately
        # overwritten by this lookup -- the fallback already yields "INFO".
        log_level = conf.get("logging", "logging_level", fallback="INFO")

    # If colored_console_log is not explicitly set, read from configuration
    if colored_console_log is None:
        colored_console_log = conf.getboolean("logging", "colored_console_log", fallback=True)

    namespace_log_levels = conf.get("logging", "namespace_levels", fallback=None)

    # Alias the shared implementation: importing it unaliased would shadow
    # this function's own name inside its body.
    from airflow.sdk._shared.logging import (
        configure_logging as _shared_configure_logging,
        translate_config_values,
    )

    log_fmt, callsite_params = translate_config_values(
        log_format=conf.get("logging", "log_format"),
        callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
    )

    mask_secrets = not sending_to_supervisor
    extra_processors: tuple[Processor, ...] = ()

    if mask_secrets:
        extra_processors += (mask_logs,)

    # A configured remote log handler may contribute its own processors.
    if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
        extra_processors += remote_processors

    _shared_configure_logging(
        json_output=json_output,
        log_level=log_level,
        namespace_log_levels=namespace_log_levels,
        log_format=log_fmt,
        output=output,
        cache_logger_on_first_use=cache_logger_on_first_use,
        colors=colored_console_log,
        extra_processors=extra_processors,
        callsite_parameters=callsite_params,
    )

    global _warnings_showwarning

    if _warnings_showwarning is None:
        _warnings_showwarning = warnings.showwarning
        # Capture warnings and show them via structlog -- i.e. in task logs
        warnings.showwarning = _showwarning
def logger_at_level(name: str, level: int) -> Logger:
    """
    Create a new logger at the given level.

    :param name: logger name, passed positionally to the structlog logger factory.
    :param level: numeric log level used to select the filtering wrapper class.
    """
    from airflow.sdk._shared.logging.structlog import LEVEL_TO_FILTERING_LOGGER

    return structlog.wrap_logger(
        None,
        wrapper_class=LEVEL_TO_FILTERING_LOGGER[level],
        # Fix: `(name)` is just a parenthesized expression, not a tuple.
        # logger_factory_args must be an iterable of positional factory args,
        # so it needs the trailing comma.
        logger_factory_args=(name,),
    )
def init_log_file(local_relative_path: str) -> Path:
    """
    Ensure log file and parent directories are created.

    Any directories that are missing are created with the right permission bits.
    """
    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf
    from airflow.sdk._shared.logging import init_log_file as _shared_init_log_file

    def _perm(option: str, default: str) -> int:
        # Permission options are stored as octal string literals, e.g. "0o664".
        return int(conf.get("logging", option, fallback=default), 8)

    return _shared_init_log_file(
        conf.get("logging", "base_log_folder"),
        local_relative_path,
        new_folder_permissions=_perm("file_task_handler_new_folder_permissions", "0o775"),
        new_file_permissions=_perm("file_task_handler_new_file_permissions", "0o664"),
    )
def load_remote_log_handler() -> RemoteLogIO | None:
    """Return the configured remote task-log IO handler, or None if not set up."""
    from airflow import logging_config

    return logging_config.REMOTE_TASK_LOG
def load_remote_conn_id() -> str | None:
    """Return the connection id to use for remote logging, preferring explicit config."""
    import airflow.logging_config

    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf

    configured = conf.get("logging", "remote_log_conn_id", fallback=None)
    # Fall back to the provider default when no conn id is configured.
    return configured or airflow.logging_config.DEFAULT_REMOTE_CONN_ID
def relative_path_from_logger(logger) -> Path | None:
    """
    Work out a log file's path relative to base_log_folder from a raw logger.

    Returns None when the logger has no usable on-disk file (e.g. stdout).
    """
    if not logger:
        return None
    if not hasattr(logger, "_file"):
        logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
        return None

    file_obj = logger._file
    file_name = file_obj.name

    # fd 1 is stdout; a non-string name means something odd about this
    # logger -- either way there is nothing on disk to upload.
    if file_obj.fileno() == 1 or not isinstance(file_name, str):
        return None

    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency
    from airflow.configuration import conf

    base_log_folder = conf.get("logging", "base_log_folder")
    return Path(file_name).relative_to(base_log_folder)
def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
    """Best-effort upload of this task instance's log file to remote storage."""
    raw_logger = getattr(logger, "_logger")

    handler = load_remote_log_handler()
    if not handler:
        # No remote logging configured; nothing to do.
        return

    try:
        rel_path = relative_path_from_logger(raw_logger)
    except Exception:
        # Could not determine the on-disk path -- skip the upload quietly.
        return
    if not rel_path:
        return

    handler.upload(rel_path.as_posix(), ti)
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both task process and supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND supervisor's log output.
    Works safely in both sync and async contexts.
    """
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    # Always register the mask locally first.
    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Try to tell supervisor (only if in task execution context)
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        comms = getattr(task_runner, "SUPERVISOR_COMMS", None)
        if comms:
            comms.send(MaskSecret(value=secret, name=name))
def reset_logging():
    """
    Convenience for testing. Not for production use.

    Restores the original ``warnings.showwarning`` hook and clears all
    logging-related caches so a later ``configure_logging()`` starts fresh.

    :meta private:
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    global _warnings_showwarning
    if _warnings_showwarning is not None:
        warnings.showwarning = _warnings_showwarning
        _warnings_showwarning = None

    structlog_processors.cache_clear()
    logging_processors.cache_clear()
    # Fix: configure_logging is @cache'd as well; without clearing its cache a
    # post-reset call with the same arguments would silently be a no-op.
    configure_logging.cache_clear()
# Saved copy of the original warnings.showwarning hook, captured by
# configure_logging() so reset_logging() can restore it. None means
# warnings are not currently being redirected to structlog.
_warnings_showwarning: Any = None
def _showwarning(
    message: Warning | str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: TextIO | None = None,
    line: str | None = None,
) -> Any:
    """
    Redirects warnings to structlog so they appear in task logs etc.

    Implementation of showwarnings which redirects to logging, which will first
    check to see if the file parameter is None. If a file is specified, it will
    delegate to the original warnings implementation of showwarning. Otherwise,
    it will call warnings.formatwarning and will log the resulting string to a
    warnings logger named "py.warnings" with level logging.WARNING.
    """
    if file is None:
        from airflow.sdk._shared.logging.structlog import reconfigure_logger

        # Route the warning through structlog with callsite info attached.
        log = reconfigure_logger(
            structlog.get_logger("py.warnings").bind(), structlog.processors.CallsiteParameterAdder
        )
        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
    elif _warnings_showwarning is not None:
        # A file was given explicitly: defer to the original hook.
        _warnings_showwarning(message, category, filename, lineno, file, line)