1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Mask sensitive information from logs."""
18
19from __future__ import annotations
20
21import collections.abc
22import contextlib
23import functools
24import inspect
25import logging
26import re
27import sys
28from collections.abc import Generator, Iterable, Iterator
29from enum import Enum
30from functools import cache, cached_property
31from re import Pattern
32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload
33
# JsonValue must be imported at module level rather than under TYPE_CHECKING: although it appears only in
# type annotations, Pydantic evaluates those annotations at runtime to validate the types of the fields.
36from pydantic import JsonValue # noqa: TC002
37
if TYPE_CHECKING:
    # Everything in this branch exists only for static type checkers. The annotations that mention
    # these names are not evaluated at runtime because of ``from __future__ import annotations``.
    from typing import TypeGuard

    class _V1EnvVarLike(Protocol):
        """Structural type for the one method this module calls on a V1EnvVar: ``to_dict()``."""

        def to_dict(self) -> dict[str, Any]: ...
43
44
# Type variable standing in for a Kubernetes ``V1EnvVar`` in the aliases below; the real class is only
# looked up lazily (see ``_get_v1_env_var_type_cached``).
V1EnvVar = TypeVar("V1EnvVar")
# The kinds of value the masker is annotated to accept for redaction.
Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any]
# What redaction may hand back: the same shapes as the input, or a plain replacement string.
Redacted: TypeAlias = Redactable | str

log = logging.getLogger(__name__)

# NOTE(review): nothing in the code visible here reads this constant, and ``SecretsMasker.__init__``
# starts with an empty ``sensitive_variables_fields`` -- presumably callers copy these names in; verify.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    {
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "proxy",
        "proxies",
        "secret",
        "token",
        "keyfile_dict",
        "service_account",
    }
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# ``SecretsMasker.add_mask`` compares the lower-cased secret against this set, so entries must be
# lower case.
SECRETS_TO_SKIP_MASKING = {"airflow"}
"""Common terms that should be excluded from masking in both production and tests"""
73
74
def should_hide_value_for_key(name):
    """
    Tell whether a value stored under ``name`` should be hidden.

    ``name`` may be, for example, a Variable name or a key in ``conn.extra_dejson``. The decision is
    delegated to the module-level :class:`SecretsMasker` instance.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)
82
83
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Register a secret so that it no longer appears in the logs.

    If ``name`` is provided, the secret is only masked when the name matches one of the configured
    "sensitive" names.

    If ``secret`` is a dict or an iterable (excluding str) it is walked recursively; for dicts each key
    is used as the name of its value.

    Secret strings shorter than the masker's minimum length (5 characters by default, configurable via
    the :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>` setting)
    are not masked. Falsy secrets (``None``, ``""``, empty containers, ``0``) are ignored.
    """
    if secret:
        _secrets_masker().add_mask(secret, name)
102
103
def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """
    Return ``value`` with any known secrets replaced by ``replacement``.

    Delegates to the module-level :class:`SecretsMasker` instance; ``name`` and ``max_depth`` are
    passed through unchanged.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)
109
110
@overload
def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ...


# Merging two dicts yields a dict (the previous ``-> str`` annotation on this overload was wrong).
@overload
def merge(new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None) -> dict: ...


def merge(
    new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain "***" (unchanged), the original value is restored.
    For fields that have been updated by the user, the new value is preserved.

    :param new_value: The possibly edited, redacted value.
    :param old_value: The original value the redacted one was produced from.
    :param name: Optional name of the value, used to decide whether it is sensitive.
    :param max_depth: Optional recursion limit, passed through to the masker.
    :return: The merged value, as produced by the module-level :class:`SecretsMasker` instance.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth)
130
131
@cache
def _secrets_masker() -> SecretsMasker:
    """
    Return the module-level secrets masker, creating it on first use.

    ``functools.cache`` makes this a per-module singleton: the first call builds the instance and
    every later call returns that same object. Note that different import paths (e.g.,
    airflow._shared vs airflow.sdk._shared) are separate modules and thus have separate masker
    instances.
    """
    masker = SecretsMasker()
    return masker
143
144
def reset_secrets_masker() -> None:
    """
    Clear the patterns and the replacer held by the module-level secrets masker.

    This gives an execution environment a fresh masker, so that no patterns or replacer carry over
    from a previous execution or from a parent process.

    New processor types should call this when setting up their own masking, to avoid inheriting
    masking rules from existing execution environments.
    """
    masker = _secrets_masker()
    masker.reset_masker()
156
157
def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
    """Tell whether ``v`` is a Kubernetes ``V1EnvVar``, without importing kubernetes needlessly."""
    # Only resolve (and cache) the real type once kubernetes.client has been imported by someone
    # else; if it has not been imported, nothing can be an instance of it.
    if "kubernetes.client" in sys.modules:
        return isinstance(v, _get_v1_env_var_type_cached())
    return False
167
168
@cache
def _get_v1_env_var_type_cached() -> type:
    """Return the ``V1EnvVar`` class; the result is cached after the first call."""
    try:
        from kubernetes.client import V1EnvVar
    except ImportError:
        # Not expected: the caller checks sys.modules before asking. Fall back to a fresh
        # placeholder class that no real object is an instance of.
        return type("V1EnvVar", (), {})
    return V1EnvVar
179
180
class SecretsMasker(logging.Filter):
    """
    Redact secrets from logs.

    The masker keeps a set of escaped secret strings (``patterns``) compiled into a single regular
    expression (``replacer``). As a :class:`logging.Filter` it rewrites log records in place; it can
    also be used directly through :meth:`redact` and :meth:`merge`.
    """

    # Compiled alternation of every registered secret; ``None`` until a secret has been added.
    replacer: Pattern | None = None
    # ``re.escape``-d secrets that make up ``replacer``.
    patterns: set[str]

    # Attribute set on a log record once it has been processed by ``filter``.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5
    # Class-wide, so the "too short" warning is logged at most once per process.
    _has_warned_short_secret = False
    mask_secrets_in_logs = False

    # Secret strings shorter than this are never registered (see ``add_mask``).
    min_length_to_mask = 5
    # Optional callable returning extra variant(s) of a secret to mask (see ``_adaptations``).
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        self.patterns = set()
        self.sensitive_variables_fields = []
        self.hide_sensitive_var_conn_fields = True

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Wrap a subclass ``_redact`` override that does not accept the ``replacement`` keyword."""
        super().__init_subclass__(**kwargs)

        if cls._redact is not SecretsMasker._redact:
            sig = inspect.signature(cls._redact)
            # Compat for older versions of the OpenLineage plugin which subclasses this -- call the method
            # without the replacement character
            for param in sig.parameters.values():
                if param.name == "replacement" or param.kind == param.VAR_KEYWORD:
                    break
            else:
                # Block only runs if no break above, i.e. the override cannot take ``replacement``.
                f = cls._redact

                @functools.wraps(f)
                def _redact(*args, replacement: str = "***", **kwargs):
                    return f(*args, **kwargs)

                cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the standard log record attributes that ``filter`` must leave untouched."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        # ``msg`` and ``args`` are standard attributes too, but they carry user data and must be redacted.
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """
        Redact the ``args`` of ``exception`` and of every exception reachable from it.

        The ``__context__`` / ``__cause__`` graph is walked iteratively and each exception object is
        processed once, so a cyclic chain (``__cause__`` can be assigned freely) or a very long chain
        cannot exhaust the recursion limit while a record is being filtered.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            exc = pending.pop()
            if exc is None or id(exc) in seen:
                continue
            seen.add(id(exc))
            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            with contextlib.suppress(AttributeError):
                exc.args = tuple(self.redact(v) for v in exc.args)
            pending.append(exc.__cause__)
            pending.append(exc.__context__)

    def filter(self, record) -> bool:
        """
        Redact secrets in ``record`` in place.

        Always returns ``True``: records are never dropped, only rewritten. Nothing is touched while
        log masking is disabled or when the record already carries ``ALREADY_FILTERED_FLAG``.
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only values of existing keys are reassigned, so the dict does not change size while
            # it is being iterated.
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string found in ``item`` with ``replacement``.

        Dicts, lists, tuples and sets are walked recursively (sets come back as tuples). Anything
        nested deeper than ``max_depth`` is replaced wholesale; other non-container values are
        returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return [
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            ]
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Redact ``item`` recursively.

        If ``name`` is sensitive, every string inside ``item`` is replaced. Otherwise strings have the
        registered secrets substituted, dict values are redacted using their key as the name, Enum
        members are redacted via their ``value``, and V1EnvVar objects via their ``to_dict()`` form.
        Returns ``"<redaction-failed>"`` if anything raises while redacting.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
            # Enum must be tested before str so that str-based enums are unwrapped to their value.
            if isinstance(item, Enum):
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            # The builtin types are checked before the V1EnvVar probe: they are by far the most
            # common inputs and a V1EnvVar is never a str, tuple, set or list.
            if isinstance(item, str):
                if self.replacer:
                    # Value-based redaction: substitute every registered secret in the string.
                    return self.replacer.sub(replacement, str(item))
                return item
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = replacement
                else:
                    return self._redact(
                        item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                    )
                return tmp
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        A string equal to ``replacement`` in a sensitive position is treated as "unchanged" and the
        value from ``old_item`` is restored; every other value is taken from ``new_item``.
        ``force_sensitive`` propagates the sensitivity of a parent key to everything nested under it.
        """
        if depth > max_depth:
            # Too deep to analyse: restore a bare marker, otherwise keep whatever the caller sent.
            if isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_value in new_item.items():
                    if key in old_item:
                        # For dicts, pass the key as name unless we're in sensitive mode
                        child_name = None if is_sensitive else key
                        merged[key] = self._merge(
                            new_value,
                            old_item[key],
                            name=child_name,
                            depth=depth + 1,
                            max_depth=max_depth,
                            force_sensitive=is_sensitive,
                            replacement=replacement,
                        )
                    else:
                        # Key added by the caller: nothing to restore.
                        merged[key] = new_value
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_list = []
                for i, new_value in enumerate(new_item):
                    if i >= len(old_item):
                        # Element appended by the caller: nothing to restore.
                        merged_list.append(new_value)
                    elif is_sensitive and isinstance(new_value, str) and new_value == replacement:
                        # In sensitive mode an untouched marker restores the original element.
                        merged_list.append(old_item[i])
                    else:
                        merged_list.append(
                            self._merge(
                                new_value,
                                old_item[i],
                                name=None,
                                depth=depth + 1,
                                max_depth=max_depth,
                                force_sensitive=is_sensitive,
                                replacement=replacement,
                            )
                        )

                if isinstance(new_item, list):
                    return merged_list
                return tuple(merged_list)

            if isinstance(new_item, str):
                # A str is never a set or a V1EnvVar, so it is resolved before those checks.
                if is_sensitive and new_item == replacement:
                    return old_item
                return new_item

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
                # TODO: Handle Kubernetes V1EnvVar objects if needed
                return new_item

            return new_item

        except (TypeError, AttributeError, ValueError):
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.

        A ``max_depth`` of ``None`` (or ``0``) means ``MAX_RECURSION_DEPTH``.
        """
        return self._redact(
            item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH, replacement=replacement
        )

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.

        A ``max_depth`` of ``None`` (or ``0``) means ``MAX_RECURSION_DEPTH``.
        """
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth or self.MAX_RECURSION_DEPTH,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example. The match is a
        case-insensitive substring test against ``sensitive_variables_fields``; non-string names are
        never sensitive, and neither is anything while ``hide_sensitive_var_conn_fields`` is false.
        """
        if isinstance(name, str) and self.hide_sensitive_var_conn_fields:
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def _compile_replacer(self) -> None:
        """Rebuild ``replacer`` from ``patterns``, trying longer patterns first."""
        if not self.patterns:
            self.replacer = None
            return
        # Regex alternation takes the first branch that matches, not the longest one. If one secret
        # is a prefix of another, the shorter one must therefore come later, or the tail of the
        # longer secret would be left in the output. Ties are broken alphabetically so the compiled
        # pattern does not depend on set ordering.
        ordered = sorted(self.patterns, key=lambda pattern: (-len(pattern), pattern))
        self.replacer = re.compile("|".join(ordered))

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        Dicts are walked with each key used as the name of its value; other non-string iterables are
        walked with the same ``name``. A string is registered only if it is at least
        ``min_length_to_mask`` long, is not listed in ``SECRETS_TO_SKIP_MASKING`` and, when ``name`` is
        given, that name is sensitive. Values that are neither strings nor iterables are ignored.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    if len(s) < min_length:
                        continue

                    if s.lower() in SECRETS_TO_SKIP_MASKING:
                        continue

                    pattern = re.escape(s)
                    if pattern not in self.patterns and (not name or self.should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True
            if new_mask:
                self._compile_replacer()

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None
583
584
class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Text passed to :meth:`write` or :meth:`writelines` is redacted before it reaches the wrapped
    stream; every other method delegates to the stream unchanged.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        # Captured at construction time, i.e. whatever ``sys.stdout`` is at that moment.
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # NOTE(review): this hands back whatever the wrapped stream's ``__enter__`` returns, so
        # ``with RedactedIO() as f`` would write through ``f`` without redaction -- confirm whether
        # that is intended.
        return self.target.__enter__()

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write the result to the wrapped stream."""
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Write each line through :meth:`write`, so that it is redacted like any other output."""
        # Forwarding ``lines`` straight to the wrapped stream would bypass redaction.
        for line in lines:
            self.write(line)