1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Base classes for all tests.
23
24 Rekall Memory Forensics tests have these goals:
25
26 - Detect regression bugs with previous versions of Rekall Memory Forensics.
27
28 - Once differences are detected, make it easier to understand why the
29 differences arise.
30
31 - Sometimes the differences are acceptable; in that case, there needs to be a way
32 to declare the allowed differences in the tests.
33
34 - What makes this more complicated is that the same test needs to be applied to
35 multiple images in a consistent way without necessarily making any code
36 changes to the test itself.
37
38
39 Solution Outline
40 ----------------
41
42 A baseline of running each test is written by the test suite itself. The user
43 can write a baseline for all the modules by running the make_suite binary. The
44 baseline for each module will generate a number of files in a given directory.
45
46 When a test is run, the baseline files are loaded and compared with the present
47 output in a specific way.
48
49 """
50 import hashlib
51 import logging
52 import subprocess
53 import pdb
54 import os
55 import shutil
56 import sys
57 import tempfile
58 import time
59 import threading
60 import unittest
61
62 from rekall import config
63 from rekall import plugin
64 from rekall import session as rekall_session
65 from rekall_lib import registry
66
67
68 -class Tail(threading.Thread):
69 """Tail a file and write to stdout."""
70
71 - def __init__(self, filename, *args, **kwargs):
76
78 self.running = True
79 super(Tail, self).start()
80
82 if self.running:
83 self.running = False
84 self.join()
85
87 while self.running:
88 time.sleep(0.1)
89
90 while True:
91 data = self.fd.read(1000)
92 if not data:
93 break
94
95 sys.stdout.write(data)
96 sys.stdout.flush()
97
100 """Base class for all rekall unit tests."""
101 __metaclass__ = registry.MetaclassRegistry
102 __abstract = True
103
104
105
106
107
108 PARAMETERS = {
109
110 "commandline": "",
111 }
112
113 PLUGIN = None
114
115 disabled = False
116
117 temp_directory = None
118
119 @classmethod
122
123 @classmethod
131
132 - def __init__(self, method_name="__init__", baseline=None, current=None,
133 debug=False, temp_directory=None, config_options=None):
142
143 @classmethod
148
149 @classmethod
154
157
159 """Launches the rekall executable with the config specified.
160
161 Returns:
162 A baseline data structure which contains meta data from running
163 rekall over the test case.
164 """
165 config_options = config_options.copy()
166 tmp_filename = os.path.join(self.temp_directory,
167 "." + self.__class__.__name__)
168 error_filename = tmp_filename + ".stderr"
169
170 baseline_commandline = config_options.get("commandline")
171 config_options["tempdir"] = self.temp_directory
172
173
174 if not baseline_commandline:
175 return {}
176
177
178 try:
179 baseline_commandline = baseline_commandline % config_options
180 except KeyError as e:
181 logging.critical(
182 "Test %s requires parameter %s to be set in config file. (%s)",
183 config_options["test_class"], e, baseline_commandline)
184 return {}
185
186 if baseline_commandline:
187 baseline_commandline = "- %s" % baseline_commandline
188 for k, v in config_options.items():
189
190 if k.startswith("-"):
191
192 if v is True:
193 baseline_commandline = "%s %s" % (
194 k, baseline_commandline)
195
196 elif isinstance(v, list):
197 baseline_commandline = "%s %s %s" % (
198 k, " ".join("'%s'" % x for x in v),
199 baseline_commandline)
200
201 else:
202 baseline_commandline = "%s '%s' %s" % (
203 k, v, baseline_commandline)
204
205 cmdline = (config_options["executable"] + " -v " +
206 baseline_commandline)
207 logging.debug("%s: Launching %s", self.__class__.__name__, cmdline)
208
209
210
211 os.environ["PYTHONUNBUFFERED"] = "1"
212
213 with open(tmp_filename, "wb", buffering=1) as output_fd:
214 with open(error_filename, "wb", buffering=1) as error_fd:
215 stdout_copier = Tail(tmp_filename)
216 stderr_copier = Tail(error_filename)
217
218
219
220 if config_options.get("debug"):
221 stderr_copier.start()
222 stdout_copier.start()
223
224 pipe = subprocess.Popen(cmdline, shell=True,
225 stdout=output_fd, stderr=error_fd)
226
227 pipe.wait()
228
229
230
231 output_fd.flush()
232 error_fd.flush()
233 stdout_copier.stop()
234 stderr_copier.stop()
235
236 output = open(tmp_filename).read(10 * 1024 * 1024)
237 output = output.decode("utf8", "ignore")
238
239 error = open(error_filename).read(10 * 1024 * 1024)
240 error = error.decode("utf8", "ignore")
241
242 baseline_data = dict(output=output.splitlines(),
243 logging=error.splitlines(),
244 return_code=pipe.returncode,
245 executed_command=cmdline)
246
247 return baseline_data
248
249 else:
250
251 config_options["aborted"] = True
252
253 return {}
254
257
259 if config_options is None:
260 config_options = self.config_options or {}
261
262 user_session = rekall_session.InteractiveSession()
263 with user_session.state as state:
264 config.MergeConfigOptions(state, user_session)
265 for k, v in config_options.items():
266 if k.startswith("--"):
267 state.Set(k[2:], v)
268
269 return user_session
270
272 a = list(a)
273 b = list(b)
274 self.assertEqual(len(a), len(b))
275
276 for x, y in zip(a, b):
277 self.assertEqual(x, y)
278
280 return "%s %s" % (self.__class__.__name__, self._testMethodName)
281
def run(self, result=None):
    """Run the selected test method and report the outcome to `result`.

    setUp, the test method and tearDown are each guarded separately:

    - An exception from setUp is recorded with result.addError() and
      neither the test method nor tearDown is run.
    - An exception from the test method is recorded with
      result.addFailure() when it is a self.failureException, otherwise
      with result.addError(). tearDown still runs afterwards.
    - An exception from tearDown is recorded with result.addError().
    - result.addSuccess() is called only when both the test method and
      tearDown completed without raising.

    When self.debug is true, pdb.post_mortem() is entered before each
    exception is recorded. KeyboardInterrupt is never recorded; it is
    re-raised to the caller. result.stopTest() is called in every case
    once result.startTest() has been called.

    Args:
      result: The object receiving the startTest/addError/addFailure/
        addSuccess/stopTest calls. When None, the object returned by
        self.defaultTestResult() is used.
    """
    if result is None:
        result = self.defaultTestResult()

    def record(outcome):
        # Must only be called from inside an ``except`` clause: both
        # pdb.post_mortem() and sys.exc_info() operate on the exception
        # that is currently being handled.
        if self.debug:
            pdb.post_mortem()

        getattr(result, outcome)(self, sys.exc_info())

    result.startTest(self)
    test_method = getattr(self, self._testMethodName)
    try:
        try:
            self.setUp()
        except KeyboardInterrupt:
            raise
        except Exception:
            record("addError")
            # Without a working fixture there is nothing to run or clean up.
            return

        try:
            test_method()
        except self.failureException:
            succeeded = False
            record("addFailure")
        except KeyboardInterrupt:
            raise
        except Exception:
            succeeded = False
            record("addError")
        else:
            succeeded = True

        # tearDown runs whether or not the test method passed.
        try:
            self.tearDown()
        except KeyboardInterrupt:
            raise
        except Exception:
            succeeded = False
            record("addError")

        if succeeded:
            result.addSuccess(self)
    finally:
        result.stopTest(self)
331
332
333 -class SimpleTestCase(plugin.ModeBasedActiveMixin,
334 RekallBaseUnitTestCase):
335 """A simple test which just compares with the baseline output."""
336
337 __abstract = True
338
339 @classmethod
350
352 previous = sorted(self.baseline['output'])
353 current = sorted(self.current['output'])
354
355
356 self.assertListEqual(previous, current)
357
360
361
362 script = "print 1"
363
373
376 """All test cases are sorted now."""
377
380
381 __abstract = True
382
384 previous = self.baseline['output']
385 current = self.current['output']
386
387
388 self.assertEqual(previous, current)
389
392 """Disable a test."""
393 disabled = True
394
395 @classmethod
398
401 """A test comparing the hashes of all the files dumped in the tempdir."""
402
421
423 self.assertEqual(self.baseline['hashes'], self.current['hashes'])
424
426 """Disable a test if the condition is true."""
428 self.conditional = conditional
429 self.original_run = None
430
432 self.original_run = cls.run
433
434 def run(instance, *args, **kwargs):
435 condition = self.conditional
436 if callable(condition):
437 condition = condition()
438
439
440 if condition:
441 return self.original_run(instance, *args, **kwargs)
442
443
444 cls.run = run
445 return cls
446
447
448 main = unittest.main
449