Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/log.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

148 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import warnings 

21from collections.abc import Callable 

22from functools import cache 

23from pathlib import Path 

24from typing import TYPE_CHECKING, Any, BinaryIO, TextIO 

25 

26import structlog 

27import structlog.processors 

28 

29# We have to import this here, as it is used in the type annotations at runtime even if it seems it is 

30# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields. 

31from pydantic import JsonValue # noqa: TC002 

32 

33if TYPE_CHECKING: 

34 from structlog.typing import EventDict, FilteringBoundLogger, Processor 

35 

36 from airflow.sdk._shared.logging.remote import RemoteLogIO 

37 from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI 

38 

39 

40from airflow.sdk._shared.secrets_masker import redact 

41 

42 

43class _ActiveLoggingConfig: 

44 """Internal class to track active logging configuration.""" 

45 

46 logging_config_loaded = False 

47 remote_task_log: RemoteLogIO | None = None 

48 default_remote_conn_id: str | None = None 

49 

50 @classmethod 

51 def set(cls, remote_task_log: RemoteLogIO | None, default_remote_conn_id: str | None) -> None: 

52 """Set remote logging configuration.""" 

53 cls.remote_task_log = remote_task_log 

54 cls.default_remote_conn_id = default_remote_conn_id 

55 cls.logging_config_loaded = True 

56 

57 

58class _WarningsInterceptor: 

59 """A class to hold the reference to the original warnings.showwarning function.""" 

60 

61 _original_showwarning: Callable | None = None 

62 

63 @staticmethod 

64 def register(new_callable: Callable) -> None: 

65 if _WarningsInterceptor._original_showwarning is None: 

66 _WarningsInterceptor._original_showwarning = warnings.showwarning 

67 warnings.showwarning = new_callable 

68 

69 @staticmethod 

70 def reset() -> None: 

71 if _WarningsInterceptor._original_showwarning is not None: 

72 warnings.showwarning = _WarningsInterceptor._original_showwarning 

73 _WarningsInterceptor._original_showwarning = None 

74 

75 @staticmethod 

76 def emit_warning(*args: Any) -> None: 

77 if _WarningsInterceptor._original_showwarning is not None: 

78 _WarningsInterceptor._original_showwarning(*args) 

79 

80 

def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """Structlog processor that applies secret redaction to the event dict."""
    return redact(event_dict)  # type: ignore[return-value]

84 

85 

@cache
def logging_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    sending_to_supervisor: bool = False,
) -> tuple[Processor, ...]:
    """
    Build the structlog processor chain for this process.

    :param json_output: emit JSON lines instead of human-readable console output.
    :param log_format: format string forwarded to the shared structlog setup.
    :param colors: enable colored console output.
    :param sending_to_supervisor: when True, skip in-process secret masking
        because the supervisor masks the forwarded logs itself.
    :return: processors, extra processors, then the final writer, in order.
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    extra_processors: tuple[Processor, ...] = ()

    # Only mask secrets in-process; logs sent to the supervisor are masked there.
    mask_secrets = not sending_to_supervisor
    if mask_secrets:
        extra_processors += (mask_logs,)

    # A configured remote log handler may contribute its own processors.
    # Direct attribute access instead of getattr with a constant name (ruff B009).
    if (remote := load_remote_log_handler()) and (remote_processors := remote.processors):
        extra_processors += remote_processors

    procs, _, final_writer = structlog_processors(
        json_output=json_output, log_format=log_format, colors=colors
    )
    return tuple(procs) + extra_processors + (final_writer,)

108 

109 

@cache
def configure_logging(
    json_output: bool = False,
    log_level: str = "DEFAULT",
    output: BinaryIO | TextIO | None = None,
    cache_logger_on_first_use: bool = True,
    sending_to_supervisor: bool = False,
    colored_console_log: bool | None = None,
):
    """
    Set up struct logging and stdlib logging config.

    :param json_output: emit JSON log lines instead of console-formatted output.
    :param log_level: explicit log level; "DEFAULT" means read it from config.
    :param output: stream to write logs to; None uses the default.
    :param cache_logger_on_first_use: structlog logger-caching flag.
    :param sending_to_supervisor: when True, skip in-process secret masking.
    :param colored_console_log: force colors on/off; None reads configuration.
    """
    from airflow.sdk.configuration import conf

    # BUGFIX: previously the configured logging_level unconditionally overwrote
    # log_level, so an explicitly passed level was always ignored. Only fall
    # back to the config value when the caller did not choose a level.
    if log_level == "DEFAULT":
        log_level = conf.get("logging", "logging_level", fallback="INFO")

    # If colored_console_log is not explicitly set, read from configuration
    if colored_console_log is None:
        colored_console_log = conf.getboolean("logging", "colored_console_log", fallback=True)

    namespace_log_levels = conf.get("logging", "namespace_levels", fallback=None)

    # NOTE: this deliberately shadows the current function inside its own body.
    from airflow.sdk._shared.logging import configure_logging, translate_config_values

    log_fmt, callsite_params = translate_config_values(
        log_format=conf.get("logging", "log_format"),
        callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
    )

    mask_secrets = not sending_to_supervisor
    extra_processors: tuple[Processor, ...] = ()

    # Only mask secrets in-process; logs sent to the supervisor are masked there.
    if mask_secrets:
        extra_processors += (mask_logs,)

    # A configured remote log handler may contribute its own processors.
    # Direct attribute access instead of getattr with a constant name (ruff B009).
    if (remote := load_remote_log_handler()) and (remote_processors := remote.processors):
        extra_processors += remote_processors

    configure_logging(
        json_output=json_output,
        log_level=log_level,
        namespace_log_levels=namespace_log_levels,
        log_format=log_fmt,
        output=output,
        cache_logger_on_first_use=cache_logger_on_first_use,
        colors=colored_console_log,
        extra_processors=extra_processors,
        callsite_parameters=callsite_params,
    )

    # Route Python warnings through structlog so they land in task logs too.
    _WarningsInterceptor.register(_showwarning)

162 

163 

def logger_at_level(name: str, level: int) -> Logger:
    """
    Create a new logger filtered at the given numeric ``level``.

    :param name: name forwarded to the structlog logger factory.
    :param level: stdlib logging level used to select the filtering wrapper class.
    """
    from airflow.sdk._shared.logging.structlog import LEVEL_TO_FILTERING_LOGGER

    # BUGFIX: ``(name)`` is just ``name`` — a plain string, not a tuple —
    # and structlog unpacks logger_factory_args into the factory, which would
    # unpack the string character-by-character. ``(name,)`` is the intended
    # one-element tuple.
    return structlog.wrap_logger(
        None, wrapper_class=LEVEL_TO_FILTERING_LOGGER[level], logger_factory_args=(name,)
    )

171 

172 

def init_log_file(local_relative_path: str) -> Path:
    """
    Ensure the log file and its parent directories are created.

    Any missing directories are created with the configured permission bits.
    """
    from airflow.sdk._shared.logging import init_log_file
    from airflow.sdk.configuration import conf

    def _perm(option: str, fallback: str) -> int:
        # Permissions are stored as octal strings (e.g. "0o664") in the config.
        return int(conf.get("logging", option, fallback=fallback), 8)

    return init_log_file(
        conf.get("logging", "base_log_folder"),
        local_relative_path,
        new_folder_permissions=_perm("file_task_handler_new_folder_permissions", "0o775"),
        new_file_permissions=_perm("file_task_handler_new_file_permissions", "0o664"),
    )

199 

200 

def _load_logging_config() -> None:
    """Discover the remote logging handler from SDK config and cache it."""
    from airflow.sdk._shared.logging.remote import discover_remote_log_handler
    from airflow.sdk._shared.module_loading import import_string
    from airflow.sdk.configuration import conf

    default_config = "airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG"
    configured = conf.get("logging", "logging_config_class", fallback=default_config)

    # Shared discovery logic determines the remote handler (if any) and the
    # default connection id declared by the logging config.
    handler, conn_id = discover_remote_log_handler(configured, default_config, import_string)
    _ActiveLoggingConfig.set(handler, conn_id)

215 

216 

def load_remote_log_handler() -> RemoteLogIO | None:
    """Return the cached remote log handler, loading the logging config on first use."""
    if _ActiveLoggingConfig.logging_config_loaded:
        return _ActiveLoggingConfig.remote_task_log
    _load_logging_config()
    return _ActiveLoggingConfig.remote_task_log

221 

222 

def load_remote_conn_id() -> str | None:
    """Return the remote log connection id, preferring the explicit config value."""
    from airflow.sdk.configuration import conf

    # An explicitly configured connection id always wins over the one that the
    # logging config class declares.
    explicit = conf.get("logging", "remote_log_conn_id", fallback=None)
    if explicit:
        return explicit

    if not _ActiveLoggingConfig.logging_config_loaded:
        _load_logging_config()
    return _ActiveLoggingConfig.default_remote_conn_id

232 

233 

def relative_path_from_logger(logger) -> Path | None:
    """Return the logger's file path relative to the base log folder, or None."""
    if not logger:
        return None
    if not hasattr(logger, "_file"):
        logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
        return None

    file_handle = logger._file
    file_name = file_handle.name

    # fd 1 means we are logging to stdout, and a non-string name means there is
    # something odd about this logger — either way there is no file to upload.
    if file_handle.fileno() == 1 or not isinstance(file_name, str):
        return None

    from airflow.sdk.configuration import conf

    return Path(file_name).relative_to(conf.get("logging", "base_log_folder"))

252 

253 

def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
    """
    Upload the task log file behind ``logger`` via the configured remote handler.

    Does nothing when no remote handler is configured or when the log file's
    path cannot be determined (e.g. the logger writes to stdout).

    :param logger: the bound logger whose underlying file should be uploaded.
    :param ti: the task instance the log belongs to.
    """
    # Direct attribute access instead of getattr with a constant name (ruff B009).
    raw_logger = logger._logger

    handler = load_remote_log_handler()
    if not handler:
        return

    try:
        relative_path = relative_path_from_logger(raw_logger)
    except Exception:
        # Best effort: an unexpected logger shape just means we skip the upload.
        return
    if not relative_path:
        return

    log_relative_path = relative_path.as_posix()
    handler.upload(log_relative_path, ti)

270 

271 

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both task process and supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND supervisor's log output.
    Works safely in both sync and async contexts.
    """
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Best-effort notification of the supervisor; this only works when we
        # are inside a task execution context with live comms.
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        comms = getattr(task_runner, "SUPERVISOR_COMMS", None)
        if comms:
            comms.send(MaskSecret(value=secret, name=name))

293 

294 

def reset_logging():
    """
    Convenience helper for testing. Not for production use.

    :meta private:
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    # Undo warning interception and drop memoized processor chains so that
    # tests can reconfigure logging from scratch.
    _WarningsInterceptor.reset()
    structlog_processors.cache_clear()
    logging_processors.cache_clear()

306 

307 

def _showwarning(
    message: Warning | str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: TextIO | None = None,
    line: str | None = None,
) -> Any:
    """
    Redirect warnings to structlog so they appear in task logs etc.

    Mirrors the stdlib logging integration for warnings: when ``file`` is
    provided, the warning is delegated to the original ``warnings.showwarning``
    implementation; otherwise it is logged through a structlog logger named
    "py.warnings" at warning level.
    """
    if file is None:
        from airflow.sdk._shared.logging.structlog import reconfigure_logger

        # Add callsite info so the warning points at the code that raised it.
        log = reconfigure_logger(
            structlog.get_logger("py.warnings").bind(), structlog.processors.CallsiteParameterAdder
        )
        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
    else:
        _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line)