Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/log.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

119 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import warnings 

21from functools import cache 

22from pathlib import Path 

23from typing import TYPE_CHECKING, Any, BinaryIO, TextIO 

24 

25import structlog 

26import structlog.processors 

27 

28# We have to import this here, as it is used in the type annotations at runtime even if it seems it is 

29# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields. 

30from pydantic import JsonValue # noqa: TC002 

31 

32if TYPE_CHECKING: 

33 from structlog.typing import EventDict, FilteringBoundLogger, Processor 

34 

35 from airflow.logging_config import RemoteLogIO 

36 from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI 

37 

38 

39__all__ = ["configure_logging", "reset_logging", "mask_secret"] 

40 

41 

def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """Structlog processor that redacts secret values from the event dict before rendering."""
    from airflow.sdk._shared.secrets_masker import redact

    return redact(event_dict)  # type: ignore[return-value]

47 

48 

@cache
def logging_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    sending_to_supervisor: bool = False,
) -> tuple[Processor, ...]:
    """
    Build the full structlog processor chain for SDK logging.

    :param json_output: emit JSON lines instead of console rendering.
    :param log_format: stdlib-style format string translated by the shared layer.
    :param colors: enable colored console output (ignored for JSON).
    :param sending_to_supervisor: when True, skip in-process secret masking —
        the supervisor process applies masking on its side.
    :return: processors in order, with the final writer last.
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    extra_processors: tuple[Processor, ...] = ()

    # The supervisor masks secrets itself; avoid redacting twice.
    if not sending_to_supervisor:
        extra_processors += (mask_logs,)

    # B009 fix: plain attribute access instead of getattr with a constant name
    # (behavior is identical — both raise AttributeError if missing).
    if (remote := load_remote_log_handler()) and (remote_processors := remote.processors):
        extra_processors += remote_processors

    procs, _, final_writer = structlog_processors(
        json_output=json_output, log_format=log_format, colors=colors
    )
    return tuple(procs) + extra_processors + (final_writer,)

71 

72 

@cache
def configure_logging(
    json_output: bool = False,
    log_level: str = "DEFAULT",
    output: BinaryIO | TextIO | None = None,
    cache_logger_on_first_use: bool = True,
    sending_to_supervisor: bool = False,
    colored_console_log: bool | None = None,
):
    """
    Set up struct logging and stdlib logging config.

    :param json_output: emit JSON log lines instead of console rendering.
    :param log_level: explicit level name; ``"DEFAULT"`` reads ``[logging] logging_level``.
    :param output: stream to write to; None uses the shared layer's default.
    :param cache_logger_on_first_use: structlog performance option.
    :param sending_to_supervisor: skip in-process secret masking (supervisor masks instead).
    :param colored_console_log: force colors on/off; None reads ``[logging] colored_console_log``.
    """
    from airflow.sdk.configuration import conf

    # Bug fix: previously conf.get() unconditionally overwrote log_level, so an
    # explicitly passed level was silently ignored and the DEFAULT branch was
    # dead code. Only consult configuration when the caller asked for DEFAULT.
    if log_level == "DEFAULT":
        log_level = conf.get("logging", "logging_level", fallback="INFO")

    # If colored_console_log is not explicitly set, read from configuration
    if colored_console_log is None:
        colored_console_log = conf.getboolean("logging", "colored_console_log", fallback=True)

    namespace_log_levels = conf.get("logging", "namespace_levels", fallback=None)

    # Alias the shared implementation so it does not shadow this function's name.
    from airflow.sdk._shared.logging import configure_logging as _shared_configure_logging
    from airflow.sdk._shared.logging import translate_config_values

    log_fmt, callsite_params = translate_config_values(
        log_format=conf.get("logging", "log_format"),
        callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
    )

    extra_processors: tuple[Processor, ...] = ()

    # The supervisor performs masking itself; avoid redacting twice.
    if not sending_to_supervisor:
        extra_processors += (mask_logs,)

    # B009 fix: plain attribute access instead of getattr with a constant name.
    if (remote := load_remote_log_handler()) and (remote_processors := remote.processors):
        extra_processors += remote_processors

    _shared_configure_logging(
        json_output=json_output,
        log_level=log_level,
        namespace_log_levels=namespace_log_levels,
        log_format=log_fmt,
        output=output,
        cache_logger_on_first_use=cache_logger_on_first_use,
        colors=colored_console_log,
        extra_processors=extra_processors,
        callsite_parameters=callsite_params,
    )

    global _warnings_showwarning

    if _warnings_showwarning is None:
        _warnings_showwarning = warnings.showwarning
        # Capture warnings and show them via structlog -- i.e. in task logs
        warnings.showwarning = _showwarning

130 

131 

def logger_at_level(name: str, level: int) -> Logger:
    """
    Create a new logger filtered at the given numeric level.

    :param name: logger name, passed through to the logger factory.
    :param level: stdlib numeric logging level (e.g. ``logging.INFO``).
    """
    from airflow.sdk._shared.logging.structlog import LEVEL_TO_FILTERING_LOGGER

    # Bug fix: ``(name)`` is just ``name``, not a tuple — logger_factory_args
    # must be an iterable of positional args for the factory.
    return structlog.wrap_logger(
        None, wrapper_class=LEVEL_TO_FILTERING_LOGGER[level], logger_factory_args=(name,)
    )

139 

140 

def init_log_file(local_relative_path: str) -> Path:
    """
    Ensure log file and parent directories are created.

    Any directories that are missing are created with the right permission bits.
    """
    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf
    from airflow.sdk._shared.logging import init_log_file as _shared_init_log_file

    # Permissions come from config as octal strings (e.g. "0o664"), hence base 8.
    file_mode = int(
        conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"),
        8,
    )
    folder_mode = int(
        conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
        8,
    )

    return _shared_init_log_file(
        conf.get("logging", "base_log_folder"),
        local_relative_path,
        new_folder_permissions=folder_mode,
        new_file_permissions=file_mode,
    )

169 

170 

def load_remote_log_handler() -> RemoteLogIO | None:
    """Return the remote task-log IO handler registered in ``airflow.logging_config`` (may be None)."""
    from airflow import logging_config

    return logging_config.REMOTE_TASK_LOG

175 

176 

def load_remote_conn_id() -> str | None:
    """Return the connection id for remote logging, preferring config over the built-in default."""
    from airflow import logging_config

    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf

    configured = conf.get("logging", "remote_log_conn_id", fallback=None)
    # An empty/unset config value falls through to the packaged default.
    return configured or logging_config.DEFAULT_REMOTE_CONN_ID

188 

189 

def relative_path_from_logger(logger) -> Path | None:
    """
    Work out a log path relative to the base log folder from a raw file logger.

    Returns None when the logger is falsy, lacks a ``_file`` attribute, is
    writing to stdout, or has a non-string file name.
    """
    if not logger:
        return None
    if not hasattr(logger, "_file"):
        logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
        return None

    file_handle = logger._file
    file_name = file_handle.name

    # Logging to stdout (fd 1), or something odd about this logger: nothing to upload.
    if file_handle.fileno() == 1 or not isinstance(file_name, str):
        return None

    # TODO: Over time, providers should use SDK's conf only. Verify and make changes to ensure we're aligned with that aim here?
    # Currently using Core's conf for remote logging consistency
    from airflow.configuration import conf

    return Path(file_name).relative_to(conf.get("logging", "base_log_folder"))

210 

211 

def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
    """Best-effort upload of the task's local log file through the remote log handler."""
    # Deliberately resolved before the handler check, matching prior behavior
    # (an unexpected logger type surfaces as AttributeError regardless).
    raw_logger = logger._logger

    handler = load_remote_log_handler()
    if not handler:
        return

    try:
        relative_path = relative_path_from_logger(raw_logger)
    except Exception:
        # Could not determine the local path; nothing we can upload.
        return
    if not relative_path:
        return

    handler.upload(relative_path.as_posix(), ti)

228 

229 

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both task process and supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND supervisor's log output.
    Works safely in both sync and async contexts.
    """
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Try to tell supervisor (only if in task execution context)
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        comms = getattr(task_runner, "SUPERVISOR_COMMS", None)
        if comms:
            comms.send(MaskSecret(value=secret, name=name))

251 

252 

def reset_logging():
    """
    Convenience helper for testing; restores warning capture and clears processor caches.

    Not for production use.

    :meta private:
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    global _warnings_showwarning
    if _warnings_showwarning is not None:
        # Put back the original hook saved by configure_logging().
        warnings.showwarning = _warnings_showwarning
        _warnings_showwarning = None

    structlog_processors.cache_clear()
    logging_processors.cache_clear()

268 

269 

# Original warnings.showwarning hook, saved by configure_logging() so that
# reset_logging() can restore it. None means warnings are not being captured.
_warnings_showwarning: Any = None


def _showwarning(
    message: Warning | str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: TextIO | None = None,
    line: str | None = None,
) -> Any:
    """
    Redirect warnings to structlog so they appear in task logs etc.

    Mirrors the stdlib ``logging.captureWarnings`` contract: when ``file`` is
    given, delegate to the saved original ``warnings.showwarning`` (if any);
    otherwise format nothing ourselves and log the message through a
    "py.warnings" structlog logger at warning level.
    """
    if file is None:
        from airflow.sdk._shared.logging.structlog import reconfigure_logger

        log = reconfigure_logger(
            structlog.get_logger("py.warnings").bind(), structlog.processors.CallsiteParameterAdder
        )

        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
    elif _warnings_showwarning is not None:
        _warnings_showwarning(message, category, filename, lineno, file, line)