#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""

from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()


# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

DAG_PROCESSOR_MANAGER_LOG_STDOUT: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_STDOUT"
)

# FILENAME_TEMPLATE is only used by the remote logging handlers since Airflow 2.3.3.
# All of these handlers inherit from FileTaskHandler, and providing any value other than None
# would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")

DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}
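# A minimal sketch of how a custom logging config module might extend this dictionary
# (``CUSTOM_LOGGING_CONFIG`` and the extra handler below are illustrative, not part of Airflow):
#
#     from copy import deepcopy
#     from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
#
#     CUSTOM_LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
#     CUSTOM_LOGGING_CONFIG["handlers"]["extra_file"] = {
#         "class": "logging.FileHandler",
#         "formatter": "airflow",
#         "filename": "/tmp/airflow-extra.log",
#     }
#     CUSTOM_LOGGING_CONFIG["root"]["handlers"].append("extra_file")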

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
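# For illustration, ``extra_logger_names`` is a comma-separated list in airflow.cfg, e.g.:
#
#     [logging]
#     extra_logger_names = connexion,sqlalchemy
#
# Each listed logger then reuses the "console" handler and LOG_LEVEL configured above.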

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}
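# The handler above is a rotating file handler: once the manager log reaches maxBytes
# (100MB here) it rolls over, keeping up to backupCount numbered backups alongside the
# file configured by DAG_PROCESSOR_MANAGER_LOG_LOCATION.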

if DAG_PROCESSOR_MANAGER_LOG_STDOUT == "True":
    DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"].update(
        {
            "console": {
                "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
                "formatter": "airflow",
                "stream": "sys.stdout",
                "filters": ["mask_secrets"],
            }
        }
    )
    DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"]["airflow.processor_manager"]["handlers"].append("console")
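# The string comparison above is intentional: DAG_PROCESSOR_MANAGER_LOG_STDOUT is read via
# get_mandatory_value and is therefore the string "True"/"False", not a bool.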

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create the log directory for the processor_manager handler, as RotatingFileHandler
    # will only create the file, not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:
    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # HDFS paths should start with "hdfs://"
    # just to help Airflow select the correct handler
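    # Illustrative examples only (the bucket names below are hypothetical):
    #   remote_base_log_folder = s3://my-airflow-logs/prod
    #   remote_base_log_folder = gs://my-airflow-logs/prod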
    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
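    # REMOTE_TASK_HANDLER_KWARGS is parsed from JSON and merged into the "task" handler at the end
    # of this module, so its keys end up as constructor arguments of the selected remote handler.
    # For example (illustrative only; supported keys depend on the handler in use):
    #   remote_task_handler_kwargs = {"delete_local_copy": true}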

    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        wasb_log_container = conf.get_mandatory_value(
            "azure_remote_logging", "remote_wasb_log_container", fallback="airflow-logs"
        )
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": wasb_log_container,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "gcp_log_name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
        HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. Otherwise, check the "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
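
# A minimal sketch (module and path names are illustrative) of activating a customized copy of this
# configuration: place a module such as ``log_config.py`` on PYTHONPATH, define your dict there
# (e.g. ``LOGGING_CONFIG``), and point airflow.cfg at it:
#
#     [logging]
#     logging_config_class = log_config.LOGGING_CONFIG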