Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/airflow/_shared/secrets_masker/secrets_masker.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

287 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Mask sensitive information from logs.""" 

18 

19from __future__ import annotations 

20 

21import collections.abc 

22import contextlib 

23import functools 

24import inspect 

25import logging 

26import re 

27import sys 

28from collections.abc import Generator, Iterable, Iterator 

29from enum import Enum 

30from functools import cache, cached_property 

31from re import Pattern 

32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload 

33 

34# We have to import this here, as it is used in the type annotations at runtime even if it seems it is 

35# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields. 

36from pydantic import JsonValue # noqa: TC002 

37 

38if TYPE_CHECKING: 

39 from typing import TypeGuard 

40 

41 class _V1EnvVarLike(Protocol): 

42 def to_dict(self) -> dict[str, Any]: ... 

43 

44 

45V1EnvVar = TypeVar("V1EnvVar") 

46Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any] 

47Redacted: TypeAlias = Redactable | str 

48 

49log = logging.getLogger(__name__) 

50 

#: Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    (
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "keyfile_dict",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "proxies",
        "proxy",
        "secret",
        "service_account",
        "token",
    )
)

#: Common terms that should be excluded from masking in both production and tests.
SECRETS_TO_SKIP_MASKING = {"airflow"}

73 

74 

def should_hide_value_for_key(name):
    """
    Tell whether the value stored under ``name`` must be hidden.

    ``name`` may be a Variable name or a key of ``conn.extra_dejson``, for example. The decision
    is delegated to the module-level :class:`SecretsMasker` instance.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)

82 

83 

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret from appearing in the logs.

    If ``name`` is provided, then it will only be masked if the name matches one of the configured
    "sensitive" names.

    If ``secret`` is a dict or an iterable (excluding str) then it will be recursively walked and
    values stored under keys with sensitive names will be hidden.

    If a string secret is too short (by default fewer than 5 characters, configurable via the
    :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>` setting)
    it will not be masked.

    Falsy values (``None``, ``""``, empty containers, ``0``, ``False``) are ignored.
    """
    if not secret:
        return

    _secrets_masker().add_mask(secret, name)

102 

103 

def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """
    Return ``value`` with every known secret replaced by ``replacement``.

    Thin wrapper around :meth:`SecretsMasker.redact` on the module-level masker instance.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)

109 

110 

@overload
def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ...


@overload
def merge(new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None) -> dict: ...


def merge(
    new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain "***" (unchanged), the original value is restored.
    For fields that have been updated by the user, the new value is preserved.

    Merging two dicts yields a dict, merging two strings yields a string.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth)

130 

131 

@cache
def _secrets_masker() -> SecretsMasker:
    """
    Return this module's :class:`SecretsMasker`, creating it on first use.

    ``functools.cache`` turns this zero-argument factory into a lazily created, module-level
    singleton. The singleton is tied to this module object: importing the module under another
    path (e.g. ``airflow._shared`` vs ``airflow.sdk._shared``) yields a separate function and
    therefore a separate masker instance.
    """
    masker = SecretsMasker()
    return masker

143 

144 

def reset_secrets_masker() -> None:
    """
    Drop every registered pattern and the compiled replacer from the module-level masker.

    Use this so an execution environment starts from a clean slate, with no patterns or replacer
    carried over from a previous execution or a parent process. New processor types should call
    it when setting up their own masking, to avoid inheriting masking rules from existing
    execution environments.
    """
    masker = _secrets_masker()
    masker.reset_masker()

156 

157 

158def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]: 

159 """Check if object is V1EnvVar, avoiding unnecessary imports.""" 

160 # Quick check: if k8s not imported, can't be a V1EnvVar instance 

161 if "kubernetes.client" not in sys.modules: 

162 return False 

163 

164 # K8s is loaded, safe to get/cache the type 

165 v1_type = _get_v1_env_var_type_cached() 

166 return isinstance(v, v1_type) 

167 

168 

169@cache 

170def _get_v1_env_var_type_cached() -> type: 

171 """Get V1EnvVar type (cached, only called when k8s is already loaded).""" 

172 try: 

173 from kubernetes.client import V1EnvVar 

174 

175 return V1EnvVar 

176 except ImportError: 

177 # Shouldn't happen since we check sys.modules first 

178 return type("V1EnvVar", (), {}) 

179 

180 

class SecretsMasker(logging.Filter):
    """
    Redact secrets from logs.

    Two complementary mechanisms are combined:

    * value based: secrets registered through :meth:`add_mask` are escaped and compiled into a
      single alternation regex (``replacer``) that is substituted inside every string;
    * key based: values stored under a key that :meth:`should_hide_value_for_key` deems sensitive
      are replaced wholesale.

    As a :class:`logging.Filter`, the instance rewrites log records in place, but only while log
    masking is enabled (see :meth:`enable_log_masking`).
    """

    # Compiled alternation of every registered pattern; ``None`` until a mask has been added.
    replacer: Pattern | None = None
    # ``re.escape``-d secrets (and their adaptations) that ``replacer`` is built from.
    patterns: set[str]

    # Key stored on a log record once it has been processed, so it is not redacted twice.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Nesting depth walked by ``redact`` / ``merge`` when no ``max_depth`` is given.
    MAX_RECURSION_DEPTH = 5
    # Set on the class (shared by all instances) so the "too short" warning is logged only once.
    _has_warned_short_secret = False
    # Class-wide switch read by ``filter``; toggled with enable/disable_log_masking.
    mask_secrets_in_logs = False

    # String secrets shorter than this are never masked.
    min_length_to_mask = 5
    # Optional callable returning extra variants (a str or an iterable of str) of each secret.
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        self.patterns = set()
        # Substrings matched against the stripped, lower-cased key by should_hide_value_for_key.
        # Empty by default, so no key is sensitive until this list is populated.
        self.sensitive_variables_fields = []
        # Master switch for key-based hiding.
        self.hide_sensitive_var_conn_fields = True

    @classmethod
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        if cls._redact is not SecretsMasker._redact:
            sig = inspect.signature(cls._redact)
            # Compat for older versions of the OpenLineage plugin which subclasses this -- call the method
            # without the replacement character
            for param in sig.parameters.values():
                if param.name == "replacement" or param.kind == param.VAR_KEYWORD:
                    break
            else:
                # Block only runs if no break above: the override cannot accept ``replacement``,
                # so wrap it in a shim that swallows that keyword argument.
                f = cls._redact

                @functools.wraps(f)
                def _redact(*args, replacement: str = "***", **kwargs):
                    return f(*args, **kwargs)

                cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """
        Names of the standard ``LogRecord`` attributes that :meth:`filter` leaves untouched.

        ``log.info(..., extra={'foo': 2})`` sets ``record.foo``, and such extra attributes must be
        redacted too. To tell them apart from the built-in ones, a throw-away record is created
        with the active record factory and its attribute names are collected -- minus ``msg``
        and ``args``, which must always be redacted.
        """
        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """Redact the args of ``exception`` and, recursively, of its ``__context__`` / ``__cause__``."""
        # Exception class may not be modifiable (e.g. declared by an
        # extension module such as JDBC).
        with contextlib.suppress(AttributeError):
            exception.args = tuple(self.redact(v) for v in exception.args)
        if exception.__context__:
            self._redact_exception_with_context(exception.__context__)
        if exception.__cause__ and exception.__cause__ is not exception.__context__:
            self._redact_exception_with_context(exception.__cause__)

    def filter(self, record) -> bool:
        """
        Redact secrets from ``record`` in place; always return ``True`` so the record is emitted.

        Nothing is changed while log masking is disabled, for records already processed, or
        while no value-based mask has been registered (``replacer`` is unset).
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    # Only values are reassigned (the key set is unchanged), which is safe
                    # while iterating.
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                self._redact_exception_with_context(record.exc_info[1])
            record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string inside ``item`` with ``replacement``.

        Used for values stored under a sensitive key. Dicts, tuples, sets and lists are walked
        (sets come back as tuples) and non-string leaves are returned unchanged. Anything nested
        deeper than ``max_depth`` is replaced wholesale, whatever its type.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return [self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item]
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Recursively redact ``item``.

        ``name`` is the key the item was found under (``None`` for top-level values and sequence
        elements); a sensitive ``name`` redacts the whole value via :meth:`_redact_all`.
        Otherwise strings are scrubbed with ``replacer`` and containers are walked. Exceptions
        raised while redacting are caught: a warning is logged and ``"<redaction-failed>"`` is
        returned instead of the possibly-sensitive value.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=depth + 1, max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
            if isinstance(item, Enum):
                # Checked before ``str`` so that str-based enums are unwrapped to their value.
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if isinstance(item, str):
                # Strings are by far the most common leaf; handle them before the Kubernetes probe.
                if self.replacer:
                    # A function repl inserts ``replacement`` literally; a string repl would have
                    # its backslash escapes / group references interpreted by ``re``.
                    return self.replacer.sub(lambda _match: replacement, item)
                return item
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = replacement
                    return tmp
                return self._redact(
                    item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=depth + 1, max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=depth + 1, max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        A string leaf equal to ``replacement`` is swapped back for the original value when it sits
        under a sensitive key, or beyond ``max_depth``; every other value from ``new_item`` wins.
        Dicts are merged key by key and lists/tuples position by position; sets and Kubernetes
        ``V1EnvVar`` objects are returned as given. On ``TypeError``, ``AttributeError`` or
        ``ValueError`` the new item is returned unchanged.
        """
        if depth > max_depth:
            if isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_sub in new_item.items():
                    if key not in old_item:
                        # Keys added by the user have no original value to restore.
                        merged[key] = new_sub
                        continue
                    # For dicts, pass the key as name unless we're in sensitive mode
                    merged[key] = self._merge(
                        new_sub,
                        old_item[key],
                        name=None if is_sensitive else key,
                        depth=depth + 1,
                        max_depth=max_depth,
                        force_sensitive=is_sensitive,
                        replacement=replacement,
                    )
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_items = []
                for index, new_sub in enumerate(new_item):
                    if index >= len(old_item):
                        # Items appended by the user have no original value to restore.
                        merged_items.append(new_sub)
                    elif is_sensitive and isinstance(new_sub, str) and new_sub == replacement:
                        # In sensitive mode, an unchanged redacted element restores the original.
                        merged_items.append(old_item[index])
                    else:
                        merged_items.append(
                            self._merge(
                                new_sub,
                                old_item[index],
                                name=None,
                                depth=depth + 1,
                                max_depth=max_depth,
                                force_sensitive=is_sensitive,
                                replacement=replacement,
                            )
                        )
                return merged_items if isinstance(new_item, list) else tuple(merged_items)

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
                # TODO: Handle Kubernetes V1EnvVar objects if needed
                return new_item

            if is_sensitive and isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        except (TypeError, AttributeError, ValueError):
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        Strings have every registered secret replaced by ``replacement``; dicts, lists, tuples and
        sets are walked recursively up to ``max_depth`` levels (``MAX_RECURSION_DEPTH`` when
        ``None``). If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item are redacted.
        """
        if max_depth is None:
            max_depth = self.MAX_RECURSION_DEPTH
        return self._redact(item, name, depth=0, max_depth=max_depth, replacement=replacement)

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.
        ``max_depth`` defaults to ``MAX_RECURSION_DEPTH`` when ``None``.
        """
        if max_depth is None:
            max_depth = self.MAX_RECURSION_DEPTH
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example. A string name
        matches when any entry of ``sensitive_variables_fields`` is a substring of the stripped,
        lower-cased name. Non-string names never match, and nothing matches while
        ``hide_sensitive_var_conn_fields`` is false.
        """
        if isinstance(name, str) and self.hide_sensitive_var_conn_fields:
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        Dicts are walked using each key as the ``name`` of its value; other non-string iterables
        are walked with the same ``name``. A string is registered, together with any variants
        produced by ``secret_mask_adapter``, unless it is empty, its lower-cased form is in
        ``SECRETS_TO_SKIP_MASKING``, it is shorter than ``min_length_to_mask``, or ``name`` is
        given but not sensitive. Other values are ignored.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            new_mask = False
            for s in self._adaptations(secret):
                if not s or len(s) < min_length or s.lower() in SECRETS_TO_SKIP_MASKING:
                    continue
                pattern = re.escape(s)
                if pattern not in self.patterns and (not name or self.should_hide_value_for_key(name)):
                    self.patterns.add(pattern)
                    new_mask = True
            if new_mask:
                # ``re`` alternation is leftmost-first, not longest-match: with "abc|abcdef" the
                # input "abcdef" would only have "abc" replaced, leaking "def". Trying the longest
                # patterns first ensures a secret that extends another one is masked in full.
                self.replacer = re.compile("|".join(sorted(self.patterns, key=len, reverse=True)))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None

583 

584 

class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Text written through :meth:`write` or :meth:`writelines` is passed through :func:`redact`
    before it reaches the wrapped stream; every other operation is forwarded unchanged to the
    stream that was ``sys.stdout`` when the instance was created.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        # Captured on construction. With ``redirect_stdout(RedactedIO())`` that happens before the
        # redirect takes effect, so this is the real stream rather than the wrapper itself.
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # Let the target run its own checks (io streams raise if already closed), but hand back
        # this wrapper: returning the raw target would let writes via ``with ... as f`` skip
        # redaction.
        self.target.__enter__()
        return self

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        # The returned count is that of the redacted text actually written to the target.
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        # Redact each line exactly like ``write`` does; forwarding ``lines`` untouched would let
        # secrets bypass masking.
        self.target.writelines(str(redact(line)) for line in lines)