Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/sdk/log.py: 31%


129 statements  

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import warnings
from collections.abc import Callable
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, TextIO

import structlog
import structlog.processors

# We have to import this here because it is used in type annotations at runtime, even
# though it looks unused: Pydantic inspects the annotated types at runtime to validate fields.
from pydantic import JsonValue  # noqa: TC002

if TYPE_CHECKING:
    from structlog.typing import EventDict, FilteringBoundLogger, Processor

    from airflow.logging_config import RemoteLogIO
    from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as RuntimeTI


from airflow.sdk._shared.secrets_masker import redact


class _WarningsInterceptor:
    """A class to hold the reference to the original warnings.showwarning function."""

    _original_showwarning: Callable | None = None

    @staticmethod
    def register(new_callable: Callable) -> None:
        if _WarningsInterceptor._original_showwarning is None:
            _WarningsInterceptor._original_showwarning = warnings.showwarning
        warnings.showwarning = new_callable

    @staticmethod
    def reset() -> None:
        if _WarningsInterceptor._original_showwarning is not None:
            warnings.showwarning = _WarningsInterceptor._original_showwarning
            _WarningsInterceptor._original_showwarning = None

    @staticmethod
    def emit_warning(*args: Any) -> None:
        if _WarningsInterceptor._original_showwarning is not None:
            _WarningsInterceptor._original_showwarning(*args)
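
# A minimal usage sketch (illustrative, not part of log.py). register() swaps in a
# replacement hook while remembering the stdlib original; reset() restores it. The
# `my_hook` function here is hypothetical.
#
#     def my_hook(message, category, filename, lineno, file=None, line=None):
#         print(f"intercepted {category.__name__}: {message}")
#
#     _WarningsInterceptor.register(my_hook)
#     warnings.warn("something odd")  # routed to my_hook
#     _WarningsInterceptor.reset()    # stdlib behavior restored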


def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    event_dict = redact(event_dict)  # type: ignore[assignment]
    return event_dict


@cache
def logging_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    sending_to_supervisor: bool = False,
) -> tuple[Processor, ...]:
    from airflow.sdk._shared.logging.structlog import structlog_processors

    extra_processors: tuple[Processor, ...] = ()

    mask_secrets = not sending_to_supervisor
    if mask_secrets:
        extra_processors += (mask_logs,)

    if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
        extra_processors += remote_processors

    procs, _, final_writer = structlog_processors(
        json_output=json_output, log_format=log_format, colors=colors
    )
    return tuple(procs) + extra_processors + (final_writer,)
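
# A hedged sketch (not part of log.py): the returned chain is a complete structlog
# processor pipeline ending in a writer, so it can plausibly be handed straight to
# structlog.configure. Assumes the default, non-supervisor configuration.
#
#     processors = logging_processors(json_output=True)
#     structlog.configure(processors=list(processors))
#     structlog.get_logger("airflow.task").info("connected", password="hunter2")
#     # With masking enabled, sensitive-looking fields such as `password` are
#     # redacted by mask_logs before the event is written.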


@cache
def configure_logging(
    json_output: bool = False,
    log_level: str = "DEFAULT",
    output: BinaryIO | TextIO | None = None,
    cache_logger_on_first_use: bool = True,
    sending_to_supervisor: bool = False,
    colored_console_log: bool | None = None,
):
    """Set up struct logging and stdlib logging config."""
    from airflow.sdk.configuration import conf

    if log_level == "DEFAULT":
        log_level = conf.get("logging", "logging_level", fallback="INFO")

    # If colored_console_log is not explicitly set, read it from configuration
    if colored_console_log is None:
        colored_console_log = conf.getboolean("logging", "colored_console_log", fallback=True)

    namespace_log_levels = conf.get("logging", "namespace_levels", fallback=None)

    from airflow.sdk._shared.logging import configure_logging, translate_config_values

    log_fmt, callsite_params = translate_config_values(
        log_format=conf.get("logging", "log_format"),
        callsite_params=conf.getlist("logging", "callsite_parameters", fallback=[]),
    )

    mask_secrets = not sending_to_supervisor
    extra_processors: tuple[Processor, ...] = ()

    if mask_secrets:
        extra_processors += (mask_logs,)

    if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
        extra_processors += remote_processors

    configure_logging(
        json_output=json_output,
        log_level=log_level,
        namespace_log_levels=namespace_log_levels,
        log_format=log_fmt,
        output=output,
        cache_logger_on_first_use=cache_logger_on_first_use,
        colors=colored_console_log,
        extra_processors=extra_processors,
        callsite_parameters=callsite_params,
    )

    _WarningsInterceptor.register(_showwarning)
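
# Illustrative usage (a sketch, not part of log.py). configure_logging is @cache'd,
# so repeated calls with identical arguments are no-ops; reset_logging() further down
# exists to undo this in tests.
#
#     configure_logging(json_output=False, log_level="DEBUG")
#     log = structlog.get_logger("airflow.task")
#     log.info("task started")
#     warnings.warn("old API")  # captured too, via _WarningsInterceptor/_showwarning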


def logger_at_level(name: str, level: int) -> Logger:
    """Create a new logger at the given level."""
    from airflow.sdk._shared.logging.structlog import LEVEL_TO_FILTERING_LOGGER

    return structlog.wrap_logger(
        None, wrapper_class=LEVEL_TO_FILTERING_LOGGER[level], logger_factory_args=(name,)
    )
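
# Illustrative only: `level` is a stdlib logging level used to pick the filtering
# wrapper class, so events below it are dropped cheaply.
#
#     import logging
#
#     log = logger_at_level("airflow.task.hooks", logging.WARNING)
#     log.debug("dropped")   # below WARNING, filtered out
#     log.error("emitted")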


def init_log_file(local_relative_path: str) -> Path:
    """
    Ensure the log file and its parent directories are created.

    Any directories that are missing are created with the right permission bits.
    """
    # TODO: Over time, providers should use the SDK's conf only; verify this call site aligns with that aim.
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf
    from airflow.sdk._shared.logging import init_log_file

    new_file_permissions = int(
        conf.get("logging", "file_task_handler_new_file_permissions", fallback="0o664"),
        8,
    )
    new_folder_permissions = int(
        conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"),
        8,
    )

    base_log_folder = conf.get("logging", "base_log_folder")

    return init_log_file(
        base_log_folder,
        local_relative_path,
        new_folder_permissions=new_folder_permissions,
        new_file_permissions=new_file_permissions,
    )
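
# Worked example of the permission parsing above: the config values are octal
# strings, and int(value, 8) accepts both the "0o" prefix and the bare digits.
#
#     >>> int("0o664", 8)
#     436    # i.e. rw-rw-r--
#     >>> int("775", 8)
#     509    # i.e. rwxrwxr-x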


def load_remote_log_handler() -> RemoteLogIO | None:
    from airflow.logging_config import get_remote_task_log

    return get_remote_task_log()


def load_remote_conn_id() -> str | None:
    # TODO: Over time, providers should use the SDK's conf only; verify this call site aligns with that aim.
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf

    if conn_id := conf.get("logging", "remote_log_conn_id", fallback=None):
        return conn_id

    from airflow.logging_config import get_default_remote_conn_id

    return get_default_remote_conn_id()


def relative_path_from_logger(logger) -> Path | None:
    if not logger:
        return None
    if not hasattr(logger, "_file"):
        logger.warning("Unable to find log file, logger was of unexpected type", type=type(logger))
        return None

    fh = logger._file
    fname = fh.name

    if fh.fileno() == 1 or not isinstance(fname, str):
        # Logging to stdout, or something odd about this logger; don't try to upload!
        return None

    # TODO: Over time, providers should use the SDK's conf only; verify this call site aligns with that aim.
    # Currently using Core's conf for remote logging consistency.
    from airflow.configuration import conf

    base_log_folder = conf.get("logging", "base_log_folder")
    return Path(fname).relative_to(base_log_folder)
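
# Example of the relative-path computation (hypothetical paths):
#
#     >>> Path("/opt/airflow/logs/dag_id=d/run_id=r/task_id=t/attempt=1.log").relative_to(
#     ...     "/opt/airflow/logs"
#     ... )
#     PosixPath('dag_id=d/run_id=r/task_id=t/attempt=1.log')
#
# If the file lives outside base_log_folder, relative_to raises ValueError, which
# upload_to_remote below swallows and treats as "nothing to upload".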


def upload_to_remote(logger: FilteringBoundLogger, ti: RuntimeTI):
    raw_logger = getattr(logger, "_logger")

    handler = load_remote_log_handler()
    if not handler:
        return

    try:
        relative_path = relative_path_from_logger(raw_logger)
    except Exception:
        return
    if not relative_path:
        return

    log_relative_path = relative_path.as_posix()
    handler.upload(log_relative_path, ti)


def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret in both task process and supervisor process.

    For secrets loaded from backends (Vault, env vars, etc.), this ensures
    they're masked in both the task subprocess AND supervisor's log output.
    Works safely in both sync and async contexts.
    """
    from contextlib import suppress

    from airflow.sdk._shared.secrets_masker import _secrets_masker

    _secrets_masker().add_mask(secret, name)

    with suppress(Exception):
        # Try to tell supervisor (only if in task execution context)
        from airflow.sdk.execution_time import task_runner
        from airflow.sdk.execution_time.comms import MaskSecret

        if comms := getattr(task_runner, "SUPERVISOR_COMMS", None):
            comms.send(MaskSecret(value=secret, name=name))
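
# Hedged usage sketch: once registered, the value is replaced with Airflow's
# redaction marker ("***") in later log output, and the supervisor is notified
# when a comms channel exists.
#
#     mask_secret("s3cr3t-token", name="api_token")
#     structlog.get_logger(__name__).info("connecting", token="s3cr3t-token")
#     # -> token=*** in the rendered event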


def reset_logging():
    """
    Convenience for testing. Not for production use.

    :meta private:
    """
    from airflow.sdk._shared.logging.structlog import structlog_processors

    _WarningsInterceptor.reset()
    structlog_processors.cache_clear()
    logging_processors.cache_clear()
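
# A possible test fixture built on reset_logging (a sketch, assuming pytest):
#
#     import pytest
#
#     @pytest.fixture
#     def clean_logging():
#         yield
#         reset_logging()  # restore warnings hook, clear cached processor chains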


def _showwarning(
    message: Warning | str,
    category: type[Warning],
    filename: str,
    lineno: int,
    file: TextIO | None = None,
    line: str | None = None,
) -> Any:
    """
    Redirect warnings to structlog so they appear in task logs etc.

    Implementation of showwarning that redirects warnings to logging. If a file
    is specified, it delegates to the original warnings implementation of
    showwarning. Otherwise, it logs the warning message, category, filename, and
    line number as a structured event on a logger named "py.warnings" at WARNING
    level.
    """
    if file is not None:
        _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line)
    else:
        from airflow.sdk._shared.logging.structlog import reconfigure_logger

        log = reconfigure_logger(
            structlog.get_logger("py.warnings").bind(), structlog.processors.CallsiteParameterAdder
        )

        log.warning(str(message), category=category.__name__, filename=filename, lineno=lineno)
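
# End-to-end sketch of the warnings redirect (illustrative): once configure_logging
# has installed _showwarning, a plain warnings.warn call is rendered as a structlog
# event on the "py.warnings" logger instead of being written to stderr.
#
#     configure_logging()
#     warnings.warn("deprecated parameter", DeprecationWarning)
#     # -> warning event: "deprecated parameter" category=DeprecationWarning ...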