Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/secrets_masker/secrets_masker.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

289 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Mask sensitive information from logs.""" 

18 

19from __future__ import annotations 

20 

21import collections.abc 

22import contextlib 

23import functools 

24import inspect 

25import logging 

26import re 

27import sys 

28from collections.abc import Generator, Iterable, Iterator 

29from enum import Enum 

30from functools import cache, cached_property 

31from re import Pattern 

32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload 

33 

34# We have to import this here, as it is used in the type annotations at runtime even if it seems it is 

35# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields. 

36from pydantic import JsonValue # noqa: TC002 

37 

38if TYPE_CHECKING: 

39 from typing import TypeGuard 

40 

41 class _V1EnvVarLike(Protocol): 

42 def to_dict(self) -> dict[str, Any]: ... 

43 

44 

45V1EnvVar = TypeVar("V1EnvVar") 

46Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any] 

47Redacted: TypeAlias = Redactable | str 

48 

49log = logging.getLogger(__name__) 

50 

51DEFAULT_SENSITIVE_FIELDS = frozenset( 

52 { 

53 "access_token", 

54 "api_key", 

55 "apikey", 

56 "authorization", 

57 "passphrase", 

58 "passwd", 

59 "password", 

60 "private_key", 

61 "secret", 

62 "token", 

63 "keyfile_dict", 

64 "service_account", 

65 } 

66) 

67"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive""" 

68 

69SECRETS_TO_SKIP_MASKING = {"airflow"} 

70"""Common terms that should be excluded from masking in both production and tests""" 

71 

72 

def should_hide_value_for_key(name):
    """
    Tell whether the value stored under ``name`` must be hidden.

    The decision is delegated to the module-level masker instance. ``name`` might be
    a Variable name, or a key in ``conn.extra_dejson``, for example.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)

80 

81 

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Register a secret with the module-level masker so it is hidden in the logs.

    If ``name`` is provided, the secret is only masked when the name matches one of the
    configured "sensitive" names.

    If ``secret`` is a dict or an iterable (excluding str) it is walked recursively and
    the dict keys are used as the names of the nested values.

    Secrets shorter than the masker's minimum length (5 characters by default, configurable
    via the :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>`
    setting) are not masked.

    Falsy secrets (``None``, ``""``, empty containers, ``0``) are ignored.
    """
    if secret:
        _secrets_masker().add_mask(secret, name)

100 

101 

def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """
    Return ``value`` with any secrets found in it swapped for ``replacement``.

    This is a convenience wrapper around the module-level masker's ``redact`` method.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)

107 

108 

@overload
def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ...


# Merging two dicts yields a dict (the original overload wrongly advertised ``str``).
@overload
def merge(new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None) -> dict: ...


def merge(
    new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain "***" (unchanged), the original value is restored.
    For fields that have been updated by the user, the new value is preserved.

    :param new_value: The (possibly edited) redacted value.
    :param old_value: The original, unredacted value.
    :param name: Optional name of the value, used to decide whether it is sensitive.
    :param max_depth: Maximum nesting depth to walk; ``None`` uses the masker's default.
    :return: The merged value, produced by the module-level masker.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth)

128 

129 

_global_secrets_masker: SecretsMasker | None = None


def _secrets_masker() -> SecretsMasker:
    """
    Return the module-level secrets masker, creating it on first use.

    The instance is a singleton only within this specific module. Different import
    paths (e.g., airflow._shared vs airflow.sdk._shared) have separate global
    variables and therefore separate masker instances.
    """
    global _global_secrets_masker
    masker = _global_secrets_masker
    if masker is None:
        # First call: build the instance and remember it for every later call.
        masker = _global_secrets_masker = SecretsMasker()
    return masker

146 

147 

def reset_secrets_masker() -> None:
    """
    Clear the patterns and the replacer held by the module-level secrets masker.

    Use this so an execution environment starts with a fresh masker and does not
    carry over patterns or a replacer from a previous execution or a parent process.

    New processor types should call this when setting up their own masking, to avoid
    inheriting masking rules from existing execution environments.
    """
    masker = _secrets_masker()
    masker.reset_masker()

159 

160 

161def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]: 

162 """Check if object is V1EnvVar, avoiding unnecessary imports.""" 

163 # Quick check: if k8s not imported, can't be a V1EnvVar instance 

164 if "kubernetes.client" not in sys.modules: 

165 return False 

166 

167 # K8s is loaded, safe to get/cache the type 

168 v1_type = _get_v1_env_var_type_cached() 

169 return isinstance(v, v1_type) 

170 

171 

172@cache 

173def _get_v1_env_var_type_cached() -> type: 

174 """Get V1EnvVar type (cached, only called when k8s is already loaded).""" 

175 try: 

176 from kubernetes.client import V1EnvVar 

177 

178 return V1EnvVar 

179 except ImportError: 

180 # Shouldn't happen since we check sys.modules first 

181 return type("V1EnvVar", (), {}) 

182 

183 

class SecretsMasker(logging.Filter):
    """
    Redact secrets from logs.

    The filter keeps a set of regex-escaped secret strings (``patterns``) and a compiled
    alternation of them (``replacer``). Values are redacted either because they contain a
    registered secret, or because they are stored under a key whose name is deemed sensitive.
    """

    replacer: Pattern | None = None
    patterns: set[str]

    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5
    _has_warned_short_secret = False
    mask_secrets_in_logs = False

    min_length_to_mask = 5
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        self.patterns = set()
        self.sensitive_variables_fields = []

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Wrap a subclass ``_redact`` that does not accept ``replacement`` so it can still be called."""
        super().__init_subclass__(**kwargs)

        if cls._redact is not SecretsMasker._redact:
            sig = inspect.signature(cls._redact)
            # Compat for older versions of the OpenLineage plugin which subclasses this -- call the method
            # without the replacement character
            for param in sig.parameters.values():
                if param.name == "replacement" or param.kind == param.VAR_KEYWORD:
                    break
            else:
                # Block only runs if no break above.

                f = cls._redact

                @functools.wraps(f)
                def _redact(*args, replacement: str = "***", **kwargs):
                    return f(*args, **kwargs)

                cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the default ``LogRecord`` attributes (except ``msg`` and ``args``)."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """
        Redact the ``args`` of ``exception`` and of every exception chained to it.

        The chain is followed through both ``__context__`` and ``__cause__``. Each exception
        object is processed at most once, so a chain that loops back on itself terminates
        instead of recursing forever.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            current = pending.pop()
            if current is None or id(current) in seen:
                continue
            seen.add(id(current))
            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            with contextlib.suppress(AttributeError):
                current.args = tuple(self.redact(v) for v in current.args)
            pending.append(current.__cause__)
            pending.append(current.__context__)

    def filter(self, record) -> bool:
        """
        Redact secrets in ``record`` in place; the record is never dropped.

        Nothing is done when log masking is disabled, when the record was already processed,
        or when no secret has been registered yet.
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string inside ``item`` with ``replacement``, regardless of its content.

        Dicts, tuples, sets and lists are walked (sets come back as tuples); anything nested
        deeper than ``max_depth`` is replaced wholesale. Other values are returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return [
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            ]
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Recursively redact ``item``.

        If ``name`` is sensitive, every string under ``item`` is replaced. Otherwise containers
        are walked (dict keys become the names of their values) and strings have registered
        secrets substituted via ``replacer``. If anything goes wrong the literal
        ``"<redaction-failed>"`` is returned instead of the original value.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                to_return = {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
                return to_return
            if isinstance(item, Enum):
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = replacement
                else:
                    return self._redact(
                        item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                    )
                return tmp
            if isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    return self.replacer.sub(replacement, str(item))
                return item
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        :param new_item: The (possibly edited) redacted value.
        :param old_item: The original value.
        :param name: Name of the value, used to decide whether it is sensitive.
        :param depth: Current nesting depth.
        :param max_depth: Depth beyond which containers are no longer walked.
        :param force_sensitive: Treat the value as sensitive regardless of ``name``; set for
            everything nested under a sensitive key.
        :param replacement: The placeholder string that marks an unchanged, redacted value.
        :return: ``old_item`` where ``new_item`` is still the placeholder in a sensitive
            position, otherwise ``new_item`` (with containers merged element-wise).
        """
        if depth > max_depth:
            if isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_sub in new_item.items():
                    if key in old_item:
                        # For dicts, pass the key as name unless we're in sensitive mode
                        child_name = None if is_sensitive else key
                        merged[key] = self._merge(
                            new_sub,
                            old_item[key],
                            name=child_name,
                            depth=depth + 1,
                            max_depth=max_depth,
                            force_sensitive=is_sensitive,
                            replacement=replacement,
                        )
                    else:
                        # Key only exists in the new value: nothing to restore.
                        merged[key] = new_sub
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_list = []
                old_len = len(old_item)
                for index, new_sub in enumerate(new_item):
                    if index >= old_len:
                        # Extra trailing element with no original counterpart.
                        merged_list.append(new_sub)
                    elif is_sensitive and isinstance(new_sub, str) and new_sub == replacement:
                        # In sensitive mode an untouched placeholder restores the original element.
                        merged_list.append(old_item[index])
                    else:
                        merged_list.append(
                            self._merge(
                                new_sub,
                                old_item[index],
                                name=None,
                                depth=depth + 1,
                                max_depth=max_depth,
                                force_sensitive=is_sensitive,
                                replacement=replacement,
                            )
                        )

                if isinstance(new_item, list):
                    return merged_list
                return tuple(merged_list)

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
                # TODO: Handle Kubernetes V1EnvVar objects if needed
                return new_item

            if is_sensitive and isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        except (TypeError, AttributeError, ValueError):
            # Best effort: if the two shapes cannot be reconciled, keep what the user supplied.
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.

        :param max_depth: Maximum nesting depth to walk; a falsy value uses
            ``MAX_RECURSION_DEPTH``.
        :param replacement: The string substituted for each redacted value.
        """
        return self._redact(
            item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH, replacement=replacement
        )

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.
        """
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth or self.MAX_RECURSION_DEPTH,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example.
        The match is a case-insensitive substring test against ``sensitive_variables_fields``
        and is only performed when ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS`` is truthy.
        """
        from airflow import settings

        if isinstance(name, str) and settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def _compile_replacer(self) -> None:
        """Rebuild ``replacer`` from ``patterns``, longest pattern first."""
        # Regex alternation is ordered: the first alternative that matches wins. If one secret
        # is a prefix of another, the shorter one must come later, otherwise only the prefix is
        # replaced and the tail of the longer secret is left visible. The secondary sort key
        # only makes the compiled pattern deterministic.
        ordered = sorted(self.patterns, key=lambda pattern: (-len(pattern), pattern))
        self.replacer = re.compile("|".join(ordered))

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        Dicts are walked with their keys used as names, other non-string iterables are walked
        with ``name`` unchanged. A string is registered (together with its adaptations) only
        if it is at least ``min_length_to_mask`` long, is not listed in
        ``SECRETS_TO_SKIP_MASKING`` and, when ``name`` is given, ``name`` is sensitive.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                # Warn only once per process to avoid flooding the log.
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    if len(s) < min_length:
                        continue

                    if s.lower() in SECRETS_TO_SKIP_MASKING:
                        continue

                    pattern = re.escape(s)
                    if pattern not in self.patterns and (not name or self.should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True
            if new_mask:
                self._compile_replacer()

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None

587 

588 

class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Everything except the write path is delegated unchanged to the wrapped stream, which is
    whatever ``sys.stdout`` was when the instance was created.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # NOTE(review): this hands back the wrapped stream, not the redacting wrapper, so
        # ``with RedactedIO() as f`` gives an ``f`` that does not redact -- confirm intended.
        return self.target.__enter__()

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write the result to the wrapped stream."""
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Redact every line and write them to the wrapped stream."""
        # Each line goes through the same redaction as ``write``; passing ``lines``
        # straight to the target would let secrets through unmasked.
        self.target.writelines(str(redact(line)) for line in lines)