1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Mask sensitive information from logs."""
18
19from __future__ import annotations
20
21import collections.abc
22import contextlib
23import functools
24import inspect
25import logging
26import re
27import sys
28from collections.abc import Generator, Iterable, Iterator
29from enum import Enum
30from functools import cache, cached_property
31from re import Pattern
32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload
33
34# We have to import this here, as it is used in the type annotations at runtime even if it seems it is
35# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields.
36from pydantic import JsonValue # noqa: TC002
37
if TYPE_CHECKING:
    # Only needed by static type checkers; nothing in this branch runs at import time.
    from typing import TypeGuard

    # Structural type: any object exposing ``to_dict()``. Used as the narrowed type
    # returned by ``_is_v1_env_var``.
    class _V1EnvVarLike(Protocol):
        def to_dict(self) -> dict[str, Any]: ...
43
44
# Type variable used as a stand-in inside the ``Redactable`` alias (presumably for kubernetes'
# ``V1EnvVar`` -- the real class is only looked up lazily in ``_get_v1_env_var_type_cached``).
V1EnvVar = TypeVar("V1EnvVar")
# Shapes of values accepted by the redaction helpers in this module.
Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any]
# What redaction returns: the same shapes as the input, or a plain string.
Redacted: TypeAlias = Redactable | str

# Module-level logger used for the masker's own warnings.
log = logging.getLogger(__name__)
50
# NOTE(review): this constant is not read anywhere in the visible part of this module; presumably
# callers use it to seed ``SecretsMasker.sensitive_variables_fields`` -- confirm against callers.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    {
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "secret",
        "token",
        "keyfile_dict",
        "service_account",
    }
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# Compared against the lower-cased secret in ``SecretsMasker.add_mask``; a secret equal to one of
# these terms is never registered as a mask.
SECRETS_TO_SKIP_MASKING = {"airflow"}
"""Common terms that should be excluded from masking in both production and tests"""
71
72
def should_hide_value_for_key(name):
    """
    Tell whether a value stored under ``name`` should be hidden.

    ``name`` may be, for example, a Variable name or a key of ``conn.extra_dejson``. The decision is
    delegated to the module-level :class:`SecretsMasker` instance.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)
80
81
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Register a secret so that it no longer appears in the logs.

    When ``name`` is given, the secret is only masked if that name matches one of the configured
    "sensitive" names.

    When ``secret`` is a dict or an iterable (other than a str) it is walked recursively and values
    stored under sensitive names are hidden.

    Secrets that are too short (by default 5 characters or fewer, configurable via the
    :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>` setting) are
    not masked.

    Falsy secrets (``None``, ``""``, empty containers, ...) are ignored.
    """
    if secret:
        _secrets_masker().add_mask(secret, name)
100
101
def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """Return ``value`` with every secret found in it substituted by ``replacement``."""
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)
107
108
@overload
def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ...


# Merging two dicts yields a dict (see ``SecretsMasker._merge``), so the overload must say so;
# it was previously annotated as returning ``str``.
@overload
def merge(
    new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None
) -> dict: ...


def merge(
    new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain "***" (unchanged), the original value is restored.
    For fields that have been updated by the user, the new value is preserved.

    :param new_value: The (possibly edited) redacted value.
    :param old_value: The original, unredacted value.
    :param name: Optional name of the value, used to decide whether it is sensitive.
    :param max_depth: Maximum nesting depth to walk; ``None`` uses the masker's default.
    :return: The merged value, delegated to the module-level :class:`SecretsMasker`.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth)
128
129
_global_secrets_masker: SecretsMasker | None = None


def _secrets_masker() -> SecretsMasker:
    """
    Return the module-level secrets masker, creating it on first use.

    The instance is a singleton only within this specific module. Different import paths (e.g.,
    airflow._shared vs airflow.sdk._shared) each get their own global variable and therefore their
    own masker instance.
    """
    global _global_secrets_masker
    masker = _global_secrets_masker
    if masker is None:
        # First call: build the instance and remember it for subsequent calls.
        masker = _global_secrets_masker = SecretsMasker()
    return masker
146
147
def reset_secrets_masker() -> None:
    """
    Clear the patterns and the replacer of the module-level secrets masker.

    This lets an execution environment start with a fresh masker, so that no patterns or replacer
    carry over from a previous execution or from a parent process.

    New processor types should call this when setting up their own masking to avoid inheriting
    masking rules from existing execution environments.
    """
    masker = _secrets_masker()
    masker.reset_masker()
159
160
def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
    """Tell whether ``v`` is a kubernetes ``V1EnvVar`` without importing kubernetes needlessly."""
    # If ``kubernetes.client`` was never imported, no V1EnvVar instance can exist, so the
    # (cached) type lookup is skipped entirely.
    k8s_loaded = "kubernetes.client" in sys.modules
    return k8s_loaded and isinstance(v, _get_v1_env_var_type_cached())
170
171
@cache
def _get_v1_env_var_type_cached() -> type:
    """Return the ``V1EnvVar`` class; the result is cached after the first call."""
    try:
        from kubernetes.client import V1EnvVar as k8s_env_var_type
    except ImportError:
        # Not expected in practice: the caller checks sys.modules before asking for the type.
        # Fall back to a fresh empty class that no real object is an instance of.
        placeholder = type("V1EnvVar", (), {})
        return placeholder
    return k8s_env_var_type
182
183
class SecretsMasker(logging.Filter):
    """
    Redact secrets from logs.

    Secrets registered through :meth:`add_mask` are regex-escaped, stored in ``patterns`` and compiled
    into the single ``replacer`` alternation. Redaction happens in two ways:

    * by value -- any registered secret found inside a string is substituted;
    * by name -- everything stored under a key for which :meth:`should_hide_value_for_key` is true
      is replaced wholesale.
    """

    # Compiled alternation of all registered secrets; ``None`` until a secret has been added.
    replacer: Pattern | None = None
    # Regex-escaped secrets that make up ``replacer``.
    patterns: set[str]

    # Attribute set on a log record once it has been processed by ``filter``.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5
    # Class-wide latch so the "secret too short" warning is only emitted once.
    _has_warned_short_secret = False
    mask_secrets_in_logs = False

    # Secrets shorter than this are not registered by ``add_mask``.
    min_length_to_mask = 5
    # Optional callable returning extra variant(s) of a secret that must be masked as well.
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        self.patterns = set()
        self.sensitive_variables_fields = []

    @classmethod
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        if cls._redact is not SecretsMasker._redact:
            sig = inspect.signature(cls._redact)
            # Compat for older versions of the OpenLineage plugin which subclasses this -- call the method
            # without the replacement character
            for param in sig.parameters.values():
                if param.name == "replacement" or param.kind == param.VAR_KEYWORD:
                    break
            else:
                # Block only runs if no break above, i.e. the override cannot accept ``replacement``.
                f = cls._redact

                @functools.wraps(f)
                def _redact(*args, replacement: str = "***", **kwargs):
                    # Swallow ``replacement`` so the legacy override keeps working.
                    return f(*args, **kwargs)

                cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        # ``msg`` and ``args`` are standard attributes too, but they are exactly what must be redacted.
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """
        Redact the ``args`` of ``exception`` and of every exception chained to it.

        The chain is followed through both ``__context__`` and ``__cause__``. Each exception object
        is processed at most once, so a chain that loops back on itself (for example two exceptions
        assigned as each other's ``__cause__``) terminates instead of recursing forever.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            current = pending.pop()
            if id(current) in seen:
                continue
            seen.add(id(current))

            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            with contextlib.suppress(AttributeError):
                current.args = tuple(self.redact(v) for v in current.args)

            context = current.__context__
            cause = current.__cause__
            if cause and cause is not context:
                pending.append(cause)
            if context:
                pending.append(context)

    def filter(self, record) -> bool:
        """
        Redact secrets in ``record`` in place; the record is never dropped (always returns True).

        Nothing is done when log masking is disabled, when the record was already processed, or when
        no secret has been registered yet.
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only existing keys are re-assigned, so the dict does not change size while iterating.
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string in ``item`` with ``replacement``, whatever its content.

        Dicts, tuples, sets and lists are walked recursively (sets come back as tuples). Anything
        nested deeper than ``max_depth`` is replaced as a whole; other non-string values are
        returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return [
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            ]
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Redact ``item`` recursively.

        If ``name`` is sensitive the whole item is masked via :meth:`_redact_all`; otherwise strings
        have registered secrets substituted and containers are walked, using dict keys as the name
        of their values. If anything goes wrong, ``"<redaction-failed>"`` is returned rather than
        the original value.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
            if isinstance(item, Enum):
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    # The env var's own name is sensitive: hide its value outright.
                    tmp["value"] = replacement
                else:
                    return self._redact(
                        item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                    )
                return tmp
            if isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    return self.replacer.sub(replacement, str(item))
                return item
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        A string equal to ``replacement`` found in a sensitive position (or anywhere below
        ``max_depth``) is treated as "unchanged" and the matching part of ``old_item`` is restored;
        every other value is taken from ``new_item``.

        :param name: Name of the item; makes the item sensitive when it is a sensitive name.
        :param force_sensitive: Treat the item as sensitive regardless of ``name`` (set while
            walking below a sensitive key).
        :param replacement: The marker that denotes a redacted, unchanged value.
        """
        if depth > max_depth:
            if isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_sub in new_item.items():
                    if key not in old_item:
                        # Newly added key: nothing to restore.
                        merged[key] = new_sub
                        continue
                    merged[key] = self._merge(
                        new_sub,
                        old_item[key],
                        # For dicts, pass the key as name unless we're in sensitive mode
                        name=None if is_sensitive else key,
                        depth=depth + 1,
                        max_depth=max_depth,
                        force_sensitive=is_sensitive,
                        replacement=replacement,
                    )
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_items = []
                for index, new_sub in enumerate(new_item):
                    if index >= len(old_item):
                        # Element appended by the user: no original to fall back on.
                        merged_items.append(new_sub)
                    elif is_sensitive and isinstance(new_sub, str) and new_sub == replacement:
                        # In sensitive mode, an element that is still redacted is restored as is.
                        merged_items.append(old_item[index])
                    else:
                        merged_items.append(
                            self._merge(
                                new_sub,
                                old_item[index],
                                name=None,
                                depth=depth + 1,
                                max_depth=max_depth,
                                force_sensitive=is_sensitive,
                                replacement=replacement,
                            )
                        )
                return merged_items if isinstance(new_item, list) else tuple(merged_items)

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
                # TODO: Handle Kubernetes V1EnvVar objects if needed
                return new_item

            if is_sensitive and isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        except (TypeError, AttributeError, ValueError):
            # Best effort: if the two structures cannot be walked together, keep the new value.
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        Registered secrets appearing inside strings are substituted with ``replacement``; dicts,
        lists, tuples and sets are walked recursively.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.

        ``max_depth`` of ``None`` (or ``0``) falls back to ``MAX_RECURSION_DEPTH``.
        """
        return self._redact(
            item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH, replacement=replacement
        )

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.
        """
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth or self.MAX_RECURSION_DEPTH,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example. The match is a
        case-insensitive substring test against ``sensitive_variables_fields`` and is only performed
        when ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS`` is truthy.
        """
        from airflow import settings

        if isinstance(name, str) and settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def _rebuild_replacer(self) -> None:
        """
        Recompile ``replacer`` from ``patterns``.

        Regex alternation tries its branches left to right, so patterns are ordered longest first:
        when one secret is a prefix of another, the longer one must win or its tail would be left
        unmasked. Ties are broken alphabetically to keep the compiled pattern deterministic.
        """
        ordered = sorted(self.patterns, key=lambda pattern: (-len(pattern), pattern))
        # An empty alternation would match everywhere, so keep "no secrets" as ``None``.
        self.replacer = re.compile("|".join(ordered)) if ordered else None

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        Dicts are walked with their keys used as names, other iterables element by element. A string
        is registered (together with its adaptations) unless it is empty, listed in
        ``SECRETS_TO_SKIP_MASKING``, shorter than ``min_length_to_mask``, or ``name`` is given and is
        not a sensitive name.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            new_mask = False
            for s in self._adaptations(secret):
                # Adaptations are held to the same rules as the secret itself.
                if not s or len(s) < min_length or s.lower() in SECRETS_TO_SKIP_MASKING:
                    continue

                pattern = re.escape(s)
                if pattern not in self.patterns and (not name or self.should_hide_value_for_key(name)):
                    self.patterns.add(pattern)
                    new_mask = True
            if new_mask:
                self._rebuild_replacer()

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None
587
588
class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Every call is forwarded to ``sys.stdout`` as it was when the instance was created; text passed
    to :meth:`write` and :meth:`writelines` is redacted first.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        # Captured once, so a later redirection of sys.stdout to this object does not loop back.
        self.target = sys.stdout

    # NOTE(review): entering this object directly (``with RedactedIO() as f``) hands back the
    # underlying stream, not the redacting wrapper, and leaving the block calls the target's own
    # ``__exit__``. Use ``contextlib.redirect_stdout`` as shown in the class docstring.
    def __enter__(self) -> TextIO:
        return self.target.__enter__()

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write the result to the target stream."""
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Redact each of ``lines`` and write them to the target stream."""
        # Lines get the same treatment as ``write`` so that secrets cannot slip through this path.
        self.target.writelines(str(redact(line)) for line in lines)