Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/secrets_masker.py: 35%
204 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Mask sensitive information from logs."""
18from __future__ import annotations
20import collections.abc
21import logging
22import re
23import sys
24from functools import cached_property
25from typing import (
26 TYPE_CHECKING,
27 Any,
28 Callable,
29 Dict,
30 Generator,
31 Iterable,
32 Iterator,
33 List,
34 TextIO,
35 Tuple,
36 TypeVar,
37 Union,
38)
40from airflow import settings
41from airflow.compat.functools import cache
42from airflow.typing_compat import TypeGuard
44if TYPE_CHECKING:
45 from kubernetes.client import V1EnvVar
# Types of values the masker knows how to walk.  ``V1EnvVar`` is referenced by
# name only because the kubernetes client is imported solely for type checking.
Redactable = TypeVar(
    "Redactable",
    str,
    "V1EnvVar",
    Dict[Any, Any],
    Tuple[Any, ...],
    List[Any],
)
# Result type of a redaction: the input type, or a plain string.
Redacted = Union[Redactable, str]

log = logging.getLogger(__name__)
DEFAULT_SENSITIVE_FIELDS = frozenset(
    (
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "keyfile_dict",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "secret",
        "service_account",
        "token",
    )
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive."""

# Secret values that ``SecretsMasker.add_mask`` refuses to mask while the
# ``core.unit_test_mode`` option is enabled.
SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}
@cache
def get_sensitive_variables_fields():
    """Return the name fragments that mark a Variable/Connection field as sensitive.

    The result is ``DEFAULT_SENSITIVE_FIELDS`` extended with the comma-separated
    entries of the ``[core] sensitive_var_conn_names`` option.  The function is
    cached, so the configuration is only read on the first call.

    :return: a ``frozenset`` of lower-case name fragments.
    """
    from airflow.configuration import conf

    sensitive_fields = DEFAULT_SENSITIVE_FIELDS
    configured = conf.get("core", "sensitive_var_conn_names")
    if configured:
        # ``should_hide_value_for_key`` lower-cases the candidate name before doing
        # a substring test, so configured entries must be lower-cased as well or an
        # upper-case entry could never match.
        extra = {field.strip().lower() for field in configured.split(",")}
        # A trailing or doubled comma produces an empty entry; ``"" in name`` is
        # always true, which would make every key look sensitive.
        extra.discard("")
        sensitive_fields = sensitive_fields | frozenset(extra)
    return sensitive_fields
def should_hide_value_for_key(name):
    """Tell whether the value stored under ``name`` should be hidden.

    ``name`` is typically a Variable name or a key in ``conn.extra_dejson``.
    Non-string names are never hidden, and nothing is hidden when
    ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS`` is falsy.  Otherwise the name is
    stripped, lower-cased and checked for any sensitive fragment as a substring.
    """
    from airflow import settings

    if not (isinstance(name, str) and settings.HIDE_SENSITIVE_VAR_CONN_FIELDS):
        return False

    normalized = name.strip().lower()
    for fragment in get_sensitive_variables_fields():
        if fragment in normalized:
            return True
    return False
def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
    """
    Register a secret so that it is masked in the task logs.

    When ``name`` is given, the secret is only masked if that name matches one
    of the configured "sensitive" names.

    When ``secret`` is a dict or an iterable (other than ``str``), it is walked
    recursively and values under sensitive key names are hidden.

    Falsy secrets (``None``, empty string, empty container) are ignored.
    """
    if secret:
        _secrets_masker().add_mask(secret, name)
def redact(value: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
    """Return ``value`` with any known secrets redacted, using the task-log masker."""
    masker = _secrets_masker()
    return masker.redact(value, name, max_depth)
@cache
def _secrets_masker() -> SecretsMasker:
    """Find the ``SecretsMasker`` filter attached to the ``airflow.task`` logger.

    The first matching filter is returned and the lookup is cached.

    :raises RuntimeError: if no ``SecretsMasker`` is installed on that logger.
    """
    task_logger = logging.getLogger("airflow.task")
    masker = next((flt for flt in task_logger.filters if isinstance(flt, SecretsMasker)), None)
    if masker is None:
        raise RuntimeError(
            "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
            "sure you configure it taking airflow configuration as a base as explained at "
            "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
            "#advanced-configuration"
        )
    return masker
@cache
def _get_v1_env_var_type() -> type:
    """Return the kubernetes ``V1EnvVar`` class, or a stand-in when it cannot be imported.

    The stand-in is a fresh empty class, so ``isinstance`` checks against it
    are false for every ordinary value.
    """
    try:
        from kubernetes.client import V1EnvVar as env_var_type
    except ImportError:
        env_var_type = type("V1EnvVar", (), {})
    return env_var_type
def _is_v1_env_var(v: Any) -> TypeGuard[V1EnvVar]:
    """Tell whether ``v`` is an instance of the type returned by ``_get_v1_env_var_type``."""
    env_var_type = _get_v1_env_var_type()
    return isinstance(v, env_var_type)
class SecretsMasker(logging.Filter):
    """Logging filter that redacts secrets from log records.

    Secrets are registered with :meth:`add_mask`.  Every registered value is
    regex-escaped, stored in ``patterns`` and compiled into the single
    alternation regex ``replacer``.  :meth:`filter` rewrites the non-default
    attributes of each record (``msg``, ``args`` and anything passed through
    ``extra=``) as well as the arguments of any attached exception chain.
    """

    # Compiled alternation of all registered secrets; ``None`` until one is added.
    replacer: re.Pattern | None = None
    # Regex-escaped secret values.
    patterns: set[str]

    # Marker stored on a record once it has been processed.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the attributes every log record has, minus ``msg`` and ``args``."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            tuple(),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception):
        """Redact ``args`` of ``exception`` and of everything in its cause/context chain.

        The chain is walked iteratively and each exception object is visited at
        most once, so a chain that refers back to itself (for example
        ``a.__cause__ = b; b.__cause__ = a``) terminates instead of recursing
        until ``RecursionError``.
        """
        seen: set[int] = set()
        pending = [exception]
        while pending:
            exc = pending.pop()
            if exc is None or id(exc) in seen:
                continue
            seen.add(id(exc))
            # Exception class may not be modifiable (e.g. declared by an
            # extension module such as JDBC).
            try:
                exc.args = (self.redact(v) for v in exc.args)
            except AttributeError:
                pass
            pending.append(exc.__cause__)
            pending.append(exc.__context__)

    def filter(self, record) -> bool:
        """Redact secrets in ``record`` in place; always returns ``True`` (never drops a record)."""
        if settings.MASK_SECRETS_IN_LOGS is not True:
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only values of existing keys are reassigned, so the dict does not
            # change size while it is being iterated.
            for k, v in record.__dict__.items():
                if k in self._record_attrs_to_ignore:
                    continue
                record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
    # this function directly. New versions of that provider, and this class itself call it with a value
    def _redact_all(self, item: Redactable, depth: int, max_depth: int = MAX_RECURSION_DEPTH) -> Redacted:
        """Replace every string inside ``item`` with ``"***"``.

        Dicts, lists, tuples and sets are walked recursively (sets come back as
        tuples).  Anything nested deeper than ``max_depth`` is replaced wholesale
        by ``"***"``.  Other values are returned unchanged.
        """
        if depth > max_depth or isinstance(item, str):
            return "***"
        if isinstance(item, dict):
            return {
                dict_key: self._redact_all(subval, depth + 1, max_depth) for dict_key, subval in item.items()
            }
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1, max_depth) for subval in item)
        elif isinstance(item, list):
            return [self._redact_all(subval, depth + 1, max_depth) for subval in item]
        else:
            return item

    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        """Recursive worker behind :meth:`redact`.

        ``name`` is the key under which ``item`` was found (if any).  When it is a
        sensitive name the whole item is blanked via :meth:`_redact_all`;
        otherwise containers are walked and strings are run through ``replacer``.
        Any error is logged and the item is returned unmodified.
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > max_depth:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth)
            if isinstance(item, dict):
                to_return = {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth)
                    for dict_key, subval in item.items()
                }
                return to_return
            elif _is_v1_env_var(item):
                tmp: dict = item.to_dict()
                if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
                    tmp["value"] = "***"
                else:
                    return self._redact(item=tmp, name=name, depth=depth, max_depth=max_depth)
                return tmp
            elif isinstance(item, str):
                if self.replacer:
                    # Value-based masking: substitute every registered secret
                    # found inside the string.
                    return self.replacer.sub("***", item)
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                )
            elif isinstance(item, list):
                return [
                    self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
                ]
            else:
                return item
        # I think this should never happen, but it does not hurt to leave it just in case
        # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
        # but it caused infinite recursion, so we need to cast it to str first.
        except Exception as e:
            log.warning(
                "Unable to redact %s, please report this via <https://github.com/apache/airflow/issues>. "
                "Error was: %s: %s",
                repr(item),
                type(e).__name__,
                str(e),
            )
            return item

    def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
        """Redact any secrets found in ``item``.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.

        A falsy ``max_depth`` (``None`` or ``0``) selects ``MAX_RECURSION_DEPTH``.
        """
        return self._redact(item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH)

    @cached_property
    def _mask_adapter(self) -> None | Callable:
        """Pulls the secret mask adapter from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getimport("logging", "secret_mask_adapter", fallback=None)

    @cached_property
    def _test_mode(self) -> bool:
        """Pulls the unit test mode flag from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getboolean("core", "unit_test_mode")

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yields the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self._mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self._mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
        """Add a new secret to be masked to this filter instance.

        Dicts are walked with each key used as the ``name`` of its value; other
        non-string iterables are walked with the same ``name``.  A string is
        registered only when ``name`` is empty or is a sensitive name.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    pattern = re.escape(s)
                    if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True

            if new_mask:
                # Regex alternation takes the first alternative that matches, so
                # longer patterns go first: with "abc|abcdef" the text "abcdef"
                # would be rewritten to "***def" and leak the tail of the longer
                # secret.  The secondary key keeps the compiled regex deterministic.
                ordered = sorted(self.patterns, key=lambda p: (-len(p), p))
                self.replacer = re.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)
class RedactedIO(TextIO):
    """IO class that redacts values going into stdout.

    The instance wraps whatever ``sys.stdout`` is at construction time.  Text
    passed to :meth:`write` and :meth:`writelines` goes through the module-level
    :func:`redact` before reaching that stream; every other method is forwarded
    to it unchanged.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        self.target = sys.stdout

    # NOTE(review): ``__enter__`` hands back whatever the wrapped stream's own
    # ``__enter__`` returns, not this wrapper, so ``with RedactedIO() as f`` would
    # presumably yield an unredacted handle -- confirm whether that is intended.
    def __enter__(self) -> TextIO:
        return self.target.__enter__()

    def __exit__(self, t, v, b) -> None:
        return self.target.__exit__(t, v, b)

    def __iter__(self) -> Iterator[str]:
        return iter(self.target)

    def __next__(self) -> str:
        return next(self.target)

    def close(self) -> None:
        return self.target.close()

    def fileno(self) -> int:
        return self.target.fileno()

    def flush(self) -> None:
        return self.target.flush()

    def isatty(self) -> bool:
        return self.target.isatty()

    def read(self, n: int = -1) -> str:
        return self.target.read(n)

    def readable(self) -> bool:
        return self.target.readable()

    def readline(self, n: int = -1) -> str:
        return self.target.readline(n)

    def readlines(self, n: int = -1) -> list[str]:
        return self.target.readlines(n)

    def seek(self, offset: int, whence: int = 0) -> int:
        return self.target.seek(offset, whence)

    def seekable(self) -> bool:
        return self.target.seekable()

    def tell(self) -> int:
        return self.target.tell()

    def truncate(self, s: int | None = None) -> int:
        return self.target.truncate(s)

    def writable(self) -> bool:
        return self.target.writable()

    def write(self, s: str) -> int:
        """Redact ``s`` and write the result to the wrapped stream."""
        s = redact(s)
        return self.target.write(s)

    def writelines(self, lines) -> None:
        """Redact each line and write them to the wrapped stream.

        Lines are redacted individually, the same way :meth:`write` treats its
        argument, so this method cannot be used to bypass masking.
        """
        self.target.writelines(redact(line) for line in lines)