#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import warnings
from importlib import import_module
from typing import TYPE_CHECKING, Any

from airflow._shared.module_loading import import_string
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException

if TYPE_CHECKING:
    from airflow.logging.remote import RemoteLogIO

log = logging.getLogger(__name__)


REMOTE_TASK_LOG: RemoteLogIO | None
DEFAULT_REMOTE_CONN_ID: str | None = None


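# PEP 562 module-level __getattr__: REMOTE_TASK_LOG is resolved lazily, so the
# (possibly user-defined) logging config module is only imported the first time
# remote task log IO is actually requested.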
def __getattr__(name: str):
    if name == "REMOTE_TASK_LOG":
        load_logging_config()
        return REMOTE_TASK_LOG
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


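# [logging] logging_config_class is expected to be the dotted path of a dict, e.g.
# "my_log_config.LOGGING_CONFIG" (module name illustrative). A minimal sketch of such
# a module, assuming the stock DEFAULT_LOGGING_CONFIG as a starting point:
#
#     from copy import deepcopy
#
#     from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
#
#     LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
#     LOGGING_CONFIG["handlers"]["console"]["level"] = "DEBUG"
#
#     # Optionally expose remote log IO; load_logging_config() below picks these up.
#     REMOTE_TASK_LOG = ...  # a RemoteLogIO instance, e.g. provided by a provider package
#     DEFAULT_REMOTE_CONN_ID = "my_remote_conn"  # illustrative connection id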
def load_logging_config() -> tuple[dict[str, Any], str]:
    """Load and validate the Airflow logging configuration."""
    global REMOTE_TASK_LOG, DEFAULT_REMOTE_CONN_ID
    fallback = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
    logging_class_path = conf.get("logging", "logging_config_class", fallback=fallback)

    # Sometimes we end up with `""` as the value!
    logging_class_path = logging_class_path or fallback

    user_defined = logging_class_path != fallback

    try:
        logging_config = import_string(logging_class_path)

        # Make sure the imported object is the expected dictConfig-style dict
        if not isinstance(logging_config, dict):
            raise ValueError("Logging Config should be of dict type")

        if user_defined:
            log.info("Successfully imported user-defined logging config from %s", logging_class_path)

    except Exception as err:
        # Escalate instead of silently falling back to the default config.
        raise ImportError(
            f"Unable to load {'custom ' if user_defined else ''}logging config from {logging_class_path} due "
            f"to: {type(err).__name__}: {err}"
        ) from err
    else:
        modpath = logging_class_path.rsplit(".", 1)[0]
        try:
            mod = import_module(modpath)

            # Load remote logging configuration from the logging config module
            REMOTE_TASK_LOG = getattr(mod, "REMOTE_TASK_LOG")
            DEFAULT_REMOTE_CONN_ID = getattr(mod, "DEFAULT_REMOTE_CONN_ID", None)
        except Exception as err:
            # Keep the module attribute defined so later lookups get None, not a NameError.
            REMOTE_TASK_LOG = None
            log.info("Remote task logs will not be available due to an error: %s", err)

    return logging_config, logging_class_path


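# configure_logging() below reads its settings from the [logging] section of airflow.cfg.
# A minimal sketch of that section using the fallback values from this module (the
# base_log_folder path is illustrative; it has no fallback here):
#
#     [logging]
#     logging_level = INFO
#     colored_console_log = True
#     file_task_handler_new_folder_permissions = 0o775
#     base_log_folder = /opt/airflow/logs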
def configure_logging():
    """Configure Airflow logging from airflow.cfg and prepare the base log folder."""
    # The shared helper imported here shadows this wrapper's name only inside this function.
    from airflow._shared.logging import configure_logging, init_log_folder, translate_config_values

    logging_config, logging_class_path = load_logging_config()
    try:
        level: str = getattr(
            logging_config, "LOG_LEVEL", conf.get("logging", "logging_level", fallback="INFO")
        ).upper()

        colors = getattr(
            logging_config,
            "COLORED_LOG",
            conf.getboolean("logging", "colored_console_log", fallback=True),
        )

        # Try to init logging
        log_fmt, callsite_params = translate_config_values(
            log_format=getattr(logging_config, "LOG_FORMAT", conf.get("logging", "log_format", fallback="")),
            callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
        )
        configure_logging(
            log_level=level,
            namespace_log_levels=conf.get("logging", "namespace_levels", fallback=None),
            stdlib_config=logging_config,
            log_format=log_fmt,
            callsite_parameters=callsite_params,
            colors=colors,
        )
    except (ValueError, KeyError) as e:
        log.error("Unable to load the config, contains a configuration error.")
        # When there is an error in the config, escalate the exception
        # otherwise Airflow would silently fall back on the default config
        raise e

    validate_logging_config()

    # Permissions are configured as an octal string (e.g. "0o775"), so parse with base 8.
    new_folder_permissions = int(
        conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
        8,
    )

    base_log_folder = conf.get("logging", "base_log_folder")

    return init_log_folder(
        base_log_folder,
        new_folder_permissions=new_folder_permissions,
    )


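# With the stock DEFAULT_LOGGING_CONFIG the handler attached to the "airflow.task"
# logger is named "task", so the matching setting is simply:
#
#     [logging]
#     task_log_reader = task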
def validate_logging_config():
    """Validate the provided Logging Config."""
    # Now let's validate the other logging-related settings
    task_log_reader = conf.get("logging", "task_log_reader")

    logger = logging.getLogger("airflow.task")

    def _get_handler(name):
        return next((h for h in logger.handlers if h.name == name), None)

    if _get_handler(task_log_reader) is None:
        # Check for pre 1.10 setting that might be in deployed airflow.cfg files
        if task_log_reader == "file.task" and _get_handler("task"):
            warnings.warn(
                f"task_log_reader setting in [logging] has a deprecated value of {task_log_reader!r}, "
                "but no handler with this name was found. Please update your config to use task. "
                "Running config has been adjusted to match",
                DeprecationWarning,
                stacklevel=2,
            )
            conf.set("logging", "task_log_reader", "task")
        else:
            raise AirflowConfigException(
                f"Configured task_log_reader {task_log_reader!r} was not a handler of "
                "the 'airflow.task' logger."
            )