Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pysecsan/sanlib.py: 37%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15################################################################################
16"""Core routines for pysecsan library."""
18# pylint: disable=protected-access
20import re
21import os
22import functools
23import subprocess
24import traceback
25import importlib.util
27from typing import Any, Callable, Optional
28from pysecsan import command_injection, redos, yaml_deserialization
# Log levels used with sanitizer_log. A message is printed when its level is
# greater than or equal to PYSECSAN_LOG_LVL (or when printing is forced).
LOG_DEBUG = 0
LOG_INFO = 1
PYSECSAN_LOG_LVL = LOG_INFO

# Marker written to stdout whenever an issue is found.
PYSECSAN_BUG_LABEL = r'===BUG DETECTED: PySecSan:'
38# pylint: disable=global-statement
def sanitizer_log(msg, log_level, force=False, log_prefix=True):
  """Print msg when its level passes the threshold or output is forced.

  Args:
    msg: Text to print.
    log_level: Level of the message; compared against PYSECSAN_LOG_LVL.
    force: When true, print regardless of the level.
    log_prefix: When true, prepend the '[PYSECSAN] ' tag to the message.
  """
  should_emit = log_level >= PYSECSAN_LOG_LVL or force
  if not should_emit:
    return

  line = f'[PYSECSAN] {msg}' if log_prefix else f'{msg}'
  print(line)
def sanitizer_log_always(msg, log_prefix=True):
  """Log msg unconditionally by forcing it through sanitizer_log.

  Args:
    msg: Text to print.
    log_prefix: Forwarded to sanitizer_log to control the message tag.
  """
  # force=True makes sanitizer_log print whatever level is passed.
  sanitizer_log(msg, 0, log_prefix=log_prefix, force=True)
def is_module_present(mod_name):
  """Identify if module is importable.

  Args:
    mod_name: Absolute module name, e.g. 'yaml' or 'json.decoder'.

  Returns:
    True if importlib can locate a spec for the module, False if it cannot
    (including when a parent package of a dotted name is missing).
  """
  try:
    return importlib.util.find_spec(mod_name) is not None
  except ImportError:
    # For dotted names find_spec imports the parent package first and raises
    # ModuleNotFoundError (an ImportError) when that parent does not exist.
    # For a presence check that simply means "not importable".
    return False
def _log_bug(bug_title):
  """Print the bug banner line for bug_title without the log prefix."""
  banner = f'{PYSECSAN_BUG_LABEL!s} {bug_title!s} ==='
  sanitizer_log_always(banner, log_prefix=False)
def abort_with_issue(msg, bug_title):
  """Print message, display stacktrace and force process exit.

  Use this function for signalling an issue is found and use the messages
  logged from this function to determine if a fuzzer found a bug.

  Args:
    msg: Description of the issue; logged with the sanitizer prefix.
    bug_title: Short title placed in the bug banner line.

  This function does not return: the process exits with status 1.
  """
  # pylint: disable=import-outside-toplevel
  import sys

  # Show breaker string using an ASAN approach (uses 65 =)
  sanitizer_log_always('=' * 65, log_prefix=False)

  # Log issue message
  _log_bug(bug_title)
  sanitizer_log_always(msg)

  # Log stacktrace
  sanitizer_log_always('Stacktrace:')
  traceback.print_stack()

  # Force exit
  # Use os._exit here to force exit. sys.exit will exit
  # by throwing a SystemExit exception which the interpreter
  # handles by exiting. However, code may catch this exception,
  # and thus to avoid this we exit the process without exceptions.
  # pylint: disable=protected-access
  sanitizer_log_always('Exiting')
  try:
    # os._exit does not flush stdio buffers, so anything printed above could
    # be lost when stdout/stderr are block-buffered (e.g. redirected to a
    # pipe or file). Flush explicitly so the bug report is actually emitted.
    for stream in (sys.stdout, sys.stderr):
      if stream is not None:
        stream.flush()
  finally:
    # Exit even if flushing fails (e.g. the stream is closed or broken).
    os._exit(1)
def is_exact_taint(stream) -> bool:
  """Report whether stream equals the fuzzer taint marker exactly.

  The marker is the 8-character string 'FROMFUZZ'. Requiring all eight
  characters is demanding for the fuzzer, but a match demonstrates a high
  level of control over the data.
  """
  return bool(stream == 'FROMFUZZ')
def create_object_wrapper(**methods):
  """Hooks functions in an object.

  This is needed for hooking built-in types and object attributes.

  Example use case is if we want to find ReDOS vulnerabilities, that
  have a pattern of

  ```
  import re
  r = re.compile(REGEX)
  for _ in r.findall(...)
  ```

  In the above case r.findall is a reference to
  re.Pattern.findall, which is a built-in type that is non-writeable.

  In order to hook such calls we need to wrap the object, and also hook the
  re.compile function to return the wrapped/hooked object.

  Args:
    **methods: Maps an attribute name to a sequence whose item 0 is the
      pre-hook and item 1 is the post-hook; either may be None. Both hooks
      are called as hook(wrapper, *args, **kargs) with the arguments of the
      hooked call.

  Returns:
    A Wrapper class. Wrapper(instance) forwards attribute reads and writes
    to instance and surrounds calls to the named attributes with the hooks.
  """

  class Wrapper():
    """Wrap an object by hiding attributes."""

    def __init__(self, instance):
      # Store on the wrapper itself; our own __setattr__ would forward the
      # assignment to the wrapped instance instead.
      object.__setattr__(self, 'instance', instance)

    def __setattr__(self, name, value):
      # setattr honours the wrapped object's own __setattr__ and descriptors,
      # which calling object.__setattr__ on it directly would bypass.
      setattr(object.__getattribute__(self, 'instance'), name, value)

    def __getattribute__(self, name):
      instance = object.__getattribute__(self, 'instance')

      # getattr (unlike instance.__getattribute__) also falls back to the
      # wrapped object's __getattr__, so dynamic attributes stay reachable.
      if name not in methods:
        return getattr(instance, name)

      pre_hook = methods[name][0]
      post_hook = methods[name][1]
      # Already bound to instance, so the hooked call must not pass it again.
      orig = getattr(instance, name)

      def hooked(*args, **kargs):
        if pre_hook is not None:
          pre_hook(self, *args, **kargs)
        orig_retval = orig(*args, **kargs)
        if post_hook is not None:
          post_hook(self, *args, **kargs)
        return orig_retval

      return hooked

  return Wrapper
161# pylint: disable=unsubscriptable-object
def add_hook(function: Callable[..., Any],
             pre_exec_hook: Optional[Callable[..., Any]] = None,
             post_exec_hook: Optional[Callable[..., Any]] = None):
  """Hook a function.

  Hooks can be placed pre and post function call. At least one hook is
  needed.

  This hooking is intended on non-object hooks. In order to hook functions
  in objects the `create_object_wrapper` function is used in combination
  with function hooking initialisation functions post execution.

  Args:
    function: The callable to wrap.
    pre_exec_hook: Called as pre_exec_hook(*args, **kwargs) before function.
    post_exec_hook: Called as post_exec_hook(ret, *args, **kwargs) after
      function; a non-None result replaces the return value.

  Returns:
    A wrapper with the metadata of function (via functools.wraps) that runs
    the hooks around each call.

  Raises:
    ValueError: If neither hook is provided.
  """
  if pre_exec_hook is None and post_exec_hook is None:
    # ValueError is still an Exception, so existing handlers keep working.
    raise ValueError('Some hooks must be included')

  @functools.wraps(function)
  def run(*args, **kwargs):
    sanitizer_log(f'Hook start {str(function)}', LOG_DEBUG)

    # Call hook
    if pre_exec_hook is not None:
      pre_exec_hook(*args, **kwargs)

    # Call the original function in the event the hook did not indicate
    # failure.
    ret = function(*args, **kwargs)

    # Post execution hook. Overwrite return value if anything is returned
    # by post hook.
    if post_exec_hook is not None:
      tmp_ret = post_exec_hook(ret, *args, **kwargs)
      if tmp_ret is not None:
        sanitizer_log('Overwriting return value', LOG_DEBUG)
        ret = tmp_ret
    sanitizer_log(f'Hook end {str(function)}', LOG_DEBUG)
    return ret

  return run
def add_hooks():
  """Sets up hooks.

  Replaces os.system, subprocess.Popen, the builtin eval and re.compile with
  hooked versions, and yaml.load as well when the yaml module is importable.
  """
  # pylint: disable=import-outside-toplevel
  import builtins

  sanitizer_log('Starting', LOG_INFO)
  os.system = add_hook(os.system,
                       pre_exec_hook=command_injection.hook_pre_exec_os_system)
  subprocess.Popen = add_hook(
      subprocess.Popen,
      pre_exec_hook=command_injection.hook_pre_exec_subprocess_Popen)

  # Patch eval through the builtins module. The __builtins__ global is a
  # CPython implementation detail that is a dict in imported modules but a
  # module in __main__, so subscripting it is not reliable.
  # NOTE(review): the wrapper calls eval from its own frame, so
  # eval(expr) without explicit globals/locals presumably no longer sees the
  # caller's namespace -- confirm this is acceptable.
  builtins.eval = add_hook(builtins.eval,
                           pre_exec_hook=command_injection.hook_pre_exec_eval)

  re.compile = add_hook(re.compile,
                        pre_exec_hook=redos.hook_pre_exec_re_compile,
                        post_exec_hook=redos.hook_post_exec_re_compile)

  # Hack to determine if yaml is eligible, because pkg_resources does
  # not seem to work from pyinstaller.
  if is_module_present('yaml'):
    import yaml
    sanitizer_log('Hooking pyyaml.load', LOG_DEBUG)
    yaml.load = add_hook(
        yaml.load,
        pre_exec_hook=yaml_deserialization.hook_pre_exec_pyyaml_load,
    )