Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/config_templates/airflow_local_settings.py: 40%

72 statements  

coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

18"""Airflow logging settings.""" 

19from __future__ import annotations 

20 

21import os 

22from pathlib import Path 

23from typing import Any 

24from urllib.parse import urlsplit 

25 

26from airflow.configuration import conf 

27from airflow.exceptions import AirflowException 

28 

29# TODO: Logging format and level should be configured 

30# in this file instead of from airflow.cfg. Currently 

31# there are other log format and level configurations in 

32# settings.py and cli.py. Please see AIRFLOW-1455. 

33LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper() 

34 

35 

36# Flask appbuilder's info level log is very verbose, 

37# so it's set to 'WARN' by default. 

38FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper() 

39 

40LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT") 

41DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT") 

42 

43LOG_FORMATTER_CLASS: str = conf.get_mandatory_value( 

44 "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware" 

45) 

46 

47COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT") 

48 

49COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG") 

50 

51COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS") 

52 

53DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET") 

54 

55BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER") 

56 

57PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY") 

58 

59DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value( 

60 "logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION" 

61) 

62 


# Since Airflow 2.3.3, FILENAME_TEMPLATE is only used by the remote logging handlers.
# All of these handlers inherit from FileTaskHandler, and providing any value other than
# None would raise a deprecation warning.
FILENAME_TEMPLATE: str | None = None

PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
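# By default (the stock airflow.cfg value) log_processor_filename_template is "{{ filename }}.log",
# which gives one log file per parsed DAG file under PROCESSOR_LOG_FOLDER.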


DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor_to_stdout": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "source_processor",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}
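
# Note: this dict follows the stdlib logging.config.dictConfig schema. At startup Airflow
# applies either this default or the object pointed at by the [logging] logging_config_class
# option (handled in airflow.logging_config), so a custom settings module must expose a dict
# of the same shape.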


EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
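    # Illustrative example: with extra_logger_names = "connexion,sqlalchemy.engine" in the
    # [logging] section, the comprehension above adds console loggers named "connexion" and
    # "sqlalchemy.engine" at LOG_LEVEL with propagation enabled.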


DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100 MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}
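# With maxBytes=104857600 (100 MB) and backupCount=5, RotatingFileHandler keeps the active
# dag_processor_manager log plus at most five rotated files, i.e. roughly 600 MB on disk.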


# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
    DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
    DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])

    # Manually create the log directory for the processor_manager handler, as
    # RotatingFileHandler only creates the file, not the directory.
    processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
        "processor_manager"
    ]
    directory: str = os.path.dirname(processor_manager_handler_config["filename"])
    Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)


##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")

if REMOTE_LOGGING:

    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")

    # Storage bucket URL for remote logging.
    # S3 buckets should start with "s3://"
    # CloudWatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # The prefix is only used to help Airflow select the correct handler.
    REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
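    # Illustrative example: remote_base_log_folder = s3://my-airflow-logs/prod (a made-up
    # bucket) matches the "s3://" branch below, so the "task" handler becomes S3TaskHandler.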


    if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
        S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "s3_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
        url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
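        # Illustrative example: for cloudwatch://arn:aws:logs:us-east-1:123456789012:log-group:airflow
        # (a made-up ARN), urlsplit strips the "cloudwatch://" scheme and netloc + path below
        # reassembles the log-group ARN.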

        CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_group_arn": url_parts.netloc + url_parts.path,
                "filename_template": FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
            "task": {
                "class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
                "gcp_key_path": key_path,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
        WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
                "wasb_container": "airflow-logs",
                "filename_template": FILENAME_TEMPLATE,
                "delete_local_copy": False,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)

    elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
        OSS_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
                "formatter": "airflow",
                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
                "oss_log_folder": REMOTE_BASE_LOG_FOLDER,
                "filename_template": FILENAME_TEMPLATE,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)

    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE")
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
                "log_id_template": ELASTICSEARCH_LOG_ID_TEMPLATE,
                "filename_template": FILENAME_TEMPLATE,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. Otherwise, check the "
            "'remote_base_log_folder' option in the 'logging' section."
        )
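
# Note: each remote branch above only swaps in a new "task" handler; the console, processor
# and processor_manager handlers keep writing locally even when remote_logging is enabled.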