Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/config_templates/airflow_local_settings.py: 40%
72 statements
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()

# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

# FILENAME_TEMPLATE is only used by remote logging handlers since Airflow 2.3.3.
# All of these handlers inherit from FileTaskHandler, and providing any value
# other than None would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
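
# The option names read above live in airflow.cfg. A purely illustrative snippet
# (example values, not necessarily the shipped defaults) of the sections they come from:
#
#   [logging]
#   logging_level = INFO
#   fab_logging_level = WARNING
#   colored_console_log = True
#   base_log_folder = /opt/airflow/logs
#
#   [scheduler]
#   child_process_log_directory = /opt/airflow/logs/scheduler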

DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}
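
# Illustrative sketch only (not executed by this template): a dictConfig-style mapping
# like DEFAULT_LOGGING_CONFIG is ultimately handed to the standard library logging
# machinery, roughly as follows. Airflow performs this wiring itself at startup; the
# snippet just shows how the "formatters"/"handlers"/"loggers" keys map onto the
# stdlib schema.
#
#   import logging
#   import logging.config
#
#   logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
#   logging.getLogger("airflow.task").info("task logging is now configured")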

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
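
# Illustrative example (assumed values, not a shipped default): with
#
#   [logging]
#   extra_logger_names = connexion,sqlalchemy
#
# in airflow.cfg, each named third-party logger gets the "console" handler above
# attached at LOG_LEVEL, with propagation left enabled.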

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create the log directory for the processor_manager handler, as
    # RotatingFileHandler will only create the file, not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
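
    # For illustration only (an assumption about typical defaults, not set in this file):
    # DAG_PROCESSOR_MANAGER_LOG_LOCATION commonly resolves to something like
    #
    #   {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
    #
    # so the mkdir() call above creates the logs/dag_processor_manager/ directory on first use.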

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:

    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # just to help Airflow select the correct handler
    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
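
    # Illustrative examples only (assumed values, not defaults): the URL scheme of
    # remote_base_log_folder selects which handler block below is activated, e.g.
    #
    #   s3://my-bucket/airflow/logs        -> S3TaskHandler
    #   cloudwatch://<log-group-arn>       -> CloudwatchTaskHandler
    #   gs://my-bucket/airflow/logs        -> GCSTaskHandler
    #   wasb://my-container/airflow/logs   -> WasbTaskHandler
    #   stackdriver:///airflow-tasks       -> StackdriverTaskHandler
    #   oss://my-bucket/airflow/logs       -> OSSTaskHandler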

    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
                "delete_local_copy": False,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE")
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_id_template": ELASTICSEARCH_LOG_ID_TEMPLATE,
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. If you are using Elasticsearch, please check "
            "the 'host' option in the 'elasticsearch' section. Otherwise, please check the "
            "'remote_base_log_folder' option in the 'logging' section."
        )
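
# Purely illustrative note (an assumption about how this template is typically used,
# not something defined in this file): a customised copy of DEFAULT_LOGGING_CONFIG can
# be pointed to from airflow.cfg, e.g.
#
#   [logging]
#   logging_config_class = log_config.LOGGING_CONFIG
#
# where log_config.py is a module on PYTHONPATH (for example under $AIRFLOW_HOME/config)
# that defines LOGGING_CONFIG as a deep copy of this dictionary with local adjustments.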