1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import logging
21import warnings
22from typing import TYPE_CHECKING, Any
23
24from airflow._shared.logging.remote import discover_remote_log_handler
25from airflow._shared.module_loading import import_string
26from airflow.configuration import conf
27from airflow.exceptions import AirflowConfigException
28
29if TYPE_CHECKING:
30 from airflow.logging.remote import RemoteLogIO
31
32log = logging.getLogger(__name__)
33
34
class _ActiveLoggingConfig:
    """
    Private class to hold active logging config variables.

    The values are stored as class attributes and written through :meth:`set`;
    the module-level getters read them back from the class.
    """

    # Flipped to True by ``set`` once a configuration has been stored.
    logging_config_loaded: bool = False
    # Defaults to None (like ``default_remote_conn_id``) so that reading the
    # attribute before ``set`` has run yields None instead of AttributeError.
    remote_task_log: RemoteLogIO | None = None
    default_remote_conn_id: str | None = None

    @classmethod
    def set(cls, remote_task_log: RemoteLogIO | None, default_remote_conn_id: str | None) -> None:
        """
        Store the remote logging configuration and mark it as loaded.

        :param remote_task_log: remote log IO object to expose, or None.
        :param default_remote_conn_id: default remote connection id, or None.
        """
        cls.remote_task_log = remote_task_log
        cls.default_remote_conn_id = default_remote_conn_id
        # Set last so the flag is only True after both values are in place.
        cls.logging_config_loaded = True
48
49
def get_remote_task_log() -> RemoteLogIO | None:
    """Return the stored remote task log object, loading the logging config first if needed."""
    active = _ActiveLoggingConfig
    if active.logging_config_loaded:
        return active.remote_task_log
    # Nothing stored yet: loading the config populates the holder class.
    load_logging_config()
    return active.remote_task_log
54
55
def get_default_remote_conn_id() -> str | None:
    """Return the stored default remote connection id, loading the logging config first if needed."""
    active = _ActiveLoggingConfig
    if active.logging_config_loaded:
        return active.default_remote_conn_id
    # Nothing stored yet: loading the config populates the holder class.
    load_logging_config()
    return active.default_remote_conn_id
60
61
def load_logging_config() -> tuple[dict[str, Any], str]:
    """
    Configure & Validate Airflow Logging.

    Imports the object named by ``[logging] logging_config_class`` (falling back
    to the default logging config path when the option is unset or empty), checks
    that it is a dict, and stores the result of ``discover_remote_log_handler``
    on ``_ActiveLoggingConfig``.

    :return: a ``(logging_config, logging_class_path)`` tuple.
    :raises ImportError: if the config cannot be imported or is not a dict; the
        underlying error is attached as ``__cause__``.
    """
    fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
    logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)

    # Sometimes we end up with `""` as the value!
    logging_class_path = logging_class_path or fallback

    user_defined = logging_class_path != fallback

    try:
        logging_config = import_string(logging_class_path)

        # Only a dict is accepted as a logging config.
        if not isinstance(logging_config, dict):
            raise ValueError("Logging Config should be of dict type")
    except Exception as err:
        # Any failure is reported as an ImportError; chain it explicitly so the
        # original error is kept as the cause.
        raise ImportError(
            f"Unable to load {'custom ' if user_defined else ''}logging config from {logging_class_path} due "
            f"to: {type(err).__name__}:{err}"
        ) from err

    if user_defined:
        log.info("Successfully imported user-defined logging config from %s", logging_class_path)

    # Load remote logging configuration using shared discovery logic
    remote_task_log, default_remote_conn_id = discover_remote_log_handler(
        logging_class_path, fallback, import_string
    )
    _ActiveLoggingConfig.set(remote_task_log, default_remote_conn_id)

    return logging_config, logging_class_path
96
97
def configure_logging():
    """
    Load the logging config, apply it, validate it and initialise the log folder.

    :return: whatever ``init_log_folder`` returns for ``[logging] base_log_folder``.
    :raises ValueError, KeyError: re-raised unchanged when applying the config fails.
    """
    # Aliased so the shared helper does not shadow this function's own name.
    from airflow._shared.logging import (
        configure_logging as _configure_shared_logging,
        init_log_folder,
        translate_config_values,
    )

    logging_config, logging_class_path = load_logging_config()
    try:
        # NOTE(review): load_logging_config only returns dict instances, so these
        # getattr lookups presumably always fall through to the conf-derived
        # default unless a dict subclass defines the attribute -- confirm intent.
        level: str = getattr(
            logging_config, "LOG_LEVEL", conf.get("logging", "logging_level", fallback="INFO")
        ).upper()

        colors = getattr(
            logging_config,
            "COLORED_LOG",
            conf.getboolean("logging", "colored_console_log", fallback=True),
        )
        # Try to init logging

        log_fmt, callsite_params = translate_config_values(
            log_format=getattr(logging_config, "LOG_FORMAT", conf.get("logging", "log_format", fallback="")),
            callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
        )
        _configure_shared_logging(
            log_level=level,
            namespace_log_levels=conf.get("logging", "namespace_levels", fallback=None),
            stdlib_config=logging_config,
            log_format=log_fmt,
            callsite_parameters=callsite_params,
            colors=colors,
        )
    except (ValueError, KeyError):
        log.error("Unable to load the config, contains a configuration error.")
        # When there is an error in the config, escalate the exception
        # otherwise Airflow would silently fall back on the default config
        raise

    validate_logging_config()

    # The permissions option is parsed as an octal string (base 8).
    new_folder_permissions = int(
        conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
        8,
    )

    base_log_folder = conf.get("logging", "base_log_folder")

    return init_log_folder(
        base_log_folder,
        new_folder_permissions=new_folder_permissions,
    )
145
146
def validate_logging_config():
    """
    Check that ``[logging] task_log_reader`` names a handler of the ``airflow.task`` logger.

    If the value is the deprecated ``file.task`` and a handler named ``task``
    exists, a DeprecationWarning is emitted and the running config is switched
    to ``task``.

    :raises AirflowConfigException: if no matching handler is found.
    """
    task_log_reader = conf.get("logging", "task_log_reader")
    task_logger = logging.getLogger("airflow.task")

    def _find_handler(handler_name):
        # First handler on the task logger with this name, or None.
        for candidate in task_logger.handlers:
            if candidate.name == handler_name:
                return candidate
        return None

    if _find_handler(task_log_reader) is not None:
        return

    # Only the pre 1.10 value "file.task", which might be in deployed airflow.cfg
    # files, can be recovered from; anything else is a configuration error.
    if task_log_reader != "file.task" or not _find_handler("task"):
        raise AirflowConfigException(
            f"Configured task_log_reader {task_log_reader!r} was not a handler of "
            f"the 'airflow.task' logger."
        )

    warnings.warn(
        f"task_log_reader setting in [logging] has a deprecated value of {task_log_reader!r}, "
        "but no handler with this name was found. Please update your config to use task. "
        "Running config has been adjusted to match",
        DeprecationWarning,
        stacklevel=2,
    )
    conf.set("logging", "task_log_reader", "task")