Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/config_templates/airflow_local_settings.py: 40%

73 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()


# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

# FILENAME_TEMPLATE is only used by the remote logging handlers since Airflow 2.3.3.
# All of these handlers inherit from FileTaskHandler, and providing any value other than None
# would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
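# Illustration (added note, not part of the stock file): LOG_PROCESSOR_FILENAME_TEMPLATE
# typically defaults to "{{ filename }}.log", so the FileProcessorHandler configured below
# would write parsing logs for a file such as dags/example_dag.py to a file named
# example_dag.py.log under PROCESSOR_LOG_FOLDER; treat the exact layout as an assumption,
# since it is determined by FileProcessorHandler rather than by this module.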

DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}
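# Illustrative sketch (added note, not part of the stock file): Airflow feeds this dict to
# logging.config.dictConfig() at startup. A deployment that needs to adjust it usually ships
# its own module (commonly an airflow_local_settings.py on PYTHONPATH, or any dict referenced
# by the [logging] logging_config_class option) that copies and mutates the default, e.g.:
#
#     from copy import deepcopy
#     from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
#
#     LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
#     LOGGING_CONFIG["handlers"]["console"]["level"] = "WARNING"  # hypothetical tweak
#
# The name LOGGING_CONFIG and the tweak above are examples only, not requirements of this file.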

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
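# Example of the block above (assumed config value, not from the original file):
#
#     [logging]
#     extra_logger_names = connexion,sqlalchemy
#
# would add "connexion" and "sqlalchemy" entries to DEFAULT_LOGGING_CONFIG["loggers"],
# each as {"handlers": ["console"], "level": LOG_LEVEL, "propagate": True}.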

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}
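# Note (added for clarity): with maxBytes=104857600 and backupCount=5, the standard
# RotatingFileHandler semantics apply: the manager log rolls over at roughly 100MB and
# keeps at most five rotated files plus the active one, i.e. about 600MB on disk.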

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create the log directory for the processor_manager handler, as RotatingFileHandler
    # only creates the file, not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
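# Illustration (assumed behaviour, not part of the stock file): only the process that runs the
# DAG-processor manager is expected to set this flag before the logging config is imported, e.g.
#
#     os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
#
# so worker and webserver processes never instantiate the rotating file handler.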

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
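# Illustration (assumed airflow.cfg values, not part of this file): remote logging is enabled
# and dispatched purely by the scheme of remote_base_log_folder, e.g.
#
#     [logging]
#     remote_logging = True
#     remote_base_log_folder = s3://my-airflow-logs/prod   # hypothetical bucket/prefix
#     remote_log_conn_id = aws_default                     # used by the provider handlers, not here
#
# The branches below match that scheme to a provider task handler.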

if REMOTE_LOGGING:

    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # (the prefix is only there to help Airflow select the correct handler)
    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})

    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. Otherwise, check the "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
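# Illustration (assumed values, not part of the original file): the REMOTE_TASK_HANDLER_KWARGS
# dict parsed above is merged into the "task" handler entry, so something like
#
#     [logging]
#     remote_task_handler_kwargs = {"delete_local_copy": true}
#
# would be passed as an extra constructor argument to whichever remote handler was selected;
# "delete_local_copy" is an example and assumes a handler that accepts that keyword.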