Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/secrets_masker/secrets_masker.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

295 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Mask sensitive information from logs.""" 

18 

19from __future__ import annotations 

20 

21import collections.abc 

22import contextlib 

23import functools 

24import inspect 

25import logging 

26import re 

27import sys 

28from collections.abc import Generator, Iterable, Iterator 

29from enum import Enum 

30from functools import cache, cached_property 

31from re import Pattern 

32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload 

33 

34# We have to import this here, as it is used in the type annotations at runtime even if it seems it is 

35# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields. 

36from pydantic import JsonValue # noqa: TC002 

37 

38if TYPE_CHECKING: 

39 from typing import TypeGuard 

40 

41 class _V1EnvVarLike(Protocol): 

42 def to_dict(self) -> dict[str, Any]: ... 

43 

44 

45V1EnvVar = TypeVar("V1EnvVar") 

46Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any] 

47Redacted: TypeAlias = Redactable | str 

48 

log = logging.getLogger(__name__)

# Kept in alphabetical order purely for readability; membership is all that matters.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    (
        "access_key",
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "connection_string",
        "keyfile_dict",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "proxies",
        "proxy",
        "proxy_password",
        "secret",
        "service_account",
        "token",
    )
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# Compared against ``secret.lower()`` in ``SecretsMasker.add_mask``; matching values are never registered.
SECRETS_TO_SKIP_MASKING = {"airflow"}
"""Common terms that should be excluded from masking in both production and tests"""

76 

77 

def should_hide_value_for_key(name):
    """
    Decide whether the value stored under ``name`` has to be hidden.

    ``name`` is typically a Variable name or a key of ``conn.extra_dejson``. The decision is delegated to
    the module-level :class:`SecretsMasker` instance.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)

85 

86 

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret from appearing in the logs.

    If ``name`` is provided, then it will only be masked if the name matches one of the configured "sensitive"
    names.

    If ``secret`` is a dict or an iterable (excluding str) then it will be recursively walked: dict values are
    registered using their key as the name (so only values under sensitive keys are masked), and other
    iterables have each element registered under ``name``.

    If the secret value is too short (by default shorter than 5 characters, configurable via the
    :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>` setting) it will not
    be masked.
    """
    # Falsy values (None, "", empty containers, 0, False) carry nothing worth masking.
    if not secret:
        return

    _secrets_masker().add_mask(secret, name)

105 

106 

def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """
    Return ``value`` with every secret it contains replaced by ``replacement``.

    Thin wrapper over :meth:`SecretsMasker.redact` on the module-level masker; see that method for the
    meaning of ``name`` and ``max_depth``.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)

112 

113 

@overload
def merge(
    new_value: str,
    old_value: str,
    name: str | None = None,
    max_depth: int | None = None,
    replacement: str = "***",
) -> str: ...


@overload
def merge(
    new_value: dict,
    old_value: dict,
    name: str | None = None,
    max_depth: int | None = None,
    replacement: str = "***",
) -> dict: ...


def merge(
    new_value: Redacted,
    old_value: Redactable,
    name: str | None = None,
    max_depth: int | None = None,
    replacement: str = "***",
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain the placeholder (``replacement``, ``"***"`` by default, which
    should match the placeholder that was used when redacting), the original value is restored.
    For fields that have been updated by the user, the new value is preserved.

    :param new_value: the redacted value, possibly edited by the user.
    :param old_value: the original, unredacted value.
    :param name: optional key name used to decide sensitivity of the top-level value.
    :param max_depth: nesting limit; defaults to ``SecretsMasker.MAX_RECURSION_DEPTH``.
    :param replacement: the placeholder string that marks an unchanged redacted value.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth, replacement=replacement)

133 

134 

@cache
def _secrets_masker() -> SecretsMasker:
    """
    Return this module's shared :class:`SecretsMasker`, creating it on the first call.

    Because the function takes no arguments, ``functools.cache`` hands back the very same instance on every
    later call. The cache lives with this module object, so the same file imported under a different path
    (e.g. ``airflow._shared`` vs ``airflow.sdk._shared``) ends up with its own, independent masker.
    """
    masker = SecretsMasker()
    return masker

146 

147 

def reset_secrets_masker() -> None:
    """
    Drop every pattern, and the compiled replacer, held by the module-level masker.

    Call this when an execution environment sets up its own masking, so that no patterns or replacer are
    carried over from a previous execution or inherited from a parent process. New processor types should
    do this before configuring masking of their own.
    """
    masker = _secrets_masker()
    masker.reset_masker()

159 

160 

161def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]: 

162 """Check if object is V1EnvVar, avoiding unnecessary imports.""" 

163 # Quick check: if k8s not imported, can't be a V1EnvVar instance 

164 if "kubernetes.client" not in sys.modules: 

165 return False 

166 

167 # K8s is loaded, safe to get/cache the type 

168 v1_type = _get_v1_env_var_type_cached() 

169 return isinstance(v, v1_type) 

170 

171 

172@cache 

173def _get_v1_env_var_type_cached() -> type: 

174 """Get V1EnvVar type (cached, only called when k8s is already loaded).""" 

175 try: 

176 from kubernetes.client import V1EnvVar 

177 

178 return V1EnvVar 

179 except ImportError: 

180 # Shouldn't happen since we check sys.modules first 

181 return type("V1EnvVar", (), {}) 

182 

183 

class SecretsMasker(logging.Filter):
    """
    Redact secrets from logs.

    Two complementary mechanisms are used:

    * value-based: every string registered with :meth:`add_mask` is escaped and folded into one compiled
      regular expression (:attr:`replacer`) whose matches are substituted inside any string being redacted;
    * key-based: a value stored under a key for which :meth:`should_hide_value_for_key` is true is
      replaced wholesale (see :meth:`_redact_all`), whatever its content.

    As a :class:`logging.Filter`, :meth:`filter` applies this to log records while log masking is enabled
    and at least one secret has been registered.
    """

    # Compiled alternation of every registered pattern; ``None`` until a secret has been registered.
    replacer: Pattern | None = None
    # ``re.escape``-d form of every registered secret (including adaptations from ``secret_mask_adapter``).
    patterns: set[str]

    # Key set in a LogRecord's ``__dict__`` once it has been processed, so it is not redacted twice.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Default nesting limit for redact/merge, and limit on exceptions walked in a context/cause chain.
    MAX_RECURSION_DEPTH = 5
    # Written on the class (see ``add_mask``) so the "too short" warning is emitted at most once.
    _has_warned_short_secret = False
    # Toggled class-wide by ``enable_log_masking`` / ``disable_log_masking``; read by ``filter``.
    mask_secrets_in_logs = False

    # Secrets shorter than this many characters are not registered by ``add_mask``.
    min_length_to_mask = 5
    # Optional callable returning extra variants of a secret to mask (a str or an iterable of str).
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        self.patterns = set()
        # Substrings that make a key "sensitive" (see ``should_hide_value_for_key``); empty by default.
        self.sensitive_variables_fields = []
        self.hide_sensitive_var_conn_fields = True

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Wrap a subclass' ``_redact`` override that cannot accept the ``replacement`` keyword."""
        super().__init_subclass__(**kwargs)

        if cls._redact is not SecretsMasker._redact:
            sig = inspect.signature(cls._redact)
            # Compat for older versions of the OpenLineage plugin which subclasses this -- call the method
            # without the replacement character
            for param in sig.parameters.values():
                if param.name == "replacement" or param.kind == param.VAR_KEYWORD:
                    break
            else:
                # Block only runs if no break above: accept ``replacement`` and silently drop it.
                f = cls._redact

                @functools.wraps(f)
                def _redact(*args, replacement: str = "***", **kwargs):
                    return f(*args, **kwargs)

                cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Attribute names every LogRecord has by default (except ``msg``/``args``), which are not redacted."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context_or_cause(self, exception, visited=None):
        """
        Redact ``exception.args`` in place, then recurse into ``__context__`` and ``__cause__``.

        :param exception: the exception to redact.
        :param visited: ids of exceptions already processed during this walk; guards against cycles.
        :return: the redacted exception; a ``RuntimeError`` placeholder once ``MAX_RECURSION_DEPTH``
            exceptions have been visited; or ``None`` when an ``AttributeError`` was suppressed (a
            recursive caller then stores that ``None`` as the parent's ``__context__``/``__cause__``).
        """
        # Exception class may not be modifiable (e.g. declared by an
        # extension module such as JDBC).
        with contextlib.suppress(AttributeError):
            if visited is None:
                visited = set()

            if id(exception) in visited:
                # already visited - it was redacted earlier
                return exception

            # Check depth before adding to visited to ensure we skip exceptions beyond the limit
            if len(visited) >= self.MAX_RECURSION_DEPTH:
                return RuntimeError(
                    f"Stack trace redaction hit recursion limit of {self.MAX_RECURSION_DEPTH} "
                    f"when processing exception of type {type(exception).__name__}. "
                    f"The remaining exceptions will be skipped to avoid "
                    f"infinite recursion and protect against revealing sensitive information."
                )

            visited.add(id(exception))

            exception.args = tuple(self.redact(v) for v in exception.args)
            if exception.__context__:
                exception.__context__ = self._redact_exception_with_context_or_cause(
                    exception.__context__, visited
                )
            if exception.__cause__ and exception.__cause__ is not exception.__context__:
                exception.__cause__ = self._redact_exception_with_context_or_cause(
                    exception.__cause__, visited
                )
            return exception

    def filter(self, record) -> bool:
        """
        Redact ``record`` in place; always returns ``True`` so no record is ever dropped.

        Non-default record attributes (``msg``, ``args`` and anything passed via ``extra``) and the
        attached exception chain are redacted, but only when masking is enabled and a replacer exists.
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only values are reassigned; no keys are added while iterating.
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context_or_cause(exc)
            record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string leaf of ``item`` with ``replacement``, keeping the container shape.

        Dicts, lists and tuples are rebuilt (sets become tuples); other non-string leaves are returned
        unchanged. Anything nested deeper than ``max_depth`` is replaced outright.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return list(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Recursive worker behind :meth:`redact`.

        Unexpected errors while redacting are caught: a warning is logged and ``"<redaction-failed>"`` is
        returned instead of the (possibly sensitive) original value.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                to_return = {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
                return to_return
            if isinstance(item, Enum):
                # Redact the enum's underlying value at the same depth.
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = replacement
                else:
                    return self._redact(
                        item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                    )
                return tmp
            if isinstance(item, str):
                if self.replacer:
                    # Substitute every registered secret occurring anywhere in the string.
                    return self.replacer.sub(replacement, str(item))
                # No secrets registered: nothing can be replaced by value (key-based redaction of
                # containers still happens in the branches above).
                return item
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        A value still equal to ``replacement`` in a sensitive position is swapped back for the original;
        everything else keeps the new (user-provided) value. Dicts and same-typed lists/tuples are merged
        element-wise; sets and V1EnvVar objects are returned as given.

        :param force_sensitive: treat this item (and everything below it) as sensitive regardless of name.
        :param replacement: the placeholder that marks an unchanged redacted value.
        """
        if depth > max_depth:
            # Too deep to walk further: only a value that is exactly the placeholder is restored.
            if isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_sub in new_item.items():
                    if key in old_item:
                        # For dicts, pass the key as name unless we're in sensitive mode
                        merged[key] = self._merge(
                            new_sub,
                            old_item[key],
                            name=None if is_sensitive else key,
                            depth=depth + 1,
                            max_depth=max_depth,
                            force_sensitive=is_sensitive,
                            replacement=replacement,
                        )
                    else:
                        # Keys added by the user have no original to restore from.
                        merged[key] = new_sub
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_list = []
                for i, new_sub in enumerate(new_item):
                    if i >= len(old_item):
                        # Elements appended by the user have no original counterpart.
                        merged_list.append(new_sub)
                    elif is_sensitive and isinstance(new_sub, str) and new_sub == replacement:
                        # In sensitive mode, an element still holding the placeholder is restored directly.
                        merged_list.append(old_item[i])
                    else:
                        merged_list.append(
                            self._merge(
                                new_sub,
                                old_item[i],
                                name=None,
                                depth=depth + 1,
                                max_depth=max_depth,
                                force_sensitive=is_sensitive,
                                replacement=replacement,
                            )
                        )
                return merged_list if isinstance(new_item, list) else tuple(merged_list)

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
                # TODO: Handle Kubernetes V1EnvVar objects if needed
                return new_item

            if is_sensitive and isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        except (TypeError, AttributeError, ValueError):
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        Strings have every registered secret replaced by ``replacement``; dicts, lists, tuples and sets are
        walked recursively (sets come back as tuples), with dict keys used as names. If ``name`` is given,
        and it's a "sensitive" name (see :func:`should_hide_value_for_key`) then all string values in the
        item are redacted.

        ``max_depth`` defaults to ``MAX_RECURSION_DEPTH``; any falsy value (including ``0``) selects the
        default.
        """
        return self._redact(
            item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH, replacement=replacement
        )

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.
        """
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth or self.MAX_RECURSION_DEPTH,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example. The check is a
        case-insensitive substring match against ``sensitive_variables_fields``, and is disabled entirely
        when ``hide_sensitive_var_conn_fields`` is false. Non-string names are never sensitive.
        """
        if isinstance(name, str) and self.hide_sensitive_var_conn_fields:
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        * dict: each value is registered recursively, using its key as ``name``;
        * str: registered as a literal pattern (plus any adaptations from ``secret_mask_adapter``) unless it
          is empty, listed in ``SECRETS_TO_SKIP_MASKING`` (case-insensitively), shorter than
          ``min_length_to_mask``, or ``name`` is given but is not a sensitive name;
        * any other iterable: each element is registered recursively under the same ``name``.

        Values of other types (e.g. numbers) are ignored.
        """
        if isinstance(secret, dict):
            for key, value in secret.items():
                self.add_mask(value, key)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            # A named secret is only registered when its name is sensitive. That does not depend on which
            # adaptation is being looked at, so decide it once up front.
            if name and not self.should_hide_value_for_key(name):
                return

            new_mask = False
            for candidate in self._adaptations(secret):
                if not candidate or len(candidate) < min_length:
                    continue
                if candidate.lower() in SECRETS_TO_SKIP_MASKING:
                    continue

                pattern = re.escape(candidate)
                if pattern not in self.patterns:
                    self.patterns.add(pattern)
                    new_mask = True
            if new_mask:
                # Longest patterns first: regex alternation takes the first alternative that matches at a
                # position, so if a shorter secret that is a prefix of a longer one came first, the longer
                # secret would only be partially masked (e.g. "***_suffix"). Ties are broken alphabetically
                # to keep the compiled pattern deterministic.
                ordered = sorted(self.patterns, key=lambda p: (-len(p), p))
                self.replacer = re.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for value in secret:
                self.add_mask(value, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None

609 

610 

class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.

    Everything except ``write``/``writelines`` is delegated unchanged to the wrapped stream.
    """

    def __init__(self):
        # The stream that is sys.stdout at construction time, i.e. before any redirect is installed.
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # Let the target validate its own state (e.g. raise if closed), but hand back this wrapper so that
        # ``with RedactedIO() as out: out.write(...)`` still goes through redaction.
        self.target.__enter__()
        return self

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        # Redact each line exactly like ``write`` does; forwarding ``lines`` untouched would leak secrets.
        self.target.writelines(str(redact(line)) for line in lines)