Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/config_templates/airflow_local_settings.py: 38%


81 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""

from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
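# Illustrative note (the values shown are examples, not defaults defined here): each of
# the conf.get_* lookups in this module resolves from airflow.cfg or, equivalently, from
# an AIRFLOW__<SECTION>__<KEY> environment variable, e.g.
#
#   [logging]
#   logging_level = DEBUG
#
#   export AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG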

# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")

COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")

COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")

PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")

DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
)

DAG_PROCESSOR_MANAGER_LOG_STDOUT: str = conf.get_mandatory_value(
    "logging", "DAG_PROCESSOR_MANAGER_LOG_STDOUT"
)

# FILENAME_TEMPLATE is only used by remote logging handlers since Airflow 2.3.3.
# All of those handlers inherit from FileTaskHandler, and providing any value other
# than None would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")

DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}
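# Illustrative sketch (hypothetical module name log_config and folder path): deployments
# that need to customize this dict typically copy it into a module on PYTHONPATH and
# point [logging] logging_config_class at it, e.g.
#
#   from copy import deepcopy
#   from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
#
#   LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
#   LOGGING_CONFIG["handlers"]["task"]["base_log_folder"] = "/custom/log/dir"
#
# Airflow applies whichever dict is selected via logging.config.dictConfig() at startup.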

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
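# Illustrative example (assumed logger names): extra_logger_names takes a comma-separated
# list of third-party loggers to route through the console handler, e.g. in airflow.cfg:
#
#   [logging]
#   extra_logger_names = connexion,sqlalchemy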

DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}
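# Worked sizing note: maxBytes is 100 * 1024 * 1024 = 104,857,600 bytes, so with
# backupCount = 5 the rotating processor-manager log is capped at roughly
# 6 x 100MB = 600MB on disk (the active file plus five rotated backups).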

if DAG_PROCESSOR_MANAGER_LOG_STDOUT == "True":
    DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"].update(
        {
            "console": {
                "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
                "formatter": "airflow",
                "stream": "sys.stdout",
                "filters": ["mask_secrets"],
            }
        }
    )
    DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"]["airflow.processor_manager"]["handlers"].append("console")

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create the log directory for the processor_manager handler, as
    # RotatingFileHandler only creates the file, not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
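# Illustrative note (assumed invocation, not defined in this module): only the process
# that actually runs the DAG processor manager is expected to opt in, roughly along
# the lines of
#
#   os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
#   # ...then re-initialize logging so the processor_manager handler above is installed
#
# which keeps other components (webserver, workers) from opening the same rotating file.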

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:
    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # HDFS paths should start with "hdfs://"
    # just to help Airflow select the correct handler
    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
    REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
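    # Illustrative example (assumed option value): remote_task_handler_kwargs is a JSON
    # object passed through to the chosen remote handler's constructor, e.g.
    #
    #   [logging]
    #   remote_task_handler_kwargs = {"delete_local_copy": true}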

    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
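        # Illustrative sketch (assumed bucket name and connection id): S3 task logging
        # is typically switched on like this in airflow.cfg:
        #
        #   [logging]
        #   remote_logging = True
        #   remote_base_log_folder = s3://my-airflow-logs/prod
        #   remote_log_conn_id = aws_default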

    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        wasb_log_container = conf.get_mandatory_value(
            "azure_remote_logging", "remote_wasb_log_container", fallback="airflow-logs"
        )
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": wasb_log_container,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "gcp_log_name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
        HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
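        # Illustrative sketch (assumed host/port values): the Elasticsearch route is
        # usually combined with stdout shipping, where an external agent (e.g. Filebeat
        # or Fluentd) forwards the JSON task logs and this handler reads them back for
        # the UI:
        #
        #   [elasticsearch]
        #   host = localhost:9200
        #   write_stdout = True
        #   json_format = True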

    else:
        raise AirflowException(
            "Incorrect remote log configuration. If you are using Elasticsearch, please check the "
            "'host' option in the 'elasticsearch' section; otherwise, check the "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)