Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/log/secrets_masker.py: 35%

204 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Mask sensitive information from logs.""" 

18from __future__ import annotations 

19 

20import collections.abc 

21import logging 

22import re 

23import sys 

24from functools import cached_property 

25from typing import ( 

26 TYPE_CHECKING, 

27 Any, 

28 Callable, 

29 Dict, 

30 Generator, 

31 Iterable, 

32 Iterator, 

33 List, 

34 TextIO, 

35 Tuple, 

36 TypeVar, 

37 Union, 

38) 

39 

40from airflow import settings 

41from airflow.compat.functools import cache 

42from airflow.typing_compat import TypeGuard 

43 

44if TYPE_CHECKING: 

45 from kubernetes.client import V1EnvVar 

46 

# Constrained TypeVar for the values accepted by redact(). "V1EnvVar" is a
# forward reference because kubernetes is only imported under TYPE_CHECKING.
Redactable = TypeVar("Redactable", str, "V1EnvVar", Dict[Any, Any], Tuple[Any, ...], List[Any])
# What redact() hands back: the same kind of value, or a plain string.
Redacted = Union[Redactable, str]

log = logging.getLogger(__name__)

DEFAULT_SENSITIVE_FIELDS = frozenset(
    [
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "keyfile_dict",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "secret",
        "service_account",
        "token",
    ]
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# Secrets that SecretsMasker.add_mask ignores while unit-test mode is enabled.
SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}

71 

72 

@cache
def get_sensitive_variables_fields():
    """Get comma-separated sensitive Variable Fields from airflow.cfg.

    Returns :data:`DEFAULT_SENSITIVE_FIELDS` extended with the names listed in the
    ``[core] sensitive_var_conn_names`` option. Each configured name is stripped
    and lower-cased. Empty entries are dropped.

    The result is memoised via ``@cache``, so the config is read once.
    """
    from airflow.configuration import conf

    sensitive_fields = DEFAULT_SENSITIVE_FIELDS
    sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
    if sensitive_variable_fields:
        # should_hide_value_for_key lower-cases the name before its substring
        # test, so configured names are lower-cased too, otherwise upper-case
        # entries could never match.
        configured = (field.strip().lower() for field in sensitive_variable_fields.split(","))
        # Drop empty entries (from "a,,b", a trailing comma or a blank value):
        # "" is a substring of every name and would make every value hidden.
        sensitive_fields = sensitive_fields | frozenset(field for field in configured if field)
    return sensitive_fields

83 

84 

def should_hide_value_for_key(name):
    """Tell whether the value stored under ``name`` must be hidden.

    ``name`` is a Variable name or a key of ``conn.extra_dejson``. Non-string
    names are never sensitive, and nothing is sensitive while
    ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS`` is falsy. Otherwise the name is
    stripped and lower-cased, and it is sensitive if it contains any of the
    names returned by :func:`get_sensitive_variables_fields`.
    """
    from airflow import settings

    if not isinstance(name, str) or not settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
        return False
    normalized = name.strip().lower()
    for sensitive in get_sensitive_variables_fields():
        if sensitive in normalized:
            return True
    return False

93 

94 

def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
    """
    Register ``secret`` so it is masked in the task logs.

    When ``name`` is given, the secret is only masked if ``name`` is one of the
    configured "sensitive" names.

    A dict or any other non-str iterable is walked recursively. Dict values are
    registered under their keys, so values kept under sensitive keys get masked.
    """
    # A falsy secret (empty string, empty container, None) has nothing to mask,
    # so the masker is not looked up at all.
    if secret:
        _secrets_masker().add_mask(secret, name)

111 

112 

def redact(value: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
    """Return ``value`` with any registered secrets replaced.

    This delegates to :meth:`SecretsMasker.redact` on the masker attached to the
    ``airflow.task`` logger.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth)

116 

117 

118@cache 

119def _secrets_masker() -> SecretsMasker: 

120 for flt in logging.getLogger("airflow.task").filters: 

121 if isinstance(flt, SecretsMasker): 

122 return flt 

123 raise RuntimeError( 

124 "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make " 

125 "sure you configure it taking airflow configuration as a base as explained at " 

126 "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html" 

127 "#advanced-configuration" 

128 ) 

129 

130 

131@cache 

132def _get_v1_env_var_type() -> type: 

133 try: 

134 from kubernetes.client import V1EnvVar 

135 except ImportError: 

136 return type("V1EnvVar", (), {}) 

137 return V1EnvVar 

138 

139 

def _is_v1_env_var(v: Any) -> TypeGuard[V1EnvVar]:
    """Tell whether ``v`` is a kubernetes ``V1EnvVar``; always false without kubernetes."""
    env_var_type = _get_v1_env_var_type()
    return isinstance(v, env_var_type)

142 

143 

class SecretsMasker(logging.Filter):
    """Redact secrets from logs.

    Two mechanisms are combined:

    * value-based masking: every string registered with :meth:`add_mask` is
      compiled into ``replacer``, and its occurrences are replaced by ``***``;
    * key-based masking: a value found under a "sensitive" name (see
      :func:`should_hide_value_for_key`) is masked as a whole.
    """

    # Compiled alternation of all registered secrets; None until a mask is added.
    replacer: re.Pattern | None = None
    # re.escape()d secrets that make up ``replacer``.
    patterns: set[str]

    # Key stored in a LogRecord's __dict__ once it has been processed.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Default nesting depth walked by redact().
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            tuple(),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """Redact the ``args`` of ``exception`` and of every exception chained to it.

        Both ``__context__`` and ``__cause__`` links are followed. Each exception
        is processed at most once, so a cyclic chain cannot cause infinite
        recursion. Such a cycle can be built by assigning ``__context__`` or
        ``__cause__`` explicitly.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            exc = pending.pop()
            if exc is None or id(exc) in seen:
                continue
            seen.add(id(exc))
            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            try:
                exc.args = tuple(self.redact(v) for v in exc.args)
            except AttributeError:
                pass
            pending.append(exc.__cause__)
            pending.append(exc.__context__)

    def filter(self, record) -> bool:
        """Redact ``record`` in place. Always returns True, so records are never dropped."""
        if settings.MASK_SECRETS_IN_LOGS is not True:
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only the existing keys' values are reassigned, so the dict does
            # not change size while it is iterated.
            for k, v in record.__dict__.items():
                if k in self._record_attrs_to_ignore:
                    continue
                record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
            record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(self, item: Redactable, depth: int, max_depth: int = MAX_RECURSION_DEPTH) -> Redacted:
        """Mask every string inside ``item``, walking dicts, lists, tuples and sets.

        Anything nested deeper than ``max_depth`` is also replaced by ``***``.
        Sets are returned as tuples. Non-string leaves are returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return "***"
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth) for dict_key, subval in item.items()
            }
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1, max_depth) for subval in item)
        elif isinstance(item, list):
            return list(self._redact_all(subval, depth + 1, max_depth) for subval in item)
        else:
            return item

    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        """Recursively redact ``item``. :meth:`redact` is the public entry point.

        :param name: the key or name ``item`` was found under, if any. If it is a
            sensitive name, all of ``item`` is masked via :meth:`_redact_all`.
        :param depth: current nesting level (0 for the top-level item).
        :param max_depth: items nested deeper than this are returned unchanged.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth)
            if isinstance(item, dict):
                # Each value is redacted under its own key, so nested sensitive
                # keys are caught as well.
                return {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth)
                    for dict_key, subval in item.items()
                }
            elif _is_v1_env_var(item):
                tmp: dict = item.to_dict()
                if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = "***"
                else:
                    return self._redact(item=tmp, name=name, depth=depth, max_depth=max_depth)
                return tmp
            elif isinstance(item, str):
                if self.replacer:
                    return self.replacer.sub("***", item)
                # No secret has been registered yet, so there is nothing to replace.
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                )
            elif isinstance(item, list):
                return [
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                ]
            else:
                return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, so we need to cast it to str first.
        except Exception as e:
            log.warning(
                "Unable to redact %s, please report this via <https://github.com/apache/airflow/issues>. "
                "Error was: %s: %s",
                repr(item),
                type(e).__name__,
                str(e),
            )
            return item

    def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
        """Redact any secrets found in ``item``.

        Strings have every registered secret replaced by ``***``. Dicts, lists,
        tuples and sets are walked recursively, and sets come back as tuples.
        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`), then all string values in the item
        are redacted.

        A falsy ``max_depth`` (``None`` or ``0``) means :attr:`MAX_RECURSION_DEPTH`.
        """
        return self._redact(item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH)

    @cached_property
    def _mask_adapter(self) -> None | Callable:
        """Pulls the secret mask adapter from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getimport("logging", "secret_mask_adapter", fallback=None)

    @cached_property
    def _test_mode(self) -> bool:
        """Pulls the unit test mode flag from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getboolean("core", "unit_test_mode")

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yields the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self._mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self._mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
        """Add a new secret to be masked to this filter instance.

        * dict: each value is added recursively, with its key used as ``name``;
        * str: the secret, plus any adaptations from the configured mask
          adapter, is registered unless ``name`` is given and is not sensitive;
        * any other iterable: each element is added with the same ``name``.

        Empty strings are ignored. In unit-test mode, values in
        ``SECRETS_TO_SKIP_MASKING_FOR_TESTS`` are ignored as well.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    pattern = re.escape(s)
                    if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True

            if new_mask:
                # Regex alternation takes the first alternative that matches, not
                # the longest one. If secret "abc" were tried before "abcdef", the
                # text "abcdef" would become "***def" and leak part of the longer
                # secret. Try longer patterns first. The secondary key makes the
                # compiled pattern deterministic.
                ordered = sorted(self.patterns, key=lambda p: (-len(p), p))
                self.replacer = re.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

346 

347 

class RedactedIO(TextIO):
    """IO class that redacts values going into stdout.

    Wraps the ``sys.stdout`` in effect when the instance is created and forwards
    every call to it. Text passed to :meth:`write` or :meth:`writelines` is
    first run through :func:`redact`.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # Let the wrapped stream run its own enter logic (standard io streams
        # refuse to enter when closed). Return this wrapper, not the raw stream,
        # so writes made through the ``as`` target are still redacted.
        self.target.__enter__()
        return self

    def __exit__(self, t, v, b) -> None:
        # Delegates to the wrapped stream; standard io streams close themselves here.
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        s = redact(s)
        return self.target.write(s)

    def writelines(self, lines) -> None:
        # Redact each line too. Forwarding the lines unchanged would let secrets
        # reach the target whenever writelines() is used instead of write().
        self.target.writelines(redact(line) for line in lines)