Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/log/secrets_masker.py: 32%
151 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Mask sensitive information from logs"""
18from __future__ import annotations
20import collections
21import logging
22import re
23import sys
24from typing import Any, Callable, Dict, Generator, Iterable, List, TextIO, Tuple, TypeVar, Union
26from airflow import settings
27from airflow.compat.functools import cache, cached_property
29Redactable = TypeVar("Redactable", str, Dict[Any, Any], Tuple[Any, ...], List[Any])
30Redacted = Union[Redactable, str]
32log = logging.getLogger(__name__)
34DEFAULT_SENSITIVE_FIELDS = frozenset(
35 {
36 "access_token",
37 "api_key",
38 "apikey",
39 "authorization",
40 "passphrase",
41 "passwd",
42 "password",
43 "private_key",
44 "secret",
45 "token",
46 "keyfile_dict",
47 "service_account",
48 }
49)
50"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""
52SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}
def _parse_sensitive_fields(raw: str | None) -> frozenset[str]:
    """Parse a comma-separated list of field names into a normalized frozenset.

    Each entry is stripped and lower-cased, because ``should_hide_value_for_key``
    lower-cases the name it compares against. Without that, an upper-case entry
    could never match. Empty entries are dropped: an empty string is a substring
    of every name, so a stray or trailing comma would otherwise cause every value
    to be hidden.

    :param raw: the raw configuration value; ``None`` or ``""`` yields an empty set.
    :return: the set of non-empty, normalized field names.
    """
    if not raw:
        return frozenset()
    normalized = (field.strip().lower() for field in raw.split(","))
    return frozenset(field for field in normalized if field)


@cache
def get_sensitive_variables_fields():
    """Get the sensitive Variable/Connection field names.

    This is the union of ``DEFAULT_SENSITIVE_FIELDS`` and the comma-separated
    ``[core] sensitive_var_conn_names`` option from airflow.cfg. The result is
    computed once and cached.
    """
    from airflow.configuration import conf

    sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
    sensitive_fields |= _parse_sensitive_fields(conf.get("core", "sensitive_var_conn_names"))
    return sensitive_fields
def should_hide_value_for_key(name):
    """Tell whether the value stored under ``name`` must be hidden.

    ``name`` is a Variable name or a key in ``conn.extra_dejson``. Only string
    names are considered, and only while ``settings.HIDE_SENSITIVE_VAR_CONN_FIELDS``
    is enabled. In that case the stripped, lower-cased name is searched for any of
    the configured sensitive field names as a substring.
    """
    from airflow import settings

    if not isinstance(name, str) or not settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
        return False

    normalized_name = name.strip().lower()
    for sensitive_field in get_sensitive_variables_fields():
        if sensitive_field in normalized_name:
            return True
    return False
def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
    """
    Register a secret so that it is masked in the task logs.

    When ``name`` is given, the secret is only masked if that name matches one
    of the configured "sensitive" names.

    A dict or other non-string iterable ``secret`` is walked recursively, and
    values under keys with sensitive names are hidden.
    """
    # Falsy secrets (None, "", empty containers) carry nothing worth masking, so
    # the masker lookup is skipped entirely for them.
    if secret:
        _secrets_masker().add_mask(secret, name)
def redact(value: Redactable, name: str | None = None) -> Redacted:
    """Return ``value`` with any registered secrets redacted.

    This delegates to the process-wide ``SecretsMasker`` found on the
    ``airflow.task`` logger; ``name`` is forwarded unchanged.
    """
    masker = _secrets_masker()
    return masker.redact(value, name)
@cache
def _secrets_masker() -> SecretsMasker:
    """Return the first SecretsMasker filter attached to the ``airflow.task`` logger.

    The function is decorated with ``@cache``, so a successful lookup is reused.

    :raises RuntimeError: if no SecretsMasker filter is attached to that logger.
    """
    task_filters = logging.getLogger("airflow.task").filters
    masker = next((flt for flt in task_filters if isinstance(flt, SecretsMasker)), None)
    if masker is None:
        raise RuntimeError(
            "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
            "sure you configure it taking airflow configuration as a base as explained at "
            "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
            "#advanced-configuration"
        )
    return masker
class SecretsMasker(logging.Filter):
    """Redact secrets from logs."""

    # Compiled alternation of every registered (escaped) secret; None until the first mask is added.
    replacer: re.Pattern | None = None
    # Escaped regex fragments, one per registered secret (and per adaptation of it).
    patterns: set[str]

    # Key stored in a LogRecord's __dict__ once this filter has processed it.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Depth beyond which redaction stops walking nested containers.
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            tuple(),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({"msg", "args"})

    def _redact_exception_with_context(self, exception, visited: set[int] | None = None):
        """Redact the args of ``exception`` and of every exception in its context/cause chain.

        :param exception: the exception to redact in place.
        :param visited: ids of exceptions already handled during this walk. Callers
            normally omit it. ``__context__`` and ``__cause__`` may form a cycle, and
            without this set the walk would recurse until ``RecursionError``.
        """
        if visited is None:
            visited = set()
        if id(exception) in visited:
            return
        visited.add(id(exception))

        # Exception class may not be modifiable (e.g. declared by an
        # extension module such as JDBC).
        try:
            exception.args = tuple(self.redact(v) for v in exception.args)
        except AttributeError:
            pass
        if exception.__context__ is not None:
            self._redact_exception_with_context(exception.__context__, visited)
        if exception.__cause__ is not None and exception.__cause__ is not exception.__context__:
            self._redact_exception_with_context(exception.__cause__, visited)

    def filter(self, record) -> bool:
        """Redact secrets in ``record`` in place; always returns True so the record is emitted."""
        if settings.MASK_SECRETS_IN_LOGS is not True:
            return True

        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only existing keys are reassigned, so mutating the dict while
            # iterating it does not change its size.
            for k, v in record.__dict__.items():
                if k in self._record_attrs_to_ignore:
                    continue
                record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                self._redact_exception_with_context(exc)
            record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    def _redact_all(self, item: Redactable, depth: int) -> Redacted:
        """Replace every string in ``item`` (and anything nested too deeply) with ``"***"``."""
        if depth > self.MAX_RECURSION_DEPTH or isinstance(item, str):
            return "***"
        if isinstance(item, dict):
            return {dict_key: self._redact_all(subval, depth + 1) for dict_key, subval in item.items()}
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1) for subval in item)
        elif isinstance(item, list):
            return list(self._redact_all(subval, depth + 1) for subval in item)
        else:
            return item

    def _redact(self, item: Redactable, name: str | None, depth: int) -> Redacted:
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > self.MAX_RECURSION_DEPTH:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1))
                    for dict_key, subval in item.items()
                }
            elif isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    return self.replacer.sub("***", item)
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(self._redact(subval, name=None, depth=(depth + 1)) for subval in item)
            elif isinstance(item, list):
                return [self._redact(subval, name=None, depth=(depth + 1)) for subval in item]
            else:
                return item
        except Exception as e:
            # Deliberately best-effort: redaction must never break logging. Only the
            # value's type is logged. The value itself is exactly what we failed to
            # mask and may contain a secret. Formatting it here has previously caused
            # infinite recursion (see
            # https://github.com/apache/airflow/issues/19816#issuecomment-983311373).
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item).__name__,
                type(e).__name__,
                str(e),
            )
            return item

    def redact(self, item: Redactable, name: str | None = None) -> Redacted:
        """Redact any secrets found in ``item``, if it is a string.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`), then all string values in the item
        are redacted.
        """
        return self._redact(item, name, depth=0)

    @cached_property
    def _mask_adapter(self) -> None | Callable:
        """Pulls the secret mask adapter from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getimport("logging", "secret_mask_adapter", fallback=None)

    @cached_property
    def _test_mode(self) -> bool:
        """Pulls the unit test mode flag from config.

        This lives in a function here to be cached and only hit the config once.
        """
        from airflow.configuration import conf

        return conf.getboolean("core", "unit_test_mode")

    def _adaptations(self, secret: str) -> Generator[str, None, None]:
        """Yields the secret along with any adaptations to the secret that should be masked."""
        yield secret

        if self._mask_adapter:
            # This can return an iterable of secrets to mask OR a single secret as a string
            secret_or_secrets = self._mask_adapter(secret)
            if not isinstance(secret_or_secrets, str):
                # if its not a string, it must be an iterable
                yield from secret_or_secrets
            else:
                yield secret_or_secrets

    def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
        """Add a new secret to be masked to this filter instance.

        A dict is walked recursively, with each key used as the ``name`` of its
        value. Other non-string iterables are walked with the same ``name``. A
        string is registered together with any adaptations from the configured
        mask adapter. It is skipped when it is empty, when it is in
        ``SECRETS_TO_SKIP_MASKING_FOR_TESTS`` in unit-test mode, or when ``name``
        is given and is not a sensitive name.
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
                return

            new_mask = False
            for s in self._adaptations(secret):
                if s:
                    pattern = re.escape(s)
                    if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                        self.patterns.add(pattern)
                        new_mask = True

            if new_mask:
                # Longest patterns first. Regex alternation takes the first branch
                # that matches at a position. If one secret is a prefix of another
                # ("abc" / "abcdef") and the short one came first, only "abc" would
                # be masked and the rest of the longer secret would reach the log.
                ordered = sorted(self.patterns, key=len, reverse=True)
                self.replacer = re.compile("|".join(ordered))

        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)
class RedactedIO(TextIO):
    """Text stream that redacts secrets before forwarding writes to stdout.

    The forwarding target is whatever ``sys.stdout`` is when the instance is
    created. Constructing it as the argument of ``redirect_stdout`` therefore
    keeps a handle on the original stream.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self):
        original_stdout = sys.stdout
        self.target = original_stdout
        # Expose the underlying descriptor so callers that need a real fd still work.
        self.fileno = original_stdout.fileno

    def write(self, s: str) -> int:
        """Redact ``s`` and write it to the target; returns the target's write result."""
        return self.target.write(redact(s))

    def flush(self) -> None:
        """Flush the target stream."""
        return self.target.flush()