1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Mask sensitive information from logs."""
18
19from __future__ import annotations
20
21import collections.abc
22import contextlib
23import functools
24import inspect
25import logging
26import re
27import sys
28from collections.abc import Generator, Iterable, Iterator
29from enum import Enum
30from functools import cache, cached_property
31from re import Pattern
32from typing import TYPE_CHECKING, Any, Protocol, TextIO, TypeAlias, TypeVar, overload
33
# This must stay a runtime import even though it looks unused in the code: Pydantic evaluates type
# annotations at runtime in order to validate field types, so the name has to be resolvable then.
36from pydantic import JsonValue # noqa: TC002
37
if TYPE_CHECKING:
    from typing import TypeGuard

    # Structural stand-in for the Kubernetes ``V1EnvVar`` model, declared only for type checkers.
    # The only member this module relies on is ``to_dict()``.
    class _V1EnvVarLike(Protocol):
        def to_dict(self) -> dict[str, Any]: ...


# Placeholder type variable standing in for the Kubernetes ``V1EnvVar`` class inside the aliases below.
V1EnvVar = TypeVar("V1EnvVar")
# Shapes of values that the masker knows how to walk and redact.
Redactable: TypeAlias = str | V1EnvVar | dict[Any, Any] | tuple[Any, ...] | list[Any]
# Result of a redaction: either the same shape as the input or a plain replacement string.
Redacted: TypeAlias = Redactable | str

# Module-level logger, used to report skipped (too short) secrets and redaction failures.
log = logging.getLogger(__name__)
50
# NOTE(review): nothing in this module reads this constant directly; presumably it is the default
# source for ``SecretsMasker.sensitive_variables_fields`` -- confirm at the place where the masker
# is configured.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    {
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "proxy",
        "proxies",
        "secret",
        "token",
        "keyfile_dict",
        "service_account",
    }
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# ``SecretsMasker.add_mask`` compares the lower-cased secret against this set, so entries must be
# lower case for the exclusion to take effect.
SECRETS_TO_SKIP_MASKING = {"airflow"}
"""Common terms that should be excluded from masking in both production and tests"""
73
74
def should_hide_value_for_key(name):
    """
    Tell whether a value stored under ``name`` is considered sensitive.

    ``name`` is typically a Variable name or a key found in ``conn.extra_dejson``. The decision is
    delegated to the module-level :class:`SecretsMasker` instance.
    """
    masker = _secrets_masker()
    return masker.should_hide_value_for_key(name)
82
83
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """
    Register ``secret`` so that it no longer shows up in the logs.

    When ``name`` is given, the secret is only registered if that name matches one of the configured
    "sensitive" names.

    A dict or any other non-string iterable is walked recursively; for dicts every key is used as the
    name of its value.

    Falsy secrets (``None``, empty string, empty container, ...) are ignored. Secrets shorter than the
    minimum length (5 characters by default, configurable via the
    :ref:`[logging] min_length_masked_secret <config:logging__min_length_masked_secret>` setting) are
    not masked either.
    """
    if secret:
        _secrets_masker().add_mask(secret, name)
102
103
def redact(
    value: Redactable, name: str | None = None, max_depth: int | None = None, replacement: str = "***"
) -> Redacted:
    """
    Return ``value`` with every known secret swapped for ``replacement``.

    The work is done by the module-level :class:`SecretsMasker` instance; ``name`` and ``max_depth``
    are passed through to it unchanged.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth, replacement=replacement)
109
110
@overload
def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ...


# Merging two dicts yields a dict (this overload used to be annotated as returning ``str``).
@overload
def merge(new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None) -> dict: ...


def merge(
    new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
    """
    Merge a redacted value with its original unredacted counterpart.

    Takes a user-modified redacted value and merges it with the original unredacted value.
    For sensitive fields that still contain "***" (unchanged), the original value is restored.
    For fields that have been updated by the user, the new value is preserved.

    :param new_value: The (possibly edited) redacted value.
    :param old_value: The original value, before redaction.
    :param name: Optional field name; decides whether the top-level value is treated as sensitive.
    :param max_depth: How deep nested containers are walked; ``None`` uses the masker's default.
    :return: ``new_value`` with untouched placeholders replaced by the matching parts of ``old_value``.
    """
    return _secrets_masker().merge(new_value, old_value, name, max_depth)
130
131
@cache
def _secrets_masker() -> SecretsMasker:
    """
    Return the one :class:`SecretsMasker` shared by this module, creating it on first use.

    ``functools.cache`` memoises the instance, which makes it a singleton scoped to this module
    object. The same source imported under a different path (e.g. ``airflow._shared`` vs
    ``airflow.sdk._shared``) is a different module and therefore gets its own, independent masker.
    """
    masker = SecretsMasker()
    return masker
143
144
def reset_secrets_masker() -> None:
    """
    Drop every pattern, and the compiled replacer, from the module-level masker.

    Call this when setting up a new execution environment so it starts from a clean masker instead
    of inheriting patterns or the replacer from an earlier execution or from a parent process.

    New processor types should call it as part of configuring their own masking.
    """
    masker = _secrets_masker()
    masker.reset_masker()
156
157
158def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
159 """Check if object is V1EnvVar, avoiding unnecessary imports."""
160 # Quick check: if k8s not imported, can't be a V1EnvVar instance
161 if "kubernetes.client" not in sys.modules:
162 return False
163
164 # K8s is loaded, safe to get/cache the type
165 v1_type = _get_v1_env_var_type_cached()
166 return isinstance(v, v1_type)
167
168
169@cache
170def _get_v1_env_var_type_cached() -> type:
171 """Get V1EnvVar type (cached, only called when k8s is already loaded)."""
172 try:
173 from kubernetes.client import V1EnvVar
174
175 return V1EnvVar
176 except ImportError:
177 # Shouldn't happen since we check sys.modules first
178 return type("V1EnvVar", (), {})
179
180
class SecretsMasker(logging.Filter):
    """
    Redact secrets from logs.

    Two mechanisms are combined:

    * secrets registered through :meth:`add_mask` are compiled into one regular expression
      (``replacer``) that is substituted out of every string that gets redacted;
    * values stored under a name for which :meth:`should_hide_value_for_key` is true are replaced
      wholesale, whatever they contain.
    """

    # Compiled alternation of everything in ``patterns``; ``None`` while no secret is registered.
    replacer: Pattern | None = None
    # ``re.escape``-d secrets that ``replacer`` is built from.
    patterns: set[str]

    # Attribute stored on a log record once it has been processed, so it is only redacted once.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Default nesting depth up to which containers are walked.
    MAX_RECURSION_DEPTH = 5
    # Class-wide latch so the "secret too short" warning is only logged once.
    _has_warned_short_secret = False
    # Class-wide switch consulted by ``filter``; toggled through the classmethods below.
    mask_secrets_in_logs = False

    # Secrets (and adaptations) shorter than this are not registered.
    min_length_to_mask = 5
    # Optional callable returning one alternative spelling (str) or an iterable of them.
    secret_mask_adapter = None

    def __init__(self):
        super().__init__()
        self.patterns = set()
        # Lower-case substrings that mark a name as sensitive; starts empty here.
        self.sensitive_variables_fields = []

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Wrap a subclass ``_redact`` that predates the ``replacement`` keyword so it tolerates it."""
        super().__init_subclass__(**kwargs)

        if cls._redact is SecretsMasker._redact:
            return

        # Compat for older versions of the OpenLineage plugin which subclasses this -- call the method
        # without the replacement character
        params = inspect.signature(cls._redact).parameters.values()
        if any(param.name == "replacement" or param.kind == param.VAR_KEYWORD for param in params):
            return

        f = cls._redact

        @functools.wraps(f)
        def _redact(*args, replacement: str = "***", **kwargs):
            return f(*args, **kwargs)

        cls._redact = _redact

    @classmethod
    def enable_log_masking(cls) -> None:
        """Enable secret masking in logs."""
        cls.mask_secrets_in_logs = True

    @classmethod
    def disable_log_masking(cls) -> None:
        """Disable secret masking in logs."""
        cls.mask_secrets_in_logs = False

    @classmethod
    def is_log_masking_enabled(cls) -> bool:
        """Check if secret masking in logs is enabled."""
        return cls.mask_secrets_in_logs

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the standard ``LogRecord`` attributes, minus ``msg`` and ``args``."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """
        Redact the ``args`` of ``exception`` and of everything reachable via its context/cause chain.

        The chain is walked iteratively and every exception object is handled once, so a chain that
        loops back on itself (``__context__``/``__cause__`` can be assigned freely) terminates
        instead of recursing until ``RecursionError``.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            current = pending.pop()
            if id(current) in seen:
                continue
            seen.add(id(current))

            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            with contextlib.suppress(AttributeError):
                current.args = tuple(self.redact(v) for v in current.args)

            context = current.__context__
            cause = current.__cause__
            # Pushed in reverse so the context is processed before the cause.
            if cause is not None and cause is not context:
                pending.append(cause)
            if context is not None:
                pending.append(context)

    def filter(self, record) -> bool:
        """
        Redact ``record`` in place; always returns ``True`` so the record is never dropped.

        Nothing happens while masking is disabled, when the record was already processed, or when no
        secret has been registered yet.
        """
        if not self.is_log_masking_enabled():
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only existing keys are reassigned, so the dict does not change size during iteration.
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(
        self,
        item: Redactable,
        depth: int,
        max_depth: int = MAX_RECURSION_DEPTH,
        *,
        replacement: str = "***",
    ) -> Redacted:
        """
        Replace every string inside ``item`` with ``replacement``.

        Dicts keep their keys, lists stay lists, tuples and sets become tuples. Anything nested
        deeper than ``max_depth`` is replaced as a whole; other non-string leaves are returned as is.
        """
        if depth > max_depth or isinstance(item, str):
            return replacement
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth, replacement=replacement)
                for dict_key, subval in item.items()
            }
        if isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            )
        if isinstance(item, list):
            return [
                self._redact_all(subval, depth + 1, max_depth, replacement=replacement) for subval in item
            ]
        return item

    def _redact(
        self, item: Redactable, name: str | None, depth: int, max_depth: int, replacement: str = "***"
    ) -> Redacted:
        """
        Redact ``item`` recursively.

        A sensitive ``name`` causes the whole value to be replaced. Otherwise containers are walked
        (dict keys become the name of their value) and strings have every registered secret
        substituted. Returns ``"<redaction-failed>"`` if anything goes wrong, rather than risking a
        leak.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and self.should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth, replacement=replacement)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(
                        subval, name=dict_key, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for dict_key, subval in item.items()
                }
            if isinstance(item, Enum):
                return self._redact(
                    item=item.value, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                )
            if _is_v1_env_var(item):
                tmp = item.to_dict()
                if self.should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = replacement
                else:
                    return self._redact(
                        item=tmp, name=name, depth=depth, max_depth=max_depth, replacement=replacement
                    )
                return tmp
            if isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    #
                    # A callable is used so ``replacement`` is inserted literally. Passed as a
                    # string it would be treated as a regex template, and e.g. ``\g<0>`` would put
                    # the matched secret straight back.
                    return self.replacer.sub(lambda _match: replacement, str(item))
                return item
            if isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                )
            if isinstance(item, list):
                return [
                    self._redact(
                        subval, name=None, depth=(depth + 1), max_depth=max_depth, replacement=replacement
                    )
                    for subval in item
                ]
            return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            # Rather than expose sensitive info, lets play it safe
            return "<redaction-failed>"

    def _merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        *,
        name: str | None,
        depth: int,
        max_depth: int,
        force_sensitive: bool = False,
        replacement: str,
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        A string equal to ``replacement`` counts as "left untouched" and is swapped back for the
        matching part of ``old_item`` when it sits in a sensitive position (sensitive ``name``, or
        anywhere below one via ``force_sensitive``) or beyond ``max_depth``. Everything else is taken
        from ``new_item``.
        """

        def is_placeholder(value: Any) -> bool:
            # Compare with the caller's placeholder, not a hard-coded "***".
            return isinstance(value, str) and value == replacement

        if depth > max_depth:
            return old_item if is_placeholder(new_item) else new_item

        try:
            # Determine if we should treat this as sensitive
            is_sensitive = force_sensitive or (name is not None and self.should_hide_value_for_key(name))

            if isinstance(new_item, dict) and isinstance(old_item, dict):
                merged = {}
                for key, new_value in new_item.items():
                    if key not in old_item:
                        merged[key] = new_value
                        continue
                    merged[key] = self._merge(
                        new_value,
                        old_item[key],
                        # For dicts, pass the key as name unless we're in sensitive mode
                        name=None if is_sensitive else key,
                        depth=depth + 1,
                        max_depth=max_depth,
                        force_sensitive=is_sensitive,
                        replacement=replacement,
                    )
                return merged

            if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
                merged_list = []
                for i, new_value in enumerate(new_item):
                    if i >= len(old_item):
                        # No original at this position: nothing to restore.
                        merged_list.append(new_value)
                    elif is_sensitive and is_placeholder(new_value):
                        # In sensitive mode, an untouched placeholder gets its original back.
                        merged_list.append(old_item[i])
                    else:
                        merged_list.append(
                            self._merge(
                                new_value,
                                old_item[i],
                                name=None,
                                depth=depth + 1,
                                max_depth=max_depth,
                                force_sensitive=is_sensitive,
                                replacement=replacement,
                            )
                        )
                return merged_list if isinstance(new_item, list) else tuple(merged_list)

            if isinstance(new_item, set) and isinstance(old_item, set):
                # Sets are unordered, we cannot restore original items.
                return new_item

            if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
                # TODO: Handle Kubernetes V1EnvVar objects if needed
                return new_item

            if is_sensitive and is_placeholder(new_item):
                return old_item
            return new_item

        except (TypeError, AttributeError, ValueError):
            return new_item

    def redact(
        self,
        item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Redact any secrets found in ``item``.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted. A falsy ``max_depth`` (``None`` or ``0``) means ``MAX_RECURSION_DEPTH``.
        """
        return self._redact(
            item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH, replacement=replacement
        )

    def merge(
        self,
        new_item: Redacted,
        old_item: Redactable,
        name: str | None = None,
        max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """
        Merge a redacted item with its original unredacted counterpart.

        Takes a user-modified redacted item and merges it with the original unredacted item.
        For sensitive fields that still contain "***" (or whatever the ``replacement`` is specified as), the
        original value is restored. For fields that have been updated, the new value is preserved.
        A falsy ``max_depth`` (``None`` or ``0``) means ``MAX_RECURSION_DEPTH``.
        """
        return self._merge(
            new_item,
            old_item,
            name=name,
            depth=0,
            max_depth=max_depth or self.MAX_RECURSION_DEPTH,
            force_sensitive=False,
            replacement=replacement,
        )

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self.secret_mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self.secret_mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def should_hide_value_for_key(self, name):
        """
        Return if the value for this given name should be hidden.

        Name might be a Variable name, or key in conn.extra_dejson, for example. The match is a
        case-insensitive substring test against ``sensitive_variables_fields`` and is only performed
        when ``[core] hide_sensitive_var_conn_fields`` is enabled.
        """
        from airflow.configuration import conf

        if isinstance(name, str) and conf.getboolean("core", "hide_sensitive_var_conn_fields"):
            name = name.strip().lower()
            return any(s in name for s in self.sensitive_variables_fields)
        return False

    def _rebuild_replacer(self) -> None:
        """Recompile ``replacer`` from ``patterns``, longest pattern first."""
        # Regex alternation takes the first alternative that matches. If a secret that is a prefix
        # of a longer secret came first, the tail of the longer one would stay visible.
        ordered = sorted(self.patterns, key=lambda pattern: (-len(pattern), pattern))
        self.replacer = re.compile("|".join(ordered)) if ordered else None

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """
        Add a new secret to be masked to this filter instance.

        Dicts are walked with each key used as the name of its value; other non-string iterables are
        walked with ``name`` unchanged. A string is registered, together with its adaptations, unless
        it is empty, listed in ``SECRETS_TO_SKIP_MASKING``, shorter than ``min_length_to_mask``, or
        ``name`` is given and is not a sensitive name.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return

            if secret.lower() in SECRETS_TO_SKIP_MASKING:
                return

            min_length = self.min_length_to_mask
            if len(secret) < min_length:
                if not SecretsMasker._has_warned_short_secret:
                    log.warning(
                        "Skipping masking for a secret as it's too short (<%d chars)",
                        min_length,
                        extra={self.ALREADY_FILTERED_FLAG: True},
                    )
                    SecretsMasker._has_warned_short_secret = True
                return

            new_mask = False
            for s in self._adaptations(secret):
                # Adaptations are held to the same rules as the secret itself.
                if not s or len(s) < min_length or s.lower() in SECRETS_TO_SKIP_MASKING:
                    continue

                pattern = re.escape(s)
                if pattern not in self.patterns and (not name or self.should_hide_value_for_key(name)):
                    self.patterns.add(pattern)
                    new_mask = True
            if new_mask:
                self._rebuild_replacer()

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""
        self.patterns = set()
        self.replacer = None
584
585
class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Every method is forwarded to ``target`` (the ``sys.stdout`` in effect when the instance was
    created); text passed to :meth:`write` or :meth:`writelines` is redacted first.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        # NOTE(review): this returns whatever the wrapped stream's ``__enter__`` returns, not this
        # wrapper, so ``with RedactedIO() as f: f.write(...)`` would not go through ``write`` below.
        # The documented usage (``redirect_stdout``) is unaffected -- confirm before changing.
        return self.target.__enter__()

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write the result to the target, returning what the target returns."""
        s = str(redact(s))
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Write each line through :meth:`write` so that it is redacted like any other output."""
        # Forwarding ``lines`` to the target directly would bypass redaction.
        for line in lines:
            self.write(line)