1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Mask sensitive information from logs."""
18
19from __future__ import annotations
20
21import collections.abc
22import contextlib
23import functools
24import inspect
25import logging
26import re
27import sys
28from collections.abc import Generator, Iterable, Iterator
29from enum import Enum
30from functools import cache, cached_property
31from re import Pattern
32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload
33
34# We have to import this here, as it is used in the type annotations at runtime even if it seems it is
35# not used in the code. This is because Pydantic uses type at runtime to validate the types of the fields.
36from pydantic import JsonValue # noqa: TC002
37
if TYPE_CHECKING:
    from typing import TypeGuard

    # Structural, typing-only stand-in for ``kubernetes.client.V1EnvVar``; it lets the type checker
    # narrow values in ``_is_v1_env_var`` without this module importing kubernetes at runtime.
    class _V1EnvVarLike(Protocol):
        # The only method the redaction code calls on a V1EnvVar-like object.
        def to_dict(self) -> dict[str, Any]: ...
43
44
# Placeholder used in the aliases below for ``kubernetes.client.V1EnvVar``; presumably a TypeVar rather
# than the real class so that kubernetes need not be importable here (NOTE(review): confirm intent).
V1EnvVar = TypeVar("V1EnvVar")
# Values accepted by ``redact``/``merge``: plain strings, V1EnvVar-like objects and (nested) containers.
Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any]
# Redaction returns the same kinds of values; ``str`` is spelled out because any value may be replaced
# wholesale by a replacement/placeholder string.
Redacted: TypeAlias = Redactable | str

log = logging.getLogger(__name__)
50
# Kept in alphabetical order so additions are easy to review.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    (
        "access_key",
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "connection_string",
        "keyfile_dict",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "proxies",
        "proxy",
        "proxy_password",
        "secret",
        "service_account",
        "token",
    )
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# Left as a mutable set on purpose: it is consulted (case-insensitively) by the masker at call time.
SECRETS_TO_SKIP_MASKING = {"airflow"}
"""Common terms that should be excluded from masking in both production and tests"""
76
77
def should_hide_value_for_key(name):
    """
    Tell whether the value stored under ``name`` should be hidden.

    Delegates to the module-level masker; ``name`` is typically a Variable name or a key of
    ``conn.extra_dejson``.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)
85
86
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Mask a secret from appearing in the logs.

    If ``name`` is provided, then it will only be masked if the name matches one of the configured "sensitive"
    names.

    If ``secret`` is a dict or an iterable (excluding str) then it will be recursively walked and keys with
    sensitive names will be hidden.

    If the secret value is too short (by default, fewer than 5 characters; configurable via the
    :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>` setting) it will not
    be masked.
    """
    # Empty/falsy values (``""``, ``None``, ``{}``, ...) carry nothing to mask.
    if not secret:
        return

    _secrets_masker().add_mask(secret, name)
105
106
def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """
    Return ``value`` with every known secret replaced by ``replacement``.

    Thin wrapper around the module-level masker's ``redact``.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)
112
113
@overload
def merge(
    new_value: str,
    old_value: str,
    name: str | None = None,
    max_depth: int | None = None,
    replacement: str = "***",
) -> str: ...


@overload
def merge(
    new_value: dict,
    old_value: dict,
    name: str | None = None,
    max_depth: int | None = None,
    replacement: str = "***",
) -> dict: ...


def merge(
    new_value: Redacted,
    old_value: Redactable,
    name: str | None = None,
    max_depth: int | None = None,
    replacement: str = "***",
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain the ``replacement`` marker ("***" by default, i.e. unchanged),
    the original value is restored. For fields that have been updated by the user, the new value is
    preserved.

    :param new_value: The (possibly edited) redacted value.
    :param old_value: The original, unredacted value.
    :param name: Optional key name used to decide whether the top-level value is sensitive.
    :param max_depth: Maximum nesting depth to walk; the masker's default is used when not given.
    :param replacement: The marker that was used when redacting ``new_value``.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth, replacement=replacement)
133
134
@cache
def _secrets_masker() -> SecretsMasker:
    """
    Return the masker shared by this module, creating it on first use.

    ``functools.cache`` on a zero-argument function makes this a per-module singleton. The singleton is
    tied to this module object, so importing the module under a different path (for instance
    airflow._shared vs airflow.sdk._shared) yields a separate masker with its own patterns.
    """
    masker = SecretsMasker()
    return masker
146
147
def reset_secrets_masker() -> None:
    """
    Clear the module-level masker's registered patterns and compiled replacer.

    Use this when bootstrapping a new execution environment so that no masking rules leak in from a
    previous run or a parent process. New processor types that configure their own masking should call it
    first rather than inherit whatever rules already exist.
    """
    masker = _secrets_masker()
    masker.reset_masker()
159
160
def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
    """
    Report whether ``v`` is a ``kubernetes.client.V1EnvVar`` without importing kubernetes ourselves.

    If nothing has imported ``kubernetes.client`` yet, no instance of its classes can exist, so the
    (cached) type lookup is skipped entirely.
    """
    kubernetes_loaded = "kubernetes.client" in sys.modules
    return kubernetes_loaded and isinstance(v, _get_v1_env_var_type_cached())
170
171
@cache
def _get_v1_env_var_type_cached() -> type:
    """
    Resolve the ``V1EnvVar`` class once and memoize it.

    Only meant to be called after ``kubernetes.client`` is already in ``sys.modules``. If the import
    still fails, an empty placeholder class named ``V1EnvVar`` is returned so ``isinstance`` checks stay
    safe (and always false).
    """
    try:
        from kubernetes.client import V1EnvVar as env_var_cls
    except ImportError:
        env_var_cls = type("V1EnvVar", (), {})
    return env_var_cls
182
183
class SecretsMasker(logging.Filter):
    """
    Redact secrets from logs.

    Two mechanisms are combined:

    * Secret *values* registered through :meth:`add_mask` are escaped and compiled into one regular
      expression (:attr:`replacer`); every string passed through :meth:`redact` has matches replaced.
    * Values stored under *sensitive keys* (see :meth:`should_hide_value_for_key`) are replaced wholesale
      while walking dicts.

    As a :class:`logging.Filter`, :meth:`filter` applies the above to log records, but only while masking
    is enabled class-wide via :meth:`enable_log_masking`.
    """

    replacer: Pattern | None = None
    patterns: set[str]

    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5
    _has_warned_short_secret = False
    mask_secrets_in_logs = False

    min_length_to_mask = 5
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        # Escaped regex fragments, one per registered secret (or adaptation of a secret).
        self.patterns = set()
        # Substrings that mark a key name as sensitive; empty here, so no key is hidden until configured.
        self.sensitive_variables_fields = []
        # Master switch for key-based hiding in ``should_hide_value_for_key``.
        self.hide_sensitive_var_conn_fields = True

    @classmethod
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        if cls._redact is not SecretsMasker._redact:
            sig = inspect.signature(cls._redact)
            # Compat for older versions of the OpenLineage plugin which subclasses this -- call the method
            # without the replacement character
            for param in sig.parameters.values():
                if param.name == "replacement" or param.kind == param.VAR_KEYWORD:
                    break
            else:
                # Block only runs if no break above: the override cannot accept ``replacement``, so wrap
                # it in a shim that swallows that keyword before delegating.
                f = cls._redact

                @functools.wraps(f)
                def _redact(*args, replacement: str = "***", **kwargs):
                    return f(*args, **kwargs)

                cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones! ("msg" and "args" are still redacted.)

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context_or_cause(self, exception, visited=None):
        """
        Redact ``exception.args`` in place and follow its ``__context__``/``__cause__`` chain.

        :param exception: The exception to redact.
        :param visited: ids of exceptions already processed in this chain (guards against cycles).
        :return: ``exception`` itself, or a ``RuntimeError`` placeholder once more than
            ``MAX_RECURSION_DEPTH`` exceptions have been visited. If an ``AttributeError`` occurs (e.g. an
            exception class whose attributes cannot be set), it is suppressed and ``None`` is returned, so
            a parent's ``__context__``/``__cause__`` link to it is replaced with ``None``.
        """
        # Exception class may not be modifiable (e.g. declared by an
        # extension module such as JDBC).
        with contextlib.suppress(AttributeError):
            if visited is None:
                visited = set()

            if id(exception) in visited:
                # already visited - it was redacted earlier
                return exception

            # Check depth before adding to visited to ensure we skip exceptions beyond the limit
            if len(visited) >= self.MAX_RECURSION_DEPTH:
                return RuntimeError(
                    f"Stack trace redaction hit recursion limit of {self.MAX_RECURSION_DEPTH} "
                    f"when processing exception of type {type(exception).__name__}. "
                    f"The remaining exceptions will be skipped to avoid "
                    f"infinite recursion and protect against revealing sensitive information."
                )

            visited.add(id(exception))

            exception.args = tuple(self.redact(v) for v in exception.args)
            if exception.__context__:
                exception.__context__ = self._redact_exception_with_context_or_cause(
                    exception.__context__, visited
                )
            if exception.__cause__ and exception.__cause__ is not exception.__context__:
                exception.__cause__ = self._redact_exception_with_context_or_cause(
                    exception.__cause__, visited
                )
            return exception

    def filter(self, record) -> bool:
        """
        Redact secrets in ``record`` in place.

        Always returns ``True``: the record is never dropped, only scrubbed. Nothing is done while log
        masking is disabled or before any secret has been registered.
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context_or_cause(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string in ``item`` with ``replacement``.

        Dicts, lists, tuples and sets are walked (sets come back as tuples). Anything nested deeper than
        ``max_depth`` is replaced wholesale, whatever its type; other non-string leaves are kept.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return list(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Recursive worker behind :meth:`redact`.

        Returns a redacted copy of ``item``. If anything goes wrong, a warning is logged and the
        placeholder string ``"<redaction-failed>"`` is returned instead of the possibly-sensitive value.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
            if isinstance(item, Enum):
                # Must precede the ``str`` check so ``str``-mixin enums are redacted via their value.
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if isinstance(item, str):
                # Checked before the V1EnvVar probe: strings are the common case on the logging hot path
                # and can never be V1EnvVar instances.
                if self.replacer:
                    # A callable makes ``re.sub`` insert ``replacement`` literally; passed as a plain
                    # string, backslash escapes and group references (e.g. ``\g<0>``, which would
                    # re-insert the secret) would be expanded.
                    return self.replacer.sub(lambda _match: replacement, str(item))
                return item
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = replacement
                else:
                    return self._redact(
                        item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                    )
                return tmp
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        ``new_item`` is walked in parallel with ``old_item``. Wherever ``new_item`` still holds exactly
        ``replacement`` in a sensitive position, the matching value from ``old_item`` is restored; all
        other values of ``new_item`` are kept. Once a sensitive key is reached, everything beneath it is
        treated as sensitive (``force_sensitive``).
        """
        if depth > max_depth:
            # Too deep to track key sensitivity: restore any placeholder that was left untouched.
            if isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_value in new_item.items():
                    if key not in old_item:
                        # Keys added by the user have no original to restore.
                        merged[key] = new_value
                        continue
                    merged[key] = self._merge(
                        new_value,
                        old_item[key],
                        # Pass the key as name unless we're already in sensitive mode
                        name=None if is_sensitive else key,
                        depth=depth + 1,
                        max_depth=max_depth,
                        force_sensitive=is_sensitive,
                        replacement=replacement,
                    )
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_items = []
                for index, new_value in enumerate(new_item):
                    if index >= len(old_item):
                        # Items appended by the user have no original to restore.
                        merged_items.append(new_value)
                        continue
                    merged_items.append(
                        self._merge(
                            new_value,
                            old_item[index],
                            name=None,
                            depth=depth + 1,
                            max_depth=max_depth,
                            force_sensitive=is_sensitive,
                            replacement=replacement,
                        )
                    )
                return merged_items if isinstance(new_item, list) else tuple(merged_items)

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            # Any other type, including Kubernetes V1EnvVar objects, is returned as-is.
            # TODO: Handle Kubernetes V1EnvVar objects if needed
            if is_sensitive and isinstance(new_item, str) and new_item == replacement:
                return old_item
            return new_item

        except (TypeError, AttributeError, ValueError):
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        Strings have every registered secret replaced with ``replacement``; dicts, lists, tuples and sets
        are walked recursively (sets come back as tuples). If ``name`` is given, and it's a "sensitive" name
        (see :func:`should_hide_value_for_key`) then all string values in the item are redacted.
        """
        return self._redact(
            item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH, replacement=replacement
        )

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.
        """
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth or self.MAX_RECURSION_DEPTH,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example. The name is stripped and
        lower-cased, then checked for any configured sensitive field as a substring (so the configured
        fields are expected to be lower-case). Non-string names are never sensitive.
        """
        if isinstance(name, str) and self.hide_sensitive_var_conn_fields:
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        Dicts are walked recursively, using each key as the ``name`` of its value; other non-string
        iterables are walked with the same ``name``; other scalars are ignored. Strings are skipped when
        empty, listed in ``SECRETS_TO_SKIP_MASKING`` (case-insensitively) or shorter than
        :attr:`min_length_to_mask`. When ``name`` is given, the secret is only registered if that name is
        sensitive.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            if name and not self.should_hide_value_for_key(name):
                # A named secret is only masked when the name itself is sensitive.
                return

            new_mask = False
            for s in self._adaptations(secret):
                if not s or len(s) < min_length or s.lower() in SECRETS_TO_SKIP_MASKING:
                    continue

                pattern = re.escape(s)
                if pattern not in self.patterns:
                    self.patterns.add(pattern)
                    new_mask = True
            if new_mask:
                # Longest patterns first: regex alternation takes the first alternative that matches, so
                # when one secret is a prefix of another the shorter one would otherwise win and leave the
                # tail of the longer secret in the output. Sorting also makes the pattern deterministic.
                ordered = sorted(self.patterns, key=lambda p: (-len(p), p))
                self.replacer = re.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None
609
610
class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    ``write`` and ``writelines`` pass their text through :func:`redact` before forwarding it to the
    ``sys.stdout`` captured at construction time; every other operation is delegated unchanged.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # Enter the wrapped stream (which checks it is still usable) but hand back this wrapper, so writes
        # made through the ``as`` target are redacted too instead of going straight to stdout.
        self.target.__enter__()
        return self

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        # Apply the same redaction as ``write``; forwarding ``lines`` untouched would let secrets bypass
        # masking.
        self.target.writelines(str(redact(line)) for line in lines)