Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/log/secrets_masker.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

206 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Mask sensitive information from logs.""" 

18 

19from __future__ import annotations 

20 

21import collections.abc 

22import logging 

23import sys 

24from enum import Enum 

25from functools import cached_property 

26from typing import ( 

27 TYPE_CHECKING, 

28 Any, 

29 Callable, 

30 Dict, 

31 Generator, 

32 Iterable, 

33 Iterator, 

34 List, 

35 Pattern, 

36 TextIO, 

37 Tuple, 

38 TypeVar, 

39 Union, 

40) 

41 

42import re2 

43 

44from airflow import settings 

45from airflow.compat.functools import cache 

46 

47if TYPE_CHECKING: 

48 from kubernetes.client import V1EnvVar 

49 

50 from airflow.typing_compat import TypeGuard 

51 

# Kinds of value redaction is expected to handle ("V1EnvVar" is only imported under TYPE_CHECKING).
Redactable = TypeVar("Redactable", str, "V1EnvVar", Dict[Any, Any], Tuple[Any, ...], List[Any])
# What redaction hands back: one of the Redactable kinds, or a plain string such as "***".
Redacted = Union[Redactable, str]

log = logging.getLogger(__name__)

#: Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive.
#: Keys are matched by substring, so e.g. "db_password" also counts as sensitive.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    (
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "secret",
        "token",
        "keyfile_dict",
        "service_account",
    )
)

#: Secret values that are never registered for masking while ``[core] unit_test_mode`` is enabled.
SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}

76 

77 

78@cache 

79def get_sensitive_variables_fields(): 

80 """Get comma-separated sensitive Variable Fields from airflow.cfg.""" 

81 from airflow.configuration import conf 

82 

83 sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy() 

84 sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names") 

85 if sensitive_variable_fields: 

86 sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")}) 

87 return sensitive_fields 

88 

89 

def should_hide_value_for_key(name):
    """
    Decide whether the value stored under ``name`` must be hidden.

    ``name`` is typically a Variable name or a key of ``conn.extra_dejson``. The
    comparison is case-insensitive and substring-based: the value is hidden when any
    sensitive field name occurs anywhere inside the stripped, lower-cased ``name``.
    Non-string names never match. Nothing is hidden when
    ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS`` is falsy.
    """
    from airflow import settings

    if not isinstance(name, str) or not settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
        return False

    normalized = name.strip().lower()
    for field in get_sensitive_variables_fields():
        if field in normalized:
            return True
    return False

102 

103 

def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
    """
    Mask a secret from appearing in the task logs.

    If ``name`` is provided, then it will only be masked if the name matches
    one of the configured "sensitive" names.

    If ``secret`` is a dict or an iterable (excluding str) then it will be
    recursively walked and keys with sensitive names will be hidden.
    """
    # Falsy values ("", {}, [], None) have nothing to mask. Bailing out early also avoids
    # looking up the masker, which raises if logging was configured without one.
    if secret:
        _secrets_masker().add_mask(secret, name)

120 

121 

def redact(value: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
    """
    Redact any secrets found in ``value``.

    Convenience wrapper that delegates to the ``SecretsMasker`` filter attached to the
    ``airflow.task`` logger.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth)

125 

126 

127@cache 

128def _secrets_masker() -> SecretsMasker: 

129 for flt in logging.getLogger("airflow.task").filters: 

130 if isinstance(flt, SecretsMasker): 

131 return flt 

132 raise RuntimeError( 

133 "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make " 

134 "sure you configure it taking airflow configuration as a base as explained at " 

135 "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html" 

136 "#advanced-configuration" 

137 ) 

138 

139 

140@cache 

141def _get_v1_env_var_type() -> type: 

142 try: 

143 from kubernetes.client import V1EnvVar 

144 except ImportError: 

145 return type("V1EnvVar", (), {}) 

146 return V1EnvVar 

147 

148 

def _is_v1_env_var(v: Any) -> TypeGuard[V1EnvVar]:
    """Report whether ``v`` is a kubernetes ``V1EnvVar`` (never true without kubernetes)."""
    env_var_type = _get_v1_env_var_type()
    return isinstance(v, env_var_type)

151 

152 

class SecretsMasker(logging.Filter):
    """Redact secrets from logs.

    Used as a :class:`logging.Filter`. For every record it processes, ``filter`` rewrites:

    * ``msg`` and ``args``;
    * any extra attributes set on the record;
    * the ``args`` of the attached exception chain.

    Registered secret values are replaced with ``***``.
    """

    # Compiled alternation of all escaped secrets; None until the first mask is added.
    replacer: Pattern | None = None
    # Escaped secret strings that ``replacer`` is built from.
    patterns: set[str]

    # Key stored in ``record.__dict__`` after ``filter`` has processed a record.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Default nesting depth when walking dicts/lists/tuples/sets.
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones! ("msg" and "args" are removed from the ignore
        # set so that they *are* redacted.)

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception, visited: set[int] | None = None):
        """Redact ``exception.args`` in place, then walk its ``__context__``/``__cause__`` chain.

        :param exception: the exception to redact.
        :param visited: ids of exceptions already handled during this walk. Exception
            chains are not guaranteed to be acyclic: ``__cause__`` is set by
            ``raise ... from ...``, and both links can be assigned manually. Without
            this set the walk could recurse forever.
        """
        if visited is None:
            visited = set()
        if id(exception) in visited:
            return
        visited.add(id(exception))

        # Exception class may not be modifiable (e.g. declared by an
        # extension module such as JDBC).
        try:
            exception.args = tuple(self.redact(v) for v in exception.args)
        except AttributeError:
            pass
        if exception.__context__:
            self._redact_exception_with_context(exception.__context__, visited)
        if exception.__cause__ and exception.__cause__ is not exception.__context__:
            self._redact_exception_with_context(exception.__cause__, visited)

    def filter(self, record) -> bool:
        """Redact secrets from ``record`` in place.

        :return: always True; the record is never dropped, only rewritten.
        """
        if settings.MASK_SECRETS_IN_LOGS is not True:
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only values of existing keys are reassigned, so changing the dict while
            # iterating it does not alter its size.
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(self, item: Redactable, depth: int, max_depth: int = MAX_RECURSION_DEPTH) -> Redacted:
        """Replace every string in ``item`` with ``***``, recursing into containers.

        Anything nested deeper than ``max_depth`` is replaced wholesale with ``***``.
        Sets come back as tuples. Non-container, non-string values are returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return "***"
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth) for dict_key, subval in item.items()
            }
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1, max_depth) for subval in item)
        elif isinstance(item, list):
            return list(self._redact_all(subval, depth + 1, max_depth) for subval in item)
        else:
            return item

    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        """Recursive worker behind :meth:`redact`.

        If ``name`` is sensitive, everything under it is blanked via :meth:`_redact_all`.
        Otherwise, dicts are walked key by key, strings are run through ``replacer``,
        and other supported containers are walked element by element.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth)
            if isinstance(item, dict):
                to_return = {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth)
                    for dict_key, subval in item.items()
                }
                return to_return
            elif isinstance(item, Enum):
                return self._redact(item=item.value, name=name, depth=depth, max_depth=max_depth)
            elif _is_v1_env_var(item):
                tmp: dict = item.to_dict()
                if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = "***"
                else:
                    return self._redact(item=tmp, name=name, depth=depth, max_depth=max_depth)
                return tmp
            elif isinstance(item, str):
                if self.replacer:
                    # Replace every occurrence of any registered secret inside the string.
                    return self.replacer.sub("***", str(item))
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                )
            elif isinstance(item, list):
                return [
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                ]
            else:
                return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            # Log only the *type* of the value: this record is flagged as already filtered,
            # so logging the value itself would write a possibly-secret value unmasked.
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item).__name__,
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            return item

    def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
        """Redact any secrets found in ``item``, if it is a string.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.

        :param max_depth: how deeply nested containers are walked. ``None`` means
            :attr:`MAX_RECURSION_DEPTH`; an explicit ``0`` is honoured rather than
            being treated as "use the default".
        """
        if max_depth is None:
            max_depth = self.MAX_RECURSION_DEPTH
        return self._redact(item, name, depth=0, max_depth=max_depth)

    @cached_property
    def _mask_adapter(self) -> None | Callable:
        """Pulls the secret mask adapter from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getimport("logging", "secret_mask_adapter", fallback=None)

    @cached_property
    def _test_mode(self) -> bool:
        """Pulls the unit test mode flag from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getboolean("core", "unit_test_mode")

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self._mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self._mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
        """Add a new secret to be masked to this filter instance.

        Handling depends on the type of ``secret``:

        * dict: walked recursively, using each key as the ``name`` of its value;
        * other non-string iterable: walked with the same ``name``;
        * string: registered (together with its adaptations) only when ``name`` is
          empty or sensitive.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    pattern = re2.escape(s)
                    if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True

            if new_mask:
                # Longest patterns first (ties broken alphabetically for determinism).
                # Alternation takes the first alternative that matches at a position. If a
                # secret that is a prefix of a longer one came first, only the prefix would
                # be replaced and the rest of the longer secret would leak.
                ordered = sorted(self.patterns, key=lambda p: (-len(p), p))
                self.replacer = re2.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

357 

358 

class RedactedIO(TextIO):
    """IO class that redacts values going into stdout.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.

    Both text-writing entry points, :meth:`write` and :meth:`writelines`, pass their
    input through :func:`redact` before forwarding it. Every other method delegates
    unchanged to the stream that was ``sys.stdout`` when the instance was created.
    """

    def __init__(self):
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # Let the target run its own entry checks (e.g. refusing a closed file), but return
        # this wrapper: handing back the raw target would let writes made through the
        # ``as`` name bypass redaction.
        self.target.__enter__()
        return self

    def __exit__(self, t, v, b) -> None:
        # NOTE(review): delegates to the target; for a regular file object this closes the
        # wrapped stream (normally sys.stdout).
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write it to the target; returns the target's write count."""
        s = redact(s)
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Redact each line and write them to the target (previously lines bypassed redaction)."""
        self.target.writelines(redact(line) for line in lines)