Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/log/secrets_masker.py: 32%

151 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Mask sensitive information from logs""" 

18from __future__ import annotations 

19 

20import collections 

21import logging 

22import re 

23import sys 

24from typing import Any, Callable, Dict, Generator, Iterable, List, TextIO, Tuple, TypeVar, Union 

25 

26from airflow import settings 

27from airflow.compat.functools import cache, cached_property 

28 

# Value shapes accepted for redaction: a string or a dict/tuple/list container.
Redactable = TypeVar("Redactable", str, Dict[Any, Any], Tuple[Any, ...], List[Any])
# Result of redaction: either the same shape as the input or a plain string.
Redacted = Union[Redactable, str]

log = logging.getLogger(__name__)

DEFAULT_SENSITIVE_FIELDS = frozenset(
    (
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "secret",
        "token",
        "keyfile_dict",
        "service_account",
    )
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# Secret values that are deliberately left unmasked when unit-test mode is on.
SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}

53 

54 

@cache
def get_sensitive_variables_fields():
    """Return the name fragments that mark a Variable/Connection field as sensitive.

    The result is ``DEFAULT_SENSITIVE_FIELDS`` extended with the comma-separated
    names configured under ``[core] sensitive_var_conn_names`` in airflow.cfg.
    The function is wrapped in ``@cache``, so the configuration is consulted
    only on the first call.

    Configured names are stripped and lower-cased, because
    :func:`should_hide_value_for_key` lower-cases the key before doing a
    substring test against these fragments. Empty entries (e.g. from a trailing
    or doubled comma) are dropped: an empty string is a substring of every key
    and would otherwise cause every value to be hidden.

    :return: a ``frozenset`` of lower-case name fragments.
    """
    from airflow.configuration import conf

    configured = conf.get("core", "sensitive_var_conn_names")
    if not configured:
        return DEFAULT_SENSITIVE_FIELDS

    extra_fields = {field.strip().lower() for field in configured.split(",")}
    # "a,,b" or "a," would otherwise contribute "", which matches everything.
    extra_fields.discard("")
    return DEFAULT_SENSITIVE_FIELDS | extra_fields

65 

66 

def should_hide_value_for_key(name):
    """Tell whether the value stored under ``name`` should be hidden.

    ``name`` is typically a Variable name or a key of ``conn.extra_dejson``.
    It is considered sensitive when, after stripping and lower-casing, it
    contains any of the fragments from :func:`get_sensitive_variables_fields`.

    :return: ``False`` for non-string names or when
        ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS`` is falsy; otherwise whether
        a sensitive fragment occurs in the name.
    """
    from airflow import settings

    if not isinstance(name, str) or not settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
        return False

    normalized = name.strip().lower()
    for fragment in get_sensitive_variables_fields():
        if fragment in normalized:
            return True
    return False

75 

76 

def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
    """
    Register a secret so that it is masked in the task logs.

    When ``name`` is given, the secret is only masked if the name matches one
    of the configured "sensitive" names.

    When ``secret`` is a dict or an iterable (other than a str) it is walked
    recursively and the values under sensitive keys are hidden.

    Falsy secrets (``None``, ``""``, empty containers) are ignored.
    """
    if secret:
        _secrets_masker().add_mask(secret, name)

93 

94 

def redact(value: Redactable, name: str | None = None) -> Redacted:
    """Return ``value`` with any known secrets redacted by the shared masker."""
    masker = _secrets_masker()
    return masker.redact(value, name)

98 

99 

@cache
def _secrets_masker() -> SecretsMasker:
    """Find the ``SecretsMasker`` filter installed on the ``airflow.task`` logger.

    :raises RuntimeError: if no filter of that type is attached to the logger.
    """
    task_logger = logging.getLogger("airflow.task")
    masker = next((flt for flt in task_logger.filters if isinstance(flt, SecretsMasker)), None)
    if masker is None:
        raise RuntimeError(
            "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
            "sure you configure it taking airflow configuration as a base as explained at "
            "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
            "#advanced-configuration"
        )
    return masker

111 

112 

class SecretsMasker(logging.Filter):
    """Redact secrets from logs.

    Secrets are registered with :meth:`add_mask`; every registered value is
    regex-escaped and compiled into a single alternation (``replacer``) that
    :meth:`redact` substitutes with ``"***"``. Independently of the registered
    values, anything stored under a "sensitive" key name (see
    :func:`should_hide_value_for_key`) is replaced wholesale.
    """

    # Compiled alternation of all registered secrets; None until one is added.
    replacer: re.Pattern | None = None
    # Regex-escaped secret values that make up ``replacer``.
    patterns: set[str]

    # Attribute set on a record once processed, so it is not redacted twice.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Nesting depth beyond which containers are no longer walked.
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the standard ``LogRecord`` attributes, minus ``msg`` and ``args``.

        Doing ``log.info(..., extra={'foo': 2})`` sets extra properties on the
        record (i.e. ``record.foo``), and those must be filtered too. To tell
        them apart from the built-in attributes, a throwaway record is created
        and its attribute names are collected; ``msg`` and ``args`` are removed
        from the ignore-set so that they still get redacted.
        """
        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            tuple(),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception, _visited=None):
        """Redact the ``args`` of ``exception`` and of its context/cause chain, in place.

        :param exception: the exception whose ``args`` should be redacted.
        :param _visited: internal set of ``id()`` values of exceptions already
            handled; callers should leave it unset. It stops the walk from
            recursing forever when ``__context__``/``__cause__`` form a cycle.
        """
        if _visited is None:
            _visited = set()
        if id(exception) in _visited:
            return
        _visited.add(id(exception))

        # Exception class may not be modifiable (e.g. declared by an
        # extension module such as JDBC).
        try:
            exception.args = tuple(self.redact(v) for v in exception.args)
        except AttributeError:
            pass
        if exception.__context__:
            self._redact_exception_with_context(exception.__context__, _visited)
        if exception.__cause__ and exception.__cause__ is not exception.__context__:
            self._redact_exception_with_context(exception.__cause__, _visited)

    def filter(self, record) -> bool:
        """Redact ``record`` in place; always returns ``True`` so it is never dropped.

        Nothing is done when ``settings.MASK_SECRETS_IN_LOGS`` is not ``True``,
        when the record was already processed, or when no secret has been
        registered yet (``replacer`` is unset).
        """
        if settings.MASK_SECRETS_IN_LOGS is not True:
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only existing keys are reassigned, so the dict size is stable
            # while iterating.
            for k, v in record.__dict__.items():
                if k in self._record_attrs_to_ignore:
                    continue
                record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    def _redact_all(self, item: Redactable, depth: int) -> Redacted:
        """Replace every string inside ``item`` with ``"***"``.

        Dicts, tuples, sets and lists are walked recursively (sets come back
        as tuples). Anything nested deeper than ``MAX_RECURSION_DEPTH`` is
        replaced by ``"***"`` as a whole. Other non-string values are returned
        unchanged.
        """
        if depth > self.MAX_RECURSION_DEPTH or isinstance(item, str):
            return "***"
        if isinstance(item, dict):
            return {dict_key: self._redact_all(subval, depth + 1) for dict_key, subval in item.items()}
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1) for subval in item)
        elif isinstance(item, list):
            return [self._redact_all(subval, depth + 1) for subval in item]
        else:
            return item

    def _redact(self, item: Redactable, name: str | None, depth: int) -> Redacted:
        """Recursive worker behind :meth:`redact`.

        :param item: value to redact.
        :param name: key the value was stored under, if any; a sensitive name
            causes the whole value to be passed to :meth:`_redact_all`.
        :param depth: current nesting level.
        :return: the redacted copy, or ``item`` itself when it is nested too
            deeply, is of an unhandled type, or redaction raised.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > self.MAX_RECURSION_DEPTH:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth)
            if isinstance(item, dict):
                # Each value is redacted with its own key as the name, so
                # key-based hiding applies at every level.
                return {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1))
                    for dict_key, subval in item.items()
                }
            elif isinstance(item, str):
                if self.replacer:
                    return self.replacer.sub("***", item)
                # No secret values registered: nothing to substitute.
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(self._redact(subval, name=None, depth=(depth + 1)) for subval in item)
            elif isinstance(item, list):
                return [self._redact(subval, name=None, depth=(depth + 1)) for subval in item]
            else:
                return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, so we need to cast it to str first.
        except Exception as e:
            # NOTE(review): repr(item) is logged un-redacted here, so a value
            # that failed redaction may reach the log - confirm this is wanted.
            log.warning(
                "Unable to redact %s, please report this via <https://github.com/apache/airflow/issues>. "
                "Error was: %s: %s",
                repr(item),
                type(e).__name__,
                str(e),
            )
            return item

    def redact(self, item: Redactable, name: str | None = None) -> Redacted:
        """Redact any secrets found in ``item``.

        Registered secret values are replaced inside strings, including strings
        nested in dicts, lists, tuples and sets.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.
        """
        return self._redact(item, name, depth=0)

    @cached_property
    def _mask_adapter(self) -> None | Callable:
        """Pulls the secret mask adapter from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getimport("logging", "secret_mask_adapter", fallback=None)

    @cached_property
    def _test_mode(self) -> bool:
        """Pulls the unit test mode flag from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getboolean("core", "unit_test_mode")

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yields the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self._mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self._mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
        """Add a new secret to be masked to this filter instance.

        :param secret: a string, or a dict/iterable that is walked recursively;
            dict keys are used as the ``name`` of their values.
        :param name: when given, the secret is registered only if the name is
            sensitive according to :func:`should_hide_value_for_key`.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    pattern = re.escape(s)
                    if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True

            if new_mask:
                # Regex alternation takes the first alternative that matches,
                # not the longest. Put longer patterns first so a secret that
                # is a prefix of another ("pass" / "password") cannot leave the
                # tail of the longer one unmasked; the secondary key keeps the
                # compiled pattern deterministic.
                ordered = sorted(self.patterns, key=lambda p: (-len(p), p))
                self.replacer = re.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

299 

300 

class RedactedIO(TextIO):
    """Text stream wrapper that redacts whatever is written before passing it to stdout.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        # Capture the stdout that is current at construction time; writes are
        # forwarded to it and its ``fileno`` is exposed as our own.
        stdout = sys.stdout
        self.target = stdout
        self.fileno = stdout.fileno

    def write(self, s: str) -> int:
        """Redact ``s`` and write it to the wrapped stream, returning that stream's result."""
        return self.target.write(redact(s))

    def flush(self) -> None:
        """Flush the wrapped stream."""
        return self.target.flush()