Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/secrets_masker.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Mask sensitive information from logs."""
19from __future__ import annotations
21import collections.abc
22import logging
23import sys
24from enum import Enum
25from functools import cached_property
26from typing import (
27 TYPE_CHECKING,
28 Any,
29 Callable,
30 Dict,
31 Generator,
32 Iterable,
33 Iterator,
34 List,
35 Pattern,
36 TextIO,
37 Tuple,
38 TypeVar,
39 Union,
40)
42import re2
44from airflow import settings
45from airflow.compat.functools import cache
47if TYPE_CHECKING:
48 from kubernetes.client import V1EnvVar
50 from airflow.typing_compat import TypeGuard
# ``Redactable`` enumerates the value types the masker accepts; ``Redacted`` is
# what comes back, which may be a plain string in place of the original value.
Redactable = TypeVar("Redactable", str, "V1EnvVar", Dict[Any, Any], Tuple[Any, ...], List[Any])
Redacted = Union[Redactable, str]

log = logging.getLogger(__name__)

DEFAULT_SENSITIVE_FIELDS = frozenset(
    (
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "secret",
        "token",
        "keyfile_dict",
        "service_account",
    )
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# Secrets that are never registered as masks when unit test mode is enabled.
SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}
@cache
def get_sensitive_variables_fields():
    """Return the set of field names that are treated as sensitive.

    The result is ``DEFAULT_SENSITIVE_FIELDS`` extended with the comma-separated
    entries of the ``[core] sensitive_var_conn_names`` option from airflow.cfg.
    It is cached, so the configuration is only read on the first call.

    Configured entries are stripped and lower-cased because
    :func:`should_hide_value_for_key` lower-cases the name before doing its
    substring check. Blank entries (for example from a trailing comma) are
    dropped: an empty string is a substring of every name and would otherwise
    mark every key as sensitive.
    """
    from airflow.configuration import conf

    configured = conf.get("core", "sensitive_var_conn_names")
    if not configured:
        return DEFAULT_SENSITIVE_FIELDS

    extra_fields = {field.strip().lower() for field in configured.split(",")}
    extra_fields.discard("")
    # frozenset | set yields a new frozenset; the module-level default is untouched.
    return DEFAULT_SENSITIVE_FIELDS | extra_fields
def should_hide_value_for_key(name):
    """
    Tell whether the value stored under ``name`` should be hidden.

    ``name`` might be a Variable name, or a key in conn.extra_dejson, for example.
    Only string names are considered, and only while
    ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS`` is truthy. The name is stripped
    and lower-cased, then reported as sensitive if any configured sensitive
    field name occurs in it as a substring.
    """
    from airflow import settings

    if not isinstance(name, str) or not settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
        return False

    normalized = name.strip().lower()
    for sensitive_field in get_sensitive_variables_fields():
        if sensitive_field in normalized:
            return True
    return False
def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
    """
    Register ``secret`` so that it no longer appears in the task logs.

    When ``name`` is given, the secret is only masked if the name matches one
    of the configured "sensitive" names.

    When ``secret`` is a dict or an iterable (other than str), it is walked
    recursively and keys with sensitive names will be hidden.

    Falsy secrets (``None``, empty string, empty container) are ignored.
    """
    if secret:
        masker = _secrets_masker()
        masker.add_mask(secret, name)
def redact(value: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
    """Return ``value`` with any secrets found in it redacted.

    Delegates to the ``SecretsMasker`` filter installed on the ``airflow.task``
    logger, forwarding ``name`` and ``max_depth`` unchanged.
    """
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth)
@cache
def _secrets_masker() -> SecretsMasker:
    """Return the ``SecretsMasker`` filter attached to the ``airflow.task`` logger.

    The first matching filter is returned and the result is cached.

    :raises RuntimeError: if no ``SecretsMasker`` is installed on that logger.
    """
    task_logger = logging.getLogger("airflow.task")
    masker = next((flt for flt in task_logger.filters if isinstance(flt, SecretsMasker)), None)
    if masker is None:
        raise RuntimeError(
            "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
            "sure you configure it taking airflow configuration as a base as explained at "
            "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
            "#advanced-configuration"
        )
    return masker
@cache
def _get_v1_env_var_type() -> type:
    """Return the kubernetes ``V1EnvVar`` class, or a stand-in when it is unavailable.

    If ``kubernetes.client`` cannot be imported, a fresh empty class named
    ``V1EnvVar`` is created instead, so ``isinstance`` checks against the
    result are simply False for every real value. The result is cached.
    """
    try:
        from kubernetes.client import V1EnvVar as env_var_type
    except ImportError:
        env_var_type = type("V1EnvVar", (), {})
    return env_var_type
def _is_v1_env_var(v: Any) -> TypeGuard[V1EnvVar]:
    """Tell whether ``v`` is an instance of the type returned by ``_get_v1_env_var_type``."""
    env_var_type = _get_v1_env_var_type()
    return isinstance(v, env_var_type)
class SecretsMasker(logging.Filter):
    """Redact secrets from logs.

    Secrets are registered with :meth:`add_mask`. Once at least one secret is
    registered, :meth:`filter` rewrites log records in place, replacing every
    occurrence of a registered secret with ``***``. Values stored under
    sensitive key names are replaced wholesale.
    """

    # Compiled alternation of every registered secret; None until one is added.
    replacer: Pattern | None = None
    # Regex-escaped source of each registered secret.
    patterns: set[str]

    # Marker attribute set on records that have already been processed.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the standard ``LogRecord`` attributes, minus ``msg`` and ``args``."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            (),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """Redact the ``args`` of ``exception`` and of every exception chained to it.

        The chain is followed through ``__context__`` and ``__cause__``. Each
        exception object is processed at most once, so a chain that loops back
        on itself terminates instead of recursing until ``RecursionError``.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            current = pending.pop()
            if id(current) in seen:
                continue
            seen.add(id(current))

            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            try:
                current.args = tuple(self.redact(v) for v in current.args)
            except AttributeError:
                pass

            context = current.__context__
            cause = current.__cause__
            # Push the cause first so the context is popped (handled) first.
            if cause and cause is not context:
                pending.append(cause)
            if context:
                pending.append(context)

    def filter(self, record) -> bool:
        """Redact secrets in ``record`` in place; always returns True.

        Nothing is done unless ``settings.MASK_SECRETS_IN_LOGS`` is exactly
        ``True``, or when the record already carries the filtered flag.
        """
        if settings.MASK_SECRETS_IN_LOGS is not True:
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            for k, v in record.__dict__.items():
                if k not in self._record_attrs_to_ignore:
                    record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(self, item: Redactable, depth: int, max_depth: int = MAX_RECURSION_DEPTH) -> Redacted:
        """Replace every string inside ``item`` with ``***``.

        Dicts, lists, tuples and sets are walked recursively (sets come back as
        tuples). Anything nested deeper than ``max_depth`` is replaced by
        ``***`` as a whole. Other values are returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return "***"
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth) for dict_key, subval in item.items()
            }
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1, max_depth) for subval in item)
        elif isinstance(item, list):
            return [self._redact_all(subval, depth + 1, max_depth) for subval in item]
        else:
            return item

    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        """Return ``item`` with secrets redacted, walking containers up to ``max_depth``.

        If ``name`` is a sensitive key name, every string inside ``item`` is
        replaced. Otherwise registered secrets are substituted inside strings,
        and containers are walked with dict keys used as the name for their
        values. If anything raises, a warning is logged and ``item`` is
        returned unchanged.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth)
            if isinstance(item, dict):
                to_return = {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth)
                    for dict_key, subval in item.items()
                }
                return to_return
            elif isinstance(item, Enum):
                return self._redact(item=item.value, name=name, depth=depth, max_depth=max_depth)
            elif _is_v1_env_var(item):
                tmp: dict = item.to_dict()
                if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = "***"
                else:
                    return self._redact(item=tmp, name=name, depth=depth, max_depth=max_depth)
                return tmp
            elif isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    return self.replacer.sub("***", str(item))
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                )
            elif isinstance(item, list):
                return [
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                ]
            else:
                return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, to avoid this we mark the log as already filtered.
        except Exception as exc:
            # Log the type only, never the value: this record is flagged as
            # already filtered, so the value would reach the log unredacted.
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item).__name__,
                type(exc).__name__,
                exc,
                extra={self.ALREADY_FILTERED_FLAG: True},
            )
            return item

    def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
        """Redact any secrets found in ``item``.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.

        A falsy ``max_depth`` (``None`` or ``0``) means ``MAX_RECURSION_DEPTH``.
        """
        return self._redact(item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH)

    @cached_property
    def _mask_adapter(self) -> None | Callable:
        """Pulls the secret mask adapter from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getimport("logging", "secret_mask_adapter", fallback=None)

    @cached_property
    def _test_mode(self) -> bool:
        """Pulls the unit test mode flag from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getboolean("core", "unit_test_mode")

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yield the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self._mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self._mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
        """Add a new secret to be masked to this filter instance.

        Dicts are walked with each key used as the name for its value; other
        iterables are walked with ``name`` passed through. A string is
        registered (together with its adaptations) only when ``name`` is empty
        or is a sensitive name.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    pattern = re2.escape(s)
                    if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True

            # Recompile only when the pattern set actually grew.
            if new_mask:
                self.replacer = re2.compile("|".join(self.patterns))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)
class RedactedIO(TextIO):
    """IO class that redacts values going into stdout.

    Every method is forwarded to the ``sys.stdout`` object captured when the
    instance is created. Text passed to :meth:`write` or :meth:`writelines` is
    run through :func:`redact` first.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        # Captured once; later reassignment of sys.stdout does not affect us.
        self.target = sys.stdout

    def __enter__(self) -> TextIO:
        return self.target.__enter__()

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write the result to the target stream."""
        s = redact(s)
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Redact each of ``lines`` and write them to the target stream."""
        # Redact line by line, as write() does, so secrets cannot slip through here.
        self.target.writelines(redact(line) for line in lines)