Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pysecsan-0.1-py3.8.egg/pysecsan/sanlib.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

85 statements  

1# Copyright 2022 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15################################################################################ 

16"""Core routines for pysecsan library.""" 

17 

18# pylint: disable=protected-access 

19 

20import re 

21import os 

22import functools 

23import subprocess 

24import traceback 

25import importlib 

26 

27from typing import Any, Callable, Optional 

28from pysecsan import command_injection, redos, yaml_deserialization 

29 

30LOG_DEBUG = 0 

31LOG_INFO = 1 

32PYSECSAN_LOG_LVL = LOG_INFO 

33 

34# Message that will be printed to stdout when an issue is found. 

35PYSECSAN_BUG_LABEL = r'===BUG DETECTED: PySecSan:' 

36 

37 

38# pylint: disable=global-statement 

39def sanitizer_log(msg, log_level, force=False, log_prefix=True): 

40 """Helper printing function.""" 

41 global PYSECSAN_LOG_LVL 

42 if log_level >= PYSECSAN_LOG_LVL or force: 

43 if log_prefix: 

44 print(f'[PYSECSAN] {msg}') 

45 else: 

46 print(f'{msg}') 

47 

48 

49def sanitizer_log_always(msg, log_prefix=True): 

50 """Wrapper for sanitizer logging. Will always log""" 

51 sanitizer_log(msg, 0, force=True, log_prefix=log_prefix) 

52 

53 

54def is_module_present(mod_name): 

55 """Identify if module is importable.""" 

56 # pylint: disable=deprecated-method 

57 return importlib.find_loader(mod_name) is not None 

58 

59 

60def _log_bug(bug_title): 

61 sanitizer_log_always('%s %s ===' % (PYSECSAN_BUG_LABEL, bug_title), 

62 log_prefix=False) 

63 

64 

65def abort_with_issue(msg, bug_title): 

66 """Print message, display stacktrace and force process exit. 

67 

68 Use this function for signalling an issue is found and use the messages 

69 logged from this function to determine if a fuzzer found a bug. 

70 """ 

71 # Show breaker string using an ASAN approach (uses 65 =) 

72 sanitizer_log_always("=" * 65, log_prefix=False) 

73 

74 # Log issue message 

75 _log_bug(bug_title) 

76 sanitizer_log_always(msg) 

77 

78 # Log stacktrace 

79 sanitizer_log_always("Stacktrace:") 

80 traceback.print_stack() 

81 

82 # Force exit 

83 # Use os._exit here to force exit. sys.exit will exit 

84 # by throwing a SystemExit exception which the interpreter 

85 # handles by exiting. However, code may catch this exception, 

86 # and thus to avoid this we exit the process without exceptions. 

87 # pylint: disable=protected-access 

88 sanitizer_log_always("Exiting") 

89 os._exit(1) 

90 

91 

92def is_exact_taint(stream) -> bool: 

93 """Checks if stream is an exact match for taint from fuzzer.""" 

94 # The fuzzer has to get 8 characters right. This may be a bit much, 

95 # however, when found it shows a high level of control over the data. 

96 if stream == 'FROMFUZZ': 

97 return True 

98 

99 return False 

100 

101 

102def create_object_wrapper(**methods): 

103 """Hooks functions in an object. 

104 

105 This is needed for hooking built-in types and object attributes. 

106 

107 Example use case is if we want to find ReDOS vulnerabilities, that 

108 have a pattern of 

109 

110 ``` 

111 import re 

112 r = re.compile(REGEX) 

113 for _ in r.findall(...) 

114 ``` 

115 

116 In the above case r.findall is a reference to 

117 re.Pattern.findall, which is a built-in type that is non-writeable. 

118 

119 In order to hook such calls we need to wrap the object, and also hook the 

120 re.compile function to return the wrapped/hooked object. 

121 """ 

122 

123 class Wrapper(): 

124 """Wrap an object by hiding attributes.""" 

125 

126 def __init__(self, instance): 

127 object.__setattr__(self, 'instance', instance) 

128 

129 def __setattr__(self, name, value): 

130 object.__setattr__(object.__getattribute__(self, 'instance'), name, value) 

131 

132 def __getattribute__(self, name): 

133 instance = object.__getattribute__(self, 'instance') 

134 

135 def _hook_func(self, pre_hook, post_hook, orig, *args, **kargs): 

136 if pre_hook is not None: 

137 pre_hook(self, *args, **kargs) 

138 # No need to pass instance here because when we extracted 

139 # the function we used instance.__getattribute__(name) which 

140 # seems to include it. I think. 

141 orig_retval = orig(*args, **kargs) 

142 

143 if post_hook is not None: 

144 post_hook(self, *args, **kargs) 

145 return orig_retval 

146 

147 # If this is a wrapped method, return a bound method 

148 if name in methods: 

149 pre_hook = methods[name][0] 

150 post_hook = methods[name][1] 

151 orig = instance.__getattribute__(name) 

152 return (lambda *args, **kargs: _hook_func(self, pre_hook, post_hook, 

153 orig, *args, **kargs)) 

154 

155 # Otherwise, just return attribute of instance 

156 return instance.__getattribute__(name) 

157 

158 return Wrapper 

159 

160 

161# pylint: disable=unsubscriptable-object 

162def add_hook(function: Callable[[Any], Any], 

163 pre_exec_hook: Optional[Callable[[Any], Any]] = None, 

164 post_exec_hook: Optional[Callable[[Any], Any]] = None): 

165 """Hook a function. 

166 

167 Hooks can be placed pre and post function call. At least one hook is 

168 needed. 

169 

170 This hooking is intended on non-object hooks. In order to hook functions 

171 in objects the `create_object_wrapper` function is used in combination 

172 with function hooking initialisation functions post execution. 

173 """ 

174 if pre_exec_hook is None and post_exec_hook is None: 

175 raise Exception('Some hooks must be included') 

176 

177 @functools.wraps(function) 

178 def run(*args, **kwargs): 

179 sanitizer_log(f'Hook start {str(function)}', LOG_DEBUG) 

180 

181 # Call hook 

182 if pre_exec_hook is not None: 

183 pre_exec_hook(*args, **kwargs) 

184 

185 # Call the original function in the even the hook did not indicate 

186 # failure. 

187 ret = function(*args, **kwargs) 

188 

189 # Post execution hook. Overwrite return value if anything is returned 

190 # by post hook. 

191 if post_exec_hook is not None: 

192 tmp_ret = post_exec_hook(ret, *args, **kwargs) 

193 if tmp_ret is not None: 

194 sanitizer_log('Overwriting return value', LOG_DEBUG) 

195 ret = tmp_ret 

196 sanitizer_log(f'Hook end {str(function)}', LOG_DEBUG) 

197 return ret 

198 

199 return run 

200 

201 

202def add_hooks(): 

203 """Sets up hooks.""" 

204 sanitizer_log('Starting', LOG_INFO) 

205 os.system = add_hook(os.system, 

206 pre_exec_hook=command_injection.hook_pre_exec_os_system) 

207 subprocess.Popen = add_hook( 

208 subprocess.Popen, 

209 pre_exec_hook=command_injection.hook_pre_exec_subprocess_Popen) 

210 

211 __builtins__['eval'] = add_hook( 

212 __builtins__['eval'], pre_exec_hook=command_injection.hook_pre_exec_eval) 

213 

214 re.compile = add_hook(re.compile, 

215 pre_exec_hook=redos.hook_pre_exec_re_compile, 

216 post_exec_hook=redos.hook_post_exec_re_compile) 

217 

218 # Hack to determine if yaml is elligible, because pkg_resources does 

219 # not seem to work from pyinstaller. 

220 # pylint: disable=import-outside-toplevel 

221 if is_module_present('yaml'): 

222 import yaml 

223 sanitizer_log('Hooking pyyaml.load', LOG_DEBUG) 

224 yaml.load = add_hook( 

225 yaml.load, 

226 pre_exec_hook=yaml_deserialization.hook_pre_exec_pyyaml_load, 

227 )