1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15################################################################################
16"""Core routines for pysecsan library."""
17
18# pylint: disable=protected-access
19
20import re
21import os
22import functools
23import subprocess
24import traceback
25import importlib
26
27from typing import Any, Callable, Optional
28from pysecsan import command_injection, redos, yaml_deserialization
29
30LOG_DEBUG = 0
31LOG_INFO = 1
32PYSECSAN_LOG_LVL = LOG_INFO
33
34# Message that will be printed to stdout when an issue is found.
35PYSECSAN_BUG_LABEL = r'===BUG DETECTED: PySecSan:'
36
37
38# pylint: disable=global-statement
39def sanitizer_log(msg, log_level, force=False, log_prefix=True):
40 """Helper printing function."""
41 global PYSECSAN_LOG_LVL
42 if log_level >= PYSECSAN_LOG_LVL or force:
43 if log_prefix:
44 print(f'[PYSECSAN] {msg}')
45 else:
46 print(f'{msg}')
47
48
49def sanitizer_log_always(msg, log_prefix=True):
50 """Wrapper for sanitizer logging. Will always log"""
51 sanitizer_log(msg, 0, force=True, log_prefix=log_prefix)
52
53
54def is_module_present(mod_name):
55 """Identify if module is importable."""
56 # pylint: disable=deprecated-method
57 return importlib.find_loader(mod_name) is not None
58
59
60def _log_bug(bug_title):
61 sanitizer_log_always('%s %s ===' % (PYSECSAN_BUG_LABEL, bug_title),
62 log_prefix=False)
63
64
65def abort_with_issue(msg, bug_title):
66 """Print message, display stacktrace and force process exit.
67
68 Use this function for signalling an issue is found and use the messages
69 logged from this function to determine if a fuzzer found a bug.
70 """
71 # Show breaker string using an ASAN approach (uses 65 =)
72 sanitizer_log_always("=" * 65, log_prefix=False)
73
74 # Log issue message
75 _log_bug(bug_title)
76 sanitizer_log_always(msg)
77
78 # Log stacktrace
79 sanitizer_log_always("Stacktrace:")
80 traceback.print_stack()
81
82 # Force exit
83 # Use os._exit here to force exit. sys.exit will exit
84 # by throwing a SystemExit exception which the interpreter
85 # handles by exiting. However, code may catch this exception,
86 # and thus to avoid this we exit the process without exceptions.
87 # pylint: disable=protected-access
88 sanitizer_log_always("Exiting")
89 os._exit(1)
90
91
92def is_exact_taint(stream) -> bool:
93 """Checks if stream is an exact match for taint from fuzzer."""
94 # The fuzzer has to get 8 characters right. This may be a bit much,
95 # however, when found it shows a high level of control over the data.
96 if stream == 'FROMFUZZ':
97 return True
98
99 return False
100
101
102def create_object_wrapper(**methods):
103 """Hooks functions in an object.
104
105 This is needed for hooking built-in types and object attributes.
106
107 Example use case is if we want to find ReDOS vulnerabilities, that
108 have a pattern of
109
110 ```
111 import re
112 r = re.compile(REGEX)
113 for _ in r.findall(...)
114 ```
115
116 In the above case r.findall is a reference to
117 re.Pattern.findall, which is a built-in type that is non-writeable.
118
119 In order to hook such calls we need to wrap the object, and also hook the
120 re.compile function to return the wrapped/hooked object.
121 """
122
123 class Wrapper():
124 """Wrap an object by hiding attributes."""
125
126 def __init__(self, instance):
127 object.__setattr__(self, 'instance', instance)
128
129 def __setattr__(self, name, value):
130 object.__setattr__(object.__getattribute__(self, 'instance'), name, value)
131
132 def __getattribute__(self, name):
133 instance = object.__getattribute__(self, 'instance')
134
135 def _hook_func(self, pre_hook, post_hook, orig, *args, **kargs):
136 if pre_hook is not None:
137 pre_hook(self, *args, **kargs)
138 # No need to pass instance here because when we extracted
139 # the function we used instance.__getattribute__(name) which
140 # seems to include it. I think.
141 orig_retval = orig(*args, **kargs)
142
143 if post_hook is not None:
144 post_hook(self, *args, **kargs)
145 return orig_retval
146
147 # If this is a wrapped method, return a bound method
148 if name in methods:
149 pre_hook = methods[name][0]
150 post_hook = methods[name][1]
151 orig = instance.__getattribute__(name)
152 return (lambda *args, **kargs: _hook_func(self, pre_hook, post_hook,
153 orig, *args, **kargs))
154
155 # Otherwise, just return attribute of instance
156 return instance.__getattribute__(name)
157
158 return Wrapper
159
160
161# pylint: disable=unsubscriptable-object
162def add_hook(function: Callable[[Any], Any],
163 pre_exec_hook: Optional[Callable[[Any], Any]] = None,
164 post_exec_hook: Optional[Callable[[Any], Any]] = None):
165 """Hook a function.
166
167 Hooks can be placed pre and post function call. At least one hook is
168 needed.
169
170 This hooking is intended on non-object hooks. In order to hook functions
171 in objects the `create_object_wrapper` function is used in combination
172 with function hooking initialisation functions post execution.
173 """
174 if pre_exec_hook is None and post_exec_hook is None:
175 raise Exception('Some hooks must be included')
176
177 @functools.wraps(function)
178 def run(*args, **kwargs):
179 sanitizer_log(f'Hook start {str(function)}', LOG_DEBUG)
180
181 # Call hook
182 if pre_exec_hook is not None:
183 pre_exec_hook(*args, **kwargs)
184
185 # Call the original function in the even the hook did not indicate
186 # failure.
187 ret = function(*args, **kwargs)
188
189 # Post execution hook. Overwrite return value if anything is returned
190 # by post hook.
191 if post_exec_hook is not None:
192 tmp_ret = post_exec_hook(ret, *args, **kwargs)
193 if tmp_ret is not None:
194 sanitizer_log('Overwriting return value', LOG_DEBUG)
195 ret = tmp_ret
196 sanitizer_log(f'Hook end {str(function)}', LOG_DEBUG)
197 return ret
198
199 return run
200
201
202def add_hooks():
203 """Sets up hooks."""
204 sanitizer_log('Starting', LOG_INFO)
205 os.system = add_hook(os.system,
206 pre_exec_hook=command_injection.hook_pre_exec_os_system)
207 subprocess.Popen = add_hook(
208 subprocess.Popen,
209 pre_exec_hook=command_injection.hook_pre_exec_subprocess_Popen)
210
211 __builtins__['eval'] = add_hook(
212 __builtins__['eval'], pre_exec_hook=command_injection.hook_pre_exec_eval)
213
214 re.compile = add_hook(re.compile,
215 pre_exec_hook=redos.hook_pre_exec_re_compile,
216 post_exec_hook=redos.hook_post_exec_re_compile)
217
218 # Hack to determine if yaml is elligible, because pkg_resources does
219 # not seem to work from pyinstaller.
220 # pylint: disable=import-outside-toplevel
221 if is_module_present('yaml'):
222 import yaml
223 sanitizer_log('Hooking pyyaml.load', LOG_DEBUG)
224 yaml.load = add_hook(
225 yaml.load,
226 pre_exec_hook=yaml_deserialization.hook_pre_exec_pyyaml_load,
227 )