Package rekall :: Module testlib
[frames] | no frames]

Source Code for Module rekall.testlib

  1  # Rekall Memory Forensics 
  2  # 
  3  # Copyright 2013 Google Inc. All Rights Reserved. 
  4  # 
  5  # Authors: 
  6  # Michael Cohen <scudette@users.sourceforge.net> 
  7  # 
  8  # This program is free software; you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation; either version 2 of the License, or (at 
 11  # your option) any later version. 
 12  # 
 13  # This program is distributed in the hope that it will be useful, but 
 14  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 16  # General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU General Public License 
 19  # along with this program; if not, write to the Free Software 
 20  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 21  # 
 22  """Base classes for all tests. 
 23   
 24  Rekall Memory Forensics tests have these goals: 
 25   
 26  - Detect regression bugs with previous versions of Rekall Memory Forensics. 
 27   
 28  - Once differences are detected, make it easier to understand why the 
 29    differences arise. 
 30   
 31  - Sometimes the differences are acceptable; in that case, there needs to be a
 32    way to declare the allowed differences in the tests.
 33   
 34  - What makes this more complicated is that the same test needs to be applied to 
 35    multiple images in a consistent way without necessarily making any code 
 36    changes to the test itself. 
 37   
 38   
 39  Solution Outline 
 40  ---------------- 
 41   
 42  A baseline of running each test is written by the test suite itself. The user 
 43  can write a baseline for all the modules by issuing the make_suite binary. The 
 44  baseline for each module will generate a number of files in a given directory. 
 45   
 46  When a test is run, the baseline files are loaded and compared with the
 47  present output in a specific way.
 48   
 49  """ 
 50  import hashlib 
 51  import logging 
 52  import subprocess 
 53  import pdb 
 54  import os 
 55  import shutil 
 56  import sys 
 57  import tempfile 
 58  import time 
 59  import threading 
 60  import unittest 
 61   
 62  from rekall import config 
 63  from rekall import plugin 
 64  from rekall import session as rekall_session 
 65  from rekall_lib import registry 
class Tail(threading.Thread):
    """Tail a file and write to stdout.

    The file is opened for binary reading when the object is constructed.
    While the thread runs it polls the file every 0.1 seconds and copies any
    newly available bytes to stdout.
    """

    def __init__(self, filename, *args, **kwargs):
        """Open `filename` for reading; the thread is not started yet.

        Args:
          filename: Path of the file to follow. It must already exist.
          *args, **kwargs: Passed through to threading.Thread.
        """
        super(Tail, self).__init__(*args, **kwargs)
        self.fd = open(filename, "rb")
        self.running = False
        self.daemon = True

    def start(self):
        """Mark the tailer as running and start the polling thread."""
        self.running = True
        super(Tail, self).start()

    def stop(self):
        """Stop the polling thread (if started) and release the file handle.

        Safe to call on an instance which was never started.
        """
        if self.running:
            self.running = False
            self.join()

        # Close even when the thread never ran, otherwise the handle opened in
        # __init__ would be leaked.
        self.fd.close()

    def _drain(self):
        """Copy everything currently readable from the file to stdout."""
        while True:
            data = self.fd.read(1000)
            if not data:
                break

            # The file is read as bytes. A Python 3 text stdout only accepts
            # bytes through its underlying binary buffer.
            binary_stdout = getattr(sys.stdout, "buffer", None)
            if binary_stdout is not None:
                binary_stdout.write(data)
            elif isinstance(data, str):
                # Python 2: bytes is str, stdout takes it directly.
                sys.stdout.write(data)
            else:
                sys.stdout.write(data.decode("utf8", "replace"))

            sys.stdout.flush()

    def run(self):
        """Poll the file until stop() is called, then drain once more."""
        while self.running:
            time.sleep(0.1)
            self._drain()

        # Final pass: covers the case where stop() is called before the loop
        # above performed a single iteration.
        self._drain()
class RekallBaseUnitTestCase(unittest.TestCase):
    """Base class for all rekall unit tests.

    A test instance carries a `baseline` and a `current` data structure (as
    produced by BuildBaselineData) which concrete subclasses compare in their
    test methods.
    """
    __metaclass__ = registry.MetaclassRegistry
    __abstract = True

    # The parameters to run this test with. These parameters are written to the
    # config file when creating a new blank template. Users can edit the config
    # file to influence how the test is run.

    PARAMETERS = {
        # This is the command line which is used to run the test.
        "commandline": "",
    }

    PLUGIN = None

    disabled = False

    temp_directory = None

    # Set on a class when setUpClass created its temp_directory, so that
    # tearDownClass knows it may forget the (now removed) path.
    _temp_directory_is_ours = False

    @classmethod
    def is_active(cls, _):
        """The base test is always active; subclasses may override."""
        return True

    @classmethod
    def CommandName(cls):
        """Return the name of the command under test.

        Returns:
          cls.PLUGIN when set, otherwise the first word of the "commandline"
          entry in cls.PARAMETERS, or None when that entry is empty.
        """
        if cls.PLUGIN:
            return cls.PLUGIN

        name = cls.PARAMETERS.get("commandline", "").split()
        if name:
            return name[0]

        return None

    def __init__(self, method_name="__init__", baseline=None, current=None,
                 debug=False, temp_directory=None, config_options=None):
        """Create a test case instance.

        Args:
          method_name: Name of the test method to run.
          baseline: Previously recorded baseline data.
          current: Data produced by the present run.
          debug: When true, run() enters pdb post-mortem on exceptions.
          temp_directory: Optional directory overriding the class level one
            for this instance.
          config_options: Dict of options, see MakeUserSession.
        """
        super(RekallBaseUnitTestCase, self).__init__(method_name)
        self.baseline = baseline
        self.config_options = config_options
        self.current = current
        self.debug = debug
        if temp_directory:
            self.temp_directory = temp_directory

    @classmethod
    def setUpClass(cls):
        """Create a temporary directory for the class unless one is set."""
        super(RekallBaseUnitTestCase, cls).setUpClass()
        if cls.temp_directory is None:
            cls.temp_directory = tempfile.mkdtemp()
            cls._temp_directory_is_ours = True

    @classmethod
    def tearDownClass(cls):
        """Remove the class temporary directory, ignoring removal errors."""
        super(RekallBaseUnitTestCase, cls).tearDownClass()
        if cls.temp_directory:
            shutil.rmtree(cls.temp_directory, True)

            # Forget a directory we created ourselves so that a later
            # setUpClass does not reuse a path which no longer exists.
            if cls.__dict__.get("_temp_directory_is_ours"):
                cls.temp_directory = None
                cls._temp_directory_is_ours = False

    def setUp(self):
        """Give every test a fresh session."""
        self.session = self.MakeUserSession()

    @staticmethod
    def _ReadCapturedText(path, limit=10 * 1024 * 1024):
        """Read at most `limit` bytes from `path` and decode them as utf8."""
        with open(path, "rb") as fd:
            return fd.read(limit).decode("utf8", "ignore")

    def LaunchExecutable(self, config_options):
        """Launches the rekall executable with the config specified.

        The command line template is taken from config_options["commandline"]
        and interpolated with config_options itself. Every option whose key
        starts with "-" is prepended to the command line as a global flag.

        Args:
          config_options: Dict of options. It is copied, the caller's dict is
            not modified. Must contain "executable" when a command line is
            set.

        Returns:
          A baseline data structure which contains meta data from running
          rekall over the test case (keys: output, logging, return_code,
          executed_command). An empty dict is returned when there is no
          command line or a template parameter is missing.
        """
        config_options = config_options.copy()
        tmp_filename = os.path.join(self.temp_directory,
                                    "." + self.__class__.__name__)
        error_filename = tmp_filename + ".stderr"

        baseline_commandline = config_options.get("commandline")
        config_options["tempdir"] = self.temp_directory

        # Nothing to do here.
        if not baseline_commandline:
            return {}

        # The command line is specified in the test's PARAMETERS dict.
        try:
            baseline_commandline = baseline_commandline % config_options
        except KeyError as e:
            # Use .get() so a missing "test_class" can not raise a second
            # KeyError from inside this handler.
            logging.critical(
                "Test %s requires parameter %s to be set in config file. (%s)",
                config_options.get("test_class", self.__class__.__name__),
                e, baseline_commandline)
            return {}

        if not baseline_commandline:
            # No valid command line - this baseline is aborted. Note that
            # config_options is the local copy made above.
            config_options["aborted"] = True
            return {}

        baseline_commandline = "- %s" % baseline_commandline
        for k, v in config_options.items():
            # prepend all global options to the command line.
            if not k.startswith("-"):
                continue

            # NOTE(review): values are wrapped in single quotes without any
            # escaping, so a value containing a quote breaks the shell line.
            if v is True:
                # This is a boolean flag.
                baseline_commandline = "%s %s" % (k, baseline_commandline)

            elif isinstance(v, list):
                baseline_commandline = "%s %s %s" % (
                    k, " ".join("'%s'" % x for x in v),
                    baseline_commandline)

            else:
                baseline_commandline = "%s '%s' %s" % (
                    k, v, baseline_commandline)

        cmdline = (config_options["executable"] + " -v " +
                   baseline_commandline)
        logging.debug("%s: Launching %s", self.__class__.__name__, cmdline)

        # Make sure the subprocess does not buffer so we can catch its
        # output in a timely manner. The variable is only set for the child,
        # this process' own environment is left untouched.
        child_env = dict(os.environ, PYTHONUNBUFFERED="1")

        # The child writes straight to the file descriptors, so the Python
        # level buffer is not needed (line buffering is text mode only).
        with open(tmp_filename, "wb", buffering=0) as output_fd:
            with open(error_filename, "wb", buffering=0) as error_fd:
                stdout_copier = Tail(tmp_filename)
                stderr_copier = Tail(error_filename)
                try:
                    # Specifying --debug should allow the subprocess to print
                    # messages directly to stdout. This is useful in order to
                    # attach a breakpoint (e.g. in an inline test).
                    if config_options.get("debug"):
                        stderr_copier.start()
                        stdout_copier.start()

                    pipe = subprocess.Popen(cmdline, shell=True,
                                            stdout=output_fd, stderr=error_fd,
                                            env=child_env)

                    pipe.wait()

                    # Done running the command, now prepare the json baseline
                    # file.
                    output_fd.flush()
                    error_fd.flush()
                finally:
                    # Always stop the copiers and release their read handles,
                    # even when launching the process failed.
                    for copier in (stdout_copier, stderr_copier):
                        copier.stop()
                        copier.fd.close()

                output = self._ReadCapturedText(tmp_filename)
                error = self._ReadCapturedText(error_filename)

                return dict(output=output.splitlines(),
                            logging=error.splitlines(),
                            return_code=pipe.returncode,
                            executed_command=cmdline)

    def BuildBaselineData(self, config_options):
        """Produce the baseline data; by default the executable's output."""
        return self.LaunchExecutable(config_options)

    def MakeUserSession(self, config_options=None):
        """Create an interactive session configured from config_options.

        Args:
          config_options: Optional dict. Defaults to self.config_options.
            Entries whose key starts with "--" are set on the session state
            with the leading dashes removed.

        Returns:
          The new session object.
        """
        if config_options is None:
            config_options = self.config_options or {}

        user_session = rekall_session.InteractiveSession()
        with user_session.state as state:
            config.MergeConfigOptions(state, user_session)
            for k, v in config_options.items():
                if k.startswith("--"):
                    state.Set(k[2:], v)

        return user_session

    def assertListEqual(self, a, b, msg=None):
        """Assert two iterables have equal length and equal items pairwise.

        Unlike the unittest version, any iterable is accepted; both arguments
        are converted to lists first.
        """
        a = list(a)
        b = list(b)
        self.assertEqual(len(a), len(b), msg)

        for x, y in zip(a, b):
            self.assertEqual(x, y, msg)

    def __unicode__(self):
        """Return "<class name> <test method name>"."""
        return "%s %s" % (self.__class__.__name__, self._testMethodName)

    def run(self, result=None):
        """Run setUp, the test method and tearDown, recording into result.

        This follows the classic unittest flow, but when self.debug is set a
        pdb post-mortem session is opened on every failure or error before it
        is recorded. KeyboardInterrupt is always propagated.

        Args:
          result: The unittest result object. A default one is created when
            omitted.
        """
        if result is None:
            result = self.defaultTestResult()

        result.startTest(self)
        testMethod = getattr(self, self._testMethodName)
        try:
            try:
                self.setUp()
            except KeyboardInterrupt:
                raise
            except Exception:
                if self.debug:
                    pdb.post_mortem()

                result.addError(self, sys.exc_info())
                return

            ok = False
            try:
                testMethod()
                ok = True
            except self.failureException:
                if self.debug:
                    pdb.post_mortem()

                result.addFailure(self, sys.exc_info())
            except KeyboardInterrupt:
                raise
            except Exception:
                if self.debug:
                    pdb.post_mortem()

                result.addError(self, sys.exc_info())

            try:
                self.tearDown()
            except KeyboardInterrupt:
                raise
            except Exception:
                if self.debug:
                    pdb.post_mortem()

                result.addError(self, sys.exc_info())
                # A failing tearDown turns a passing test into an error.
                ok = False

            if ok:
                result.addSuccess(self)
        finally:
            result.stopTest(self)
class SimpleTestCase(plugin.ModeBasedActiveMixin,
                     RekallBaseUnitTestCase):
    """A simple test which just compares with the baseline output."""

    __abstract = True

    @classmethod
    def is_active(cls, session):
        """Decide whether this test applies to the given session.

        The test is active only when the parent classes consider it active
        and the plugin under test (looked up by cls.PLUGIN, or by command
        name on session.plugins) is itself active for the session.

        Returns:
          False when the parents veto or no plugin is found, otherwise the
          result of the plugin's own is_active(session).
        """
        if not super(SimpleTestCase, cls).is_active(session):
            return False

        delegate_plugin = (
            plugin.Command.ImplementationByClass(cls.PLUGIN) or
            getattr(session.plugins, cls.CommandName() or "", None))

        if delegate_plugin:
            return delegate_plugin.is_active(session)

        # No plugin to delegate to: be explicit rather than returning None.
        return False

    def testCase(self):
        """Compare baseline and current output lines, ignoring line order."""
        previous = sorted(self.baseline['output'])
        current = sorted(self.current['output'])

        # Compare the entire table
        self.assertListEqual(previous, current)
class InlineTest(SimpleTestCase):
    """A test which runs a script through the "run" command in a subprocess."""

    # Override this to make this script run in the interactive shell in a
    # subprocess.
    script = "print 1"

    def LaunchExecutable(self, config_options):
        """Write the script to the temp directory and launch it.

        The script file starts with a line assigning the temp directory path
        to `tempdir`, followed by the text of self.script. The passed in
        config_options dict is updated in place with the "commandline" and
        "script" entries before being handed to the parent implementation.
        """
        script_path = os.path.join(self.temp_directory, "script.py")
        preamble = "tempdir = %r\n" % self.temp_directory

        with open(script_path, "w") as script_fd:
            script_fd.write(preamble)
            script_fd.write(self.script)

        config_options["script"] = self.script.splitlines()
        config_options["commandline"] = "run --run %s" % script_path

        return super(InlineTest, self).LaunchExecutable(config_options)
class SortedComparison(SimpleTestCase):
    """All test cases are sorted now.

    This class adds nothing of its own: it inherits everything, including
    the comparison method, from SimpleTestCase.
    """
class UnSortedComparison(SimpleTestCase):
    """A test which compares the output with the baseline in original order."""

    __abstract = True

    def testCase(self):
        """Require the current output to equal the baseline output exactly."""
        # Compare the entire table, line order included.
        self.assertEqual(self.baseline['output'], self.current['output'])
class DisabledTest(RekallBaseUnitTestCase):
    """Disable a test.

    The class sets `disabled` to True and reports itself as inactive for
    every session.
    """
    disabled = True

    @classmethod
    def is_active(cls, _):
        # Never active, regardless of the session passed in.
        return False
class HashChecker(SimpleTestCase):
    """A test comparing the hashes of all the files dumped in the tempdir."""

    def BuildBaselineData(self, config_options):
        """We need to calculate the hash of the image we produce.

        Runs the parent implementation, then adds a "hashes" dict to the
        result which maps each file name in the temp directory to the md5
        hex digest of its content. Names starting with "." are skipped.

        Returns:
          The baseline dict extended with the "hashes" entry.
        """
        baseline = super(HashChecker, self).BuildBaselineData(config_options)
        baseline['hashes'] = {}
        for filename in os.listdir(self.temp_directory):
            if filename.startswith("."):
                continue

            md5 = hashlib.md5()
            path = os.path.join(self.temp_directory, filename)

            # Hash the raw bytes: text mode would alter the data on some
            # platforms and yields str (not bytes) on Python 3.
            with open(path, "rb") as fd:
                for data in iter(lambda: fd.read(1024 * 1024), b""):
                    md5.update(data)

            baseline['hashes'][filename] = md5.hexdigest()

        return baseline

    def testCase(self):
        """The hashes of the current run must equal the baseline hashes."""
        self.assertEqual(self.baseline['hashes'], self.current['hashes'])
class disable_if(object):
    """Disable a test if the condition is true.

    Used as a class decorator. The decorated class' run method is wrapped so
    that it does nothing (and returns None) when the condition holds.
    """

    def __init__(self, conditional):
        """Store the condition.

        Args:
          conditional: Either a value, or a callable taking no arguments
            which is evaluated each time the test is about to run.
        """
        self.conditional = conditional
        self.original_run = None

    def __call__(self, cls):
        """Wrap cls.run with the skipper and return the same class."""
        # Keep the wrapped method in the closure so that one decorator
        # instance can safely decorate more than one class.
        original_run = cls.run
        self.original_run = original_run

        def run(instance, *args, **kwargs):
            condition = self.conditional
            if callable(condition):
                condition = condition()

            # If the condition is true we skip the run method.
            if condition:
                return None

            return original_run(instance, *args, **kwargs)

        # Wrap the run method with a skipper.
        cls.run = run
        return cls
# Alias of unittest.main, exposed under this module's namespace.
main = unittest.main