Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/secrets_masker/secrets_masker.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

287 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Mask sensitive information from logs.""" 

18 

19from __future__ import annotations 

20 

21import collections.abc 

22import contextlib 

23import functools 

24import inspect 

25import logging 

26import re 

27import sys 

28from collections.abc import Generator, Iterable, Iterator 

29from enum import Enum 

30from functools import cache, cached_property 

31from re import Pattern 

32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload 

33 

34# We have to import this here, as it is used in the type annotations at runtime even if it seems it is 

35# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields. 

36from pydantic import JsonValue # noqa: TC002 

37 

38if TYPE_CHECKING: 

39 from typing import TypeGuard 

40 

41 class _V1EnvVarLike(Protocol): 

42 def to_dict(self) -> dict[str, Any]: ... 

43 

44 

45V1EnvVar = TypeVar("V1EnvVar") 

46Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any] 

47Redacted: TypeAlias = Redactable | str 

48 

49log = logging.getLogger(__name__) 

50 

51DEFAULT_SENSITIVE_FIELDS = frozenset( 

52 { 

53 "access_token", 

54 "api_key", 

55 "apikey", 

56 "authorization", 

57 "passphrase", 

58 "passwd", 

59 "password", 

60 "private_key", 

61 "proxy", 

62 "proxies", 

63 "secret", 

64 "token", 

65 "keyfile_dict", 

66 "service_account", 

67 } 

68) 

69"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive""" 

70 

71SECRETS_TO_SKIP_MASKING = {"airflow"} 

72"""Common terms that should be excluded from masking in both production and tests""" 

73 

74 

def should_hide_value_for_key(name):
    """
    Tell whether the value stored under ``name`` should be hidden.

    ``name`` might be a Variable name, or a key in ``conn.extra_dejson``, for example.
    The decision is delegated to the module-level masker instance.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)

82 

83 

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Register ``secret`` with the module-level masker so it is hidden from the logs.

    When ``name`` is given, the secret is only masked if that name matches one of the
    configured "sensitive" names.

    A dict or a non-string iterable is walked recursively; dict keys are used as the
    name for their values.

    Secrets shorter than the masker's minimum length (see the
    :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>` setting)
    are not masked. Falsy secrets (``None``, ``""``, empty containers, ``0``) are ignored.
    """
    if secret:
        _secrets_masker().add_mask(secret, name)

102 

103 

def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """Return ``value`` with every known secret substituted by ``replacement``."""
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)

109 

110 

@overload
def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ...


# A dict merged with a dict yields a dict (this overload used to be annotated ``-> str``).
@overload
def merge(new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None) -> dict: ...


def merge(
    new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain "***" (unchanged), the original value is restored.
    For fields that have been updated by the user, the new value is preserved.

    :param new_value: the (possibly edited) redacted value.
    :param old_value: the original value that was redacted.
    :param name: optional field name; forwarded to the masker, which uses it to decide
        whether the value is sensitive.
    :param max_depth: optional nesting limit forwarded to the masker.
    :return: whatever the module-level masker's ``merge`` returns for these arguments.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth)

130 

131 

@cache
def _secrets_masker() -> SecretsMasker:
    """
    Return the module-level secrets masker, creating it on first use.

    ``functools.cache`` turns this zero-argument function into a per-module singleton
    accessor. Note that different import paths (e.g., airflow._shared vs
    airflow.sdk._shared) will have separate global variables and thus separate
    masker instances.
    """
    masker = SecretsMasker()
    return masker

143 

144 

def reset_secrets_masker() -> None:
    """
    Clear the patterns and replacer held by the module-level secrets masker.

    This utility ensures that an execution environment starts with a fresh masker,
    preventing any carry over of patterns or replacer from previous execution or parent processes.

    New processor types should invoke this method when setting up their own masking to avoid
    inheriting masking rules from existing execution environments.
    """
    masker = _secrets_masker()
    masker.reset_masker()

156 

157 

def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
    """Report whether ``v`` is a Kubernetes ``V1EnvVar``, without importing kubernetes ourselves."""
    # Only when the kubernetes client is already imported can ``v`` be one of its instances;
    # in that case resolving (and caching) the type is safe.
    if "kubernetes.client" in sys.modules:
        return isinstance(v, _get_v1_env_var_type_cached())
    return False

167 

168 

@cache
def _get_v1_env_var_type_cached() -> type:
    """Resolve the ``V1EnvVar`` class once; intended to be called only when k8s is already loaded."""
    try:
        from kubernetes.client import V1EnvVar
    except ImportError:
        # Shouldn't happen since callers check sys.modules first; hand back a throwaway
        # class that no real object is an instance of.
        return type("V1EnvVar", (), {})
    return V1EnvVar

179 

180 

class SecretsMasker(logging.Filter):
    """
    Logging filter that redacts registered secrets from log records.

    Secrets are kept as ``re.escape``-d fragments in ``patterns`` and compiled into one
    alternation (``replacer``) that is applied to every string this class redacts.
    """

    # Compiled alternation of all registered secrets; ``None`` until the first one is added.
    replacer: Pattern | None = None
    # Escaped secrets registered on this instance.
    patterns: set[str]

    # Marker stored on a log record once it has been through ``filter``.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Nesting depth used by ``redact``/``merge`` when the caller does not pass one.
    MAX_RECURSION_DEPTH = 5
    # Class-wide latch so the "secret too short" warning is logged only once.
    _has_warned_short_secret = False
    # Class-wide switch read by ``filter``; see enable/disable_log_masking.
    mask_secrets_in_logs = False

    # Strings shorter than this are never registered as secrets.
    min_length_to_mask = 5
    # Optional callable producing extra variant(s) of a secret: a str or an iterable of str.
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        self.patterns = set()
        # Substrings that make a key name "sensitive" (see should_hide_value_for_key).
        self.sensitive_variables_fields = []

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Wrap a subclass ``_redact`` override that does not accept ``replacement``.

        Compat for older versions of the OpenLineage plugin which subclasses this: if the
        override takes neither a ``replacement`` parameter nor ``**kwargs``, it is wrapped so
        that the ``replacement`` keyword is swallowed before the override is called.
        """
        super().__init_subclass__(**kwargs)

        if cls._redact is SecretsMasker._redact:
            return

        params = inspect.signature(cls._redact).parameters.values()
        if any(p.name == "replacement" or p.kind == p.VAR_KEYWORD for p in params):
            return

        legacy_redact = cls._redact

        @functools.wraps(legacy_redact)
        def _redact(*args, replacement: str = "***", **kwargs):
            return legacy_redact(*args, **kwargs)

        cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the standard ``LogRecord`` attributes, minus ``msg`` and ``args``."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too.
        #
        # Create a record, look at what attributes are on it, and ignore
        # all the default ones!
        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """
        Redact the ``args`` of ``exception`` and of everything reachable through its chain.

        The ``__context__``/``__cause__`` links are followed iteratively and each exception
        object is visited once, so a cyclic chain terminates and a very long chain cannot
        exhaust the interpreter's recursion limit.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            exc = pending.pop()
            if exc is None or id(exc) in seen:
                continue
            seen.add(id(exc))
            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            with contextlib.suppress(AttributeError):
                exc.args = tuple(self.redact(v) for v in exc.args)
            pending.append(exc.__cause__)
            pending.append(exc.__context__)

    def filter(self, record) -> bool:
        """
        Redact a log record in place; always returns ``True`` so the record is kept.

        Nothing is done when log masking is disabled or the record already carries
        ``ALREADY_FILTERED_FLAG``.
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only existing keys are reassigned, so the dict size is stable while iterating.
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                self._redact_exception_with_context(record.exc_info[1])
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string inside ``item`` with ``replacement``.

        Dicts keep their keys, lists stay lists, tuples and sets become tuples. Anything
        nested deeper than ``max_depth`` is replaced wholesale; other non-string leaves are
        returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return [self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item]
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Redact ``item`` recursively.

        A sensitive ``name`` causes every string under ``item`` to be replaced; otherwise
        only occurrences of registered secrets inside strings are replaced. Any exception
        raised while redacting is logged and ``"<redaction-failed>"`` is returned instead.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
            if isinstance(item, Enum):
                # Must stay ahead of the str check: a str-mixin Enum is redacted via its value.
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    return self.replacer.sub(replacement, str(item))
                return item
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            # Checked after the builtin types: it needs a sys.modules lookup and a
            # V1EnvVar is never one of the builtin types handled above.
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = replacement
                    return tmp
                return self._redact(
                    item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        A string equal to ``replacement`` is treated as "left untouched": in a sensitive
        position (sensitive ``name`` or ``force_sensitive``), or beyond ``max_depth``, it is
        swapped back for ``old_item``. Every other value from ``new_item`` wins. Dicts and
        same-typed lists/tuples are merged element by element; sets and V1EnvVar objects are
        taken from ``new_item`` as they are.
        """
        if depth > max_depth:
            if isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, str):
                # Plain strings are leaves; none of the container branches below apply.
                if is_sensitive and new_item == replacement:
                    return old_item
                return new_item

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_val in new_item.items():
                    if key not in old_item:
                        merged[key] = new_val
                        continue
                    merged[key] = self._merge(
                        new_val,
                        old_item[key],
                        # For dicts, pass the key as name unless we're in sensitive mode
                        name=None if is_sensitive else key,
                        depth=depth + 1,
                        max_depth=max_depth,
                        force_sensitive=is_sensitive,
                        replacement=replacement,
                    )
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_list = []
                for i, new_elem in enumerate(new_item):
                    if i >= len(old_item):
                        merged_list.append(new_elem)
                    elif is_sensitive and isinstance(new_elem, str) and new_elem == replacement:
                        # In sensitive mode an untouched placeholder restores the original
                        merged_list.append(old_item[i])
                    else:
                        merged_list.append(
                            self._merge(
                                new_elem,
                                old_item[i],
                                name=None,
                                depth=depth + 1,
                                max_depth=max_depth,
                                force_sensitive=is_sensitive,
                                replacement=replacement,
                            )
                        )
                return merged_list if isinstance(new_item, list) else tuple(merged_list)

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
                # TODO: Handle Kubernetes V1EnvVar objects if needed
                return new_item

            return new_item

        except (TypeError, AttributeError, ValueError):
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted. A falsy ``max_depth`` falls back to ``MAX_RECURSION_DEPTH``.
        """
        return self._redact(
            item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH, replacement=replacement
        )

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.
        A falsy ``max_depth`` falls back to ``MAX_RECURSION_DEPTH``.
        """
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth or self.MAX_RECURSION_DEPTH,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if isinstance(secret_or_secrets, str):
                yield secret_or_secrets
            else:
                # if its not a string, it must be an iterable
                yield from secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example. Only string
        names can match, and only while ``[core] hide_sensitive_var_conn_fields`` is true; the
        stripped, lower-cased name is sensitive when it contains any entry of
        ``sensitive_variables_fields``.
        """
        from airflow.configuration import conf

        if isinstance(name, str) and conf.getboolean("core", "hide_sensitive_var_conn_fields"):
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        Dicts are walked with each key used as the name of its value; other non-string
        iterables are walked with the same ``name``. A string is registered (together with
        its adaptations) unless it is empty, listed in ``SECRETS_TO_SKIP_MASKING``, shorter
        than ``min_length_to_mask``, or ``name`` is given and is not a sensitive name.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            new_mask = False
            for s in self._adaptations(secret):
                if not s or len(s) < min_length:
                    continue

                if s.lower() in SECRETS_TO_SKIP_MASKING:
                    continue

                pattern = re.escape(s)
                if pattern not in self.patterns and (not name or self.should_hide_value_for_key(name)):
                    self.patterns.add(pattern)
                    new_mask = True
            if new_mask:
                # Regex alternation takes the first branch that matches, so longer patterns
                # go first: otherwise a secret that is a prefix of another one would match
                # and leave the tail of the longer secret unmasked.
                ordered = sorted(self.patterns, key=lambda p: (-len(p), p))
                self.replacer = re.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None

584 

585 

class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Every method is forwarded to ``self.target`` (``sys.stdout`` as captured when the
    instance is created); ``write`` and ``writelines`` redact their input first.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        self.target = sys.stdout

    # NOTE(review): __enter__/__exit__ delegate to the target, so ``with RedactedIO() as f``
    # binds whatever the target's __enter__ returns rather than this wrapper -- confirm
    # this is intended before relying on it.
    def __enter__(self) -> TextIO:
        return self.target.__enter__()

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write the result to the target; returns the target's result."""
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Redact each line before handing it to the target, matching what ``write`` does."""
        # Forwarding ``lines`` untouched would let secrets bypass the masker.
        self.target.writelines(str(redact(line)) for line in lines)