#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
18"""Airflow logging settings."""
19
20from __future__ import annotations
21
22import os
23from typing import TYPE_CHECKING, Any
24from urllib.parse import urlsplit
25
26from airflow.configuration import conf
27from airflow.exceptions import AirflowException
28
29if TYPE_CHECKING:
30 from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO
31
32LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
33
34
35# Flask appbuilder's info level log is very verbose,
36# so it's set to 'WARN' by default.
37FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()
38
39LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
40DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")
41
42LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
43 "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
44)
45
46DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")
47
48BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER"))
49
# This isn't used anymore, but it is kept for compatibility with code that might have imported it
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets_core": {
            "()": "airflow._shared.secrets_masker._secrets_masker",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            # "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow",
            "stream": "sys.stdout",
            "filters": ["mask_secrets_core"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": BASE_LOG_FOLDER,
            "filters": ["mask_secrets_core"],
        },
    },
    "loggers": {
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets_core"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets_core"],
    },
}

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)

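# Illustration only (the config value below is an assumption, not a shipped default): setting
#   [logging]
#   extra_logger_names = connexion,sqlalchemy
# would attach the "console" handler to loggers named "connexion" and "sqlalchemy" at LOG_LEVEL.
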
##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None


def _default_conn_name_from(mod_path, hook_name):
    # Try to set the default conn name from a hook, but don't error if something goes wrong at runtime
    from importlib import import_module

    global DEFAULT_REMOTE_CONN_ID

    try:
        mod = import_module(mod_path)

        hook = getattr(mod, hook_name)

        DEFAULT_REMOTE_CONN_ID = getattr(hook, "default_conn_name")
    except Exception:
        # Let's error in tests though!
        if "PYTEST_CURRENT_TEST" in os.environ:
            raise
        return None


if REMOTE_LOGGING:
    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")
    OPENSEARCH_HOST: str | None = conf.get("opensearch", "HOST")
    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # HDFS paths should start with "hdfs://"
    # just to help Airflow select the correct handler
    remote_base_log_folder: str = conf.get_mandatory_value("logging", "remote_base_log_folder")
    remote_task_handler_kwargs = conf.getjson("logging", "remote_task_handler_kwargs", fallback={})
    if not isinstance(remote_task_handler_kwargs, dict):
        raise ValueError(
            "logging/remote_task_handler_kwargs must be a JSON object (a Python dict), but got "
            f"{type(remote_task_handler_kwargs)}"
        )
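    # Example only (an assumed setting, not a default): remote_task_handler_kwargs = {"delete_local_copy": true}
    # would override the "delete_local_copy" default in the dict-merge performed by the branches below.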
    delete_local_copy = conf.getboolean("logging", "delete_local_logs")

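    # Dispatch on the scheme of remote_base_log_folder: each branch builds the matching
    # RemoteLogIO with the defaults below, lets remote_task_handler_kwargs override them,
    # and then clears the kwargs so they are not re-applied to the "task" handler at the
    # end of this block.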
    if remote_base_log_folder.startswith("s3://"):
        from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO

        _default_conn_name_from("airflow.providers.amazon.aws.hooks.s3", "S3Hook")
        REMOTE_TASK_LOG = S3RemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}

    elif remote_base_log_folder.startswith("cloudwatch://"):
        from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO

        _default_conn_name_from("airflow.providers.amazon.aws.hooks.logs", "AwsLogsHook")
        url_parts = urlsplit(remote_base_log_folder)
        REMOTE_TASK_LOG = CloudWatchRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                    "log_group_arn": url_parts.netloc + url_parts.path,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif remote_base_log_folder.startswith("gs://"):
        from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO

        _default_conn_name_from("airflow.providers.google.cloud.hooks.gcs", "GCSHook")
        key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None)

        REMOTE_TASK_LOG = GCSRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                    "gcp_key_path": key_path,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif remote_base_log_folder.startswith("wasb"):
        from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO

        _default_conn_name_from("airflow.providers.microsoft.azure.hooks.wasb", "WasbHook")
        wasb_log_container = conf.get_mandatory_value(
            "azure_remote_logging", "remote_wasb_log_container", fallback="airflow-logs"
        )

        REMOTE_TASK_LOG = WasbRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                    "wasb_container": wasb_log_container,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
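    # The Stackdriver branch below (like the Elasticsearch/OpenSearch branches further down) does
    # not set REMOTE_TASK_LOG; it updates the "task" handler in DEFAULT_LOGGING_CONFIG instead, so
    # remote_task_handler_kwargs is left intact and applied to that handler at the end of this block.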
    elif remote_base_log_folder.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(remote_base_log_folder).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "gcp_log_name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif remote_base_log_folder.startswith("oss://"):
        from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO

        _default_conn_name_from("airflow.providers.alibaba.cloud.hooks.oss", "OSSHook")

        REMOTE_TASK_LOG = OSSRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif remote_base_log_folder.startswith("hdfs://"):
        from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO

        _default_conn_name_from("airflow.providers.apache.hdfs.hooks.webhdfs", "WebHDFSHook")

        REMOTE_TASK_LOG = HdfsRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": urlsplit(remote_base_log_folder).path,
                    "delete_local_copy": delete_local_copy,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": BASE_LOG_FOLDER,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
                "target_index": ELASTICSEARCH_TARGET_INDEX,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    elif OPENSEARCH_HOST:
        OPENSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
        OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
        OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch", "USERNAME")
        OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch", "PASSWORD")
        OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch", "WRITE_STDOUT")
        OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch", "JSON_FORMAT")
        OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch", "JSON_FIELDS")
        OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch", "HOST_FIELD")
        OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch", "OFFSET_FIELD")

        OPENSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": BASE_LOG_FOLDER,
                "end_of_log_mark": OPENSEARCH_END_OF_LOG_MARK,
                "host": OPENSEARCH_HOST,
                "port": OPENSEARCH_PORT,
                "username": OPENSEARCH_USERNAME,
                "password": OPENSEARCH_PASSWORD,
                "write_stdout": OPENSEARCH_WRITE_STDOUT,
                "json_format": OPENSEARCH_JSON_FORMAT,
                "json_fields": OPENSEARCH_JSON_FIELDS,
                "host_field": OPENSEARCH_HOST_FIELD,
                "offset_field": OPENSEARCH_OFFSET_FIELD,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OPENSEARCH_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. Otherwise, check the "
            "'remote_base_log_folder' option in the 'logging' section."
        )
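    # For the Stackdriver/Elasticsearch/OpenSearch branches remote_task_handler_kwargs was not
    # consumed above, so it is applied to the "task" handler here; the other branches reset it
    # to {}, which makes this a no-op for them.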
    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(remote_task_handler_kwargs)