Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/config_templates/airflow_local_settings.py: 40%
73 statements

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()

# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

# FILENAME_TEMPLATE is only used by remote logging handlers since Airflow 2.3.3.
# All of those handlers inherit from FileTaskHandler, and providing any value other than None
# would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
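
# Illustration only (not part of the original module): every option read above comes from
# airflow.cfg, and any of them can also be overridden with an AIRFLOW__{SECTION}__{KEY}
# environment variable, e.g. AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG. get_mandatory_value()
# fails if the option resolves to None, while plain conf.get(..., fallback=...) returns the
# fallback instead. A minimal sketch with a hypothetical helper (never called by Airflow):
def _example_read_log_level() -> str:
    # Same lookup as LOG_LEVEL above, but tolerating a missing option via an explicit fallback.
    return conf.get("logging", "LOGGING_LEVEL", fallback="INFO").upper()
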
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}
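
# Illustration only (not part of the original module): DEFAULT_LOGGING_CONFIG follows the
# standard logging.config dictConfig schema. Airflow's logging bootstrap is assumed here to
# pass this dict (or a user-supplied copy referenced by the [logging] logging_config_class
# option) to logging.config.dictConfig(). A minimal sketch with a hypothetical helper:
def _example_apply_logging_config() -> None:
    import logging.config

    # Instantiates the formatters, filters, and handlers declared above and wires up the loggers.
    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
    logging.getLogger("airflow.task").info("This record is routed through the 'task' handler.")
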
EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
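
# Illustration only: with [logging] extra_logger_names set to e.g. "connexion,sqlalchemy"
# (example names, not defaults), the block above attaches the console handler at LOG_LEVEL
# to each named logger. The hypothetical helper below builds the equivalent entries:
def _example_extra_loggers(names: str = "connexion,sqlalchemy") -> dict[str, dict[str, Any]]:
    return {
        name.strip(): {"handlers": ["console"], "level": LOG_LEVEL, "propagate": True}
        for name in names.split(",")
    }
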
DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create the log directory for the processor_manager handler, as RotatingFileHandler
    # will only create the file, not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
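
# Illustration only: the processor_manager handler above is a size-rotated file handler, so
# with maxBytes=104857600 (100 * 1024 * 1024 bytes) and backupCount=5 the manager log plus
# its rotated copies occupy at most roughly 600MB on disk. A hypothetical sketch of that cap:
def _example_processor_manager_disk_cap(max_bytes: int = 104857600, backup_count: int = 5) -> int:
    # The current file plus backup_count rotated files, each at most max_bytes in size.
    return max_bytes * (backup_count + 1)
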
##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:

    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # just to help Airflow select the correct handler
    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
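
    # Illustration only: REMOTE_TASK_HANDLER_KWARGS is parsed from JSON (defaulting to {}) and,
    # at the end of this block, merged into whichever "task" handler the branches below install.
    # For example, a hypothetical airflow.cfg entry
    #   [logging]
    #   remote_task_handler_kwargs = {"delete_local_copy": true}
    # would add delete_local_copy=True to the selected remote handler's constructor kwargs.
    # A minimal sketch with a hypothetical helper (never called by Airflow):
    def _example_merged_task_handler() -> dict[str, Any]:
        merged = dict(DEFAULT_LOGGING_CONFIG["handlers"]["task"])
        merged.update(REMOTE_TASK_HANDLER_KWARGS or {})
        return merged
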
    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. Otherwise, check the "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
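
# Illustration only (not part of the original module): remote handler selection above keys off
# the prefix of REMOTE_BASE_LOG_FOLDER ("s3://", "cloudwatch://", "gs://", "wasb",
# "stackdriver://", "oss://", or falls through to Elasticsearch), and urlsplit() recovers the
# provider-specific target in two of those branches. The hypothetical helper below mirrors
# that parsing: "cloudwatch://<log-group-arn>" keeps netloc + path as the log group ARN,
# while "stackdriver:///airflow-tasks" keeps the path without its leading slash.
def _example_parse_remote_base(remote_base: str) -> str:
    parts = urlsplit(remote_base)
    if parts.scheme == "stackdriver":
        return parts.path[1:]  # "stackdriver:///airflow-tasks" -> "airflow-tasks"
    return parts.netloc + parts.path  # e.g. the CloudWatch log group ARN after the scheme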