Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/config_templates/airflow_local_settings.py: 21%


107 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Airflow logging settings."""

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.exceptions import AirflowException

if TYPE_CHECKING:
    from airflow.logging_config import RemoteLogIO, RemoteLogStreamIO

LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()


# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()

LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")

LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
    "logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
)

DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")

BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("logging", "BASE_LOG_FOLDER"))

# This isn't used anymore, but kept for compat of people who might have imported it
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
        "source_processor": {
            "format": DAG_PROCESSOR_LOG_FORMAT,
            "class": LOG_FORMATTER_CLASS,
        },
    },
    "filters": {
        "mask_secrets_core": {
            "()": "airflow._shared.secrets_masker._secrets_masker",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            # "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow",
            "stream": "sys.stdout",
            "filters": ["mask_secrets_core"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": BASE_LOG_FOLDER,
            "filters": ["mask_secrets_core"],
        },
    },
    "loggers": {
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
            "propagate": True,
            "filters": ["mask_secrets_core"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets_core"],
    },
}

EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
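    # For example (illustrative value, not a shipped default): setting
    #   [logging] extra_logger_names = connexion,sqlalchemy
    # in airflow.cfg adds "connexion" and "sqlalchemy" loggers that write to the
    # "console" handler at LOG_LEVEL and propagate up to the root logger.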

##################
# Remote logging #
##################

REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
REMOTE_TASK_LOG: RemoteLogIO | RemoteLogStreamIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None


def _default_conn_name_from(mod_path, hook_name):
    # Try to set the default conn name from a hook, but don't error if something goes wrong at runtime
    from importlib import import_module

    global DEFAULT_REMOTE_CONN_ID

    try:
        mod = import_module(mod_path)

        hook = getattr(mod, hook_name)

        DEFAULT_REMOTE_CONN_ID = getattr(hook, "default_conn_name")
    except Exception:
        # Let's error in tests though!
        if "PYTEST_CURRENT_TEST" in os.environ:
            raise
    return None


if REMOTE_LOGGING:
    ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")
    OPENSEARCH_HOST: str | None = conf.get("opensearch", "HOST")
    # Storage bucket URL for remote logging
    # S3 buckets should start with "s3://"
    # Cloudwatch log groups should start with "cloudwatch://"
    # GCS buckets should start with "gs://"
    # WASB buckets should start with "wasb"
    # HDFS path should start with "hdfs://"
    # just to help Airflow select the correct handler
    remote_base_log_folder: str = conf.get_mandatory_value("logging", "remote_base_log_folder")
    remote_task_handler_kwargs = conf.getjson("logging", "remote_task_handler_kwargs", fallback={})
    if not isinstance(remote_task_handler_kwargs, dict):
        raise ValueError(
            "logging/remote_task_handler_kwargs must be a JSON object (a python dict), we got "
            f"{type(remote_task_handler_kwargs)}"
        )
    delete_local_copy = conf.getboolean("logging", "delete_local_logs")

    if remote_base_log_folder.startswith("s3://"):
        from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO

        _default_conn_name_from("airflow.providers.amazon.aws.hooks.s3", "S3Hook")
        REMOTE_TASK_LOG = S3RemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
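        # Illustrative configuration (assumed values, not defaults) that selects this S3 branch:
        #   [logging]
        #   remote_logging = True
        #   remote_base_log_folder = s3://my-log-bucket/airflow/logs
        #   delete_local_logs = True
        # Task logs are then uploaded to the bucket through S3RemoteLogIO, and the local
        # copies are removed after upload.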

    elif remote_base_log_folder.startswith("cloudwatch://"):
        from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO

        _default_conn_name_from("airflow.providers.amazon.aws.hooks.logs", "AwsLogsHook")
        url_parts = urlsplit(remote_base_log_folder)
        REMOTE_TASK_LOG = CloudWatchRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                    "log_group_arn": url_parts.netloc + url_parts.path,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif remote_base_log_folder.startswith("gs://"):
        from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO

        _default_conn_name_from("airflow.providers.google.cloud.hooks.gcs", "GCSHook")
        key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None)

        REMOTE_TASK_LOG = GCSRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                    "gcp_key_path": key_path,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif remote_base_log_folder.startswith("wasb"):
        from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO

        _default_conn_name_from("airflow.providers.microsoft.azure.hooks.wasb", "WasbHook")
        wasb_log_container = conf.get_mandatory_value(
            "azure_remote_logging", "remote_wasb_log_container", fallback="airflow-logs"
        )

        REMOTE_TASK_LOG = WasbRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                    "wasb_container": wasb_log_container,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif remote_base_log_folder.startswith("stackdriver://"):
        key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
        # stackdriver:///airflow-tasks => airflow-tasks
        log_name = urlsplit(remote_base_log_folder).path[1:]
        STACKDRIVER_REMOTE_HANDLERS = {
            "task": {
                "class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
                "formatter": "airflow",
                "gcp_log_name": log_name,
                "gcp_key_path": key_path,
            }
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
    elif remote_base_log_folder.startswith("oss://"):
        from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSRemoteLogIO

        _default_conn_name_from("airflow.providers.alibaba.cloud.hooks.oss", "OSSHook")

        REMOTE_TASK_LOG = OSSRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": remote_base_log_folder,
                    "delete_local_copy": delete_local_copy,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif remote_base_log_folder.startswith("hdfs://"):
        from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO

        _default_conn_name_from("airflow.providers.apache.hdfs.hooks.webhdfs", "WebHDFSHook")

        REMOTE_TASK_LOG = HdfsRemoteLogIO(
            **(
                {
                    "base_log_folder": BASE_LOG_FOLDER,
                    "remote_base": urlsplit(remote_base_log_folder).path,
                    "delete_local_copy": delete_local_copy,
                }
                | remote_task_handler_kwargs
            )
        )
        remote_task_handler_kwargs = {}
    elif ELASTICSEARCH_HOST:
        ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
        ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
        ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
        ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES")
        ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
        ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
        ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
        ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
        ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")

        ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": BASE_LOG_FOLDER,
                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
                "host": ELASTICSEARCH_HOST,
                "frontend": ELASTICSEARCH_FRONTEND,
                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
                "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
                "target_index": ELASTICSEARCH_TARGET_INDEX,
                "json_format": ELASTICSEARCH_JSON_FORMAT,
                "json_fields": ELASTICSEARCH_JSON_FIELDS,
                "host_field": ELASTICSEARCH_HOST_FIELD,
                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
            },
        }

        DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
    elif OPENSEARCH_HOST:
        OPENSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
        OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
        OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch", "USERNAME")
        OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch", "PASSWORD")
        OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch", "WRITE_STDOUT")
        OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch", "JSON_FORMAT")
        OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch", "JSON_FIELDS")
        OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch", "HOST_FIELD")
        OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch", "OFFSET_FIELD")

        OPENSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
            "task": {
                "class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
                "formatter": "airflow",
                "base_log_folder": BASE_LOG_FOLDER,
                "end_of_log_mark": OPENSEARCH_END_OF_LOG_MARK,
                "host": OPENSEARCH_HOST,
                "port": OPENSEARCH_PORT,
                "username": OPENSEARCH_USERNAME,
                "password": OPENSEARCH_PASSWORD,
                "write_stdout": OPENSEARCH_WRITE_STDOUT,
                "json_format": OPENSEARCH_JSON_FORMAT,
                "json_fields": OPENSEARCH_JSON_FIELDS,
                "host_field": OPENSEARCH_HOST_FIELD,
                "offset_field": OPENSEARCH_OFFSET_FIELD,
            },
        }
        DEFAULT_LOGGING_CONFIG["handlers"].update(OPENSEARCH_REMOTE_HANDLERS)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
            "'remote_base_log_folder' option in the 'logging' section."
        )
    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(remote_task_handler_kwargs)
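
A minimal sketch of how a deployment might build on the module above (not part of this
file): Airflow's [logging] logging_config_class option can point at a dictConfig mapping
importable from PYTHONPATH. The module name log_config and the DEBUG level below are
illustrative assumptions, not values taken from this file.

# log_config.py -- hypothetical module referenced from airflow.cfg as
#   [logging]
#   logging_config_class = log_config.LOGGING_CONFIG
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Start from the stock dictConfig and override only what this deployment needs.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG["loggers"]["airflow.task"]["level"] = "DEBUG"  # assumed desired level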