1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import logging
21import warnings
22from importlib import import_module
23from typing import TYPE_CHECKING, Any
24
25from airflow._shared.module_loading import import_string
26from airflow.configuration import conf
27from airflow.exceptions import AirflowConfigException
28
29if TYPE_CHECKING:
30 from airflow.logging.remote import RemoteLogIO
31
32log = logging.getLogger(__name__)
33
34
35class _ActiveLoggingConfig:
36 """Private class to hold active logging config variables."""
37
38 logging_config_loaded: bool = False
39 remote_task_log: RemoteLogIO | None
40 default_remote_conn_id: str | None = None
41
42
43def get_remote_task_log() -> RemoteLogIO | None:
44 if not _ActiveLoggingConfig.logging_config_loaded:
45 load_logging_config()
46 return _ActiveLoggingConfig.remote_task_log
47
48
49def get_default_remote_conn_id() -> str | None:
50 if not _ActiveLoggingConfig.logging_config_loaded:
51 load_logging_config()
52 return _ActiveLoggingConfig.default_remote_conn_id
53
54
55def load_logging_config() -> tuple[dict[str, Any], str]:
56 """Configure & Validate Airflow Logging."""
57 fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
58 logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)
59 _ActiveLoggingConfig.logging_config_loaded = True
60
61 # Sometimes we end up with `""` as the value!
62 logging_class_path = logging_class_path or fallback
63
64 user_defined = logging_class_path != fallback
65
66 try:
67 logging_config = import_string(logging_class_path)
68
69 # Make sure that the variable is in scope
70 if not isinstance(logging_config, dict):
71 raise ValueError("Logging Config should be of dict type")
72
73 if user_defined:
74 log.info("Successfully imported user-defined logging config from %s", logging_class_path)
75
76 except Exception as err:
77 # Import default logging configurations.
78 raise ImportError(
79 f"Unable to load {'custom ' if user_defined else ''}logging config from {logging_class_path} due "
80 f"to: {type(err).__name__}:{err}"
81 )
82 else:
83 modpath = logging_class_path.rsplit(".", 1)[0]
84 try:
85 mod = import_module(modpath)
86
87 # Load remote logging configuration from the custom module
88 _ActiveLoggingConfig.remote_task_log = getattr(mod, "REMOTE_TASK_LOG")
89 _ActiveLoggingConfig.default_remote_conn_id = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None)
90 except Exception as err:
91 log.info("Remote task logs will not be available due to an error: %s", err)
92
93 return logging_config, logging_class_path
94
95
96def configure_logging():
97 from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values
98
99 logging_config, logging_class_path = load_logging_config()
100 try:
101 level: str = getattr(
102 logging_config, "LOG_LEVEL", conf.get("logging", "logging_level", fallback="INFO")
103 ).upper()
104
105 colors = getattr(
106 logging_config,
107 "COLORED_LOG",
108 conf.getboolean("logging", "colored_console_log", fallback=True),
109 )
110 # Try to init logging
111
112 log_fmt, callsite_params = translate_config_values(
113 log_format=getattr(logging_config, "LOG_FORMAT", conf.get("logging", "log_format", fallback="")),
114 callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
115 )
116 configure_logging(
117 log_level=level,
118 namespace_log_levels=conf.get("logging", "namespace_levels", fallback=None),
119 stdlib_config=logging_config,
120 log_format=log_fmt,
121 callsite_parameters=callsite_params,
122 colors=colors,
123 )
124 except (ValueError, KeyError) as e:
125 log.error("Unable to load the config, contains a configuration error.")
126 # When there is an error in the config, escalate the exception
127 # otherwise Airflow would silently fall back on the default config
128 raise e
129
130 validate_logging_config()
131
132 new_folder_permissions = int(
133 conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
134 8,
135 )
136
137 base_log_folder = conf.get("logging", "base_log_folder")
138
139 return init_log_folder(
140 base_log_folder,
141 new_folder_permissions=new_folder_permissions,
142 )
143
144
145def validate_logging_config():
146 """Validate the provided Logging Config."""
147 # Now lets validate the other logging-related settings
148 task_log_reader = conf.get("logging", "task_log_reader")
149
150 logger = logging.getLogger("airflow.task")
151
152 def _get_handler(name):
153 return next((h for h in logger.handlers if h.name == name), None)
154
155 if _get_handler(task_log_reader) is None:
156 # Check for pre 1.10 setting that might be in deployed airflow.cfg files
157 if task_log_reader == "file.task" and _get_handler("task"):
158 warnings.warn(
159 f"task_log_reader setting in [logging] has a deprecated value of {task_log_reader!r}, "
160 "but no handler with this name was found. Please update your config to use task. "
161 "Running config has been adjusted to match",
162 DeprecationWarning,
163 stacklevel=2,
164 )
165 conf.set("logging", "task_log_reader", "task")
166 else:
167 raise AirflowConfigException(
168 f"Configured task_log_reader {task_log_reader!r} was not a handler of "
169 f"the 'airflow.task' logger."
170 )