Package rekall :: Package plugins :: Package tools :: Module json_test
[frames | no frames]

Source Code for Module rekall.plugins.tools.json_test

  1  # Rekall Memory Forensics 
  2  # Copyright 2014 Google Inc. All Rights Reserved. 
  3  # 
  4  # Author: Michael Cohen scudette@google.com 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or (at 
  9  # your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 19  # 
 20   
 21  """Tests for json encoding/decoding.""" 
 22  import json 
 23  import logging 
 24  import StringIO 
 25   
 26  from rekall import testlib 
 27  from rekall.ui import json_renderer 
 28  from rekall.plugins.renderers import data_export 
 29  from rekall_lib import utils 
 30   
 31   
class JsonTest(testlib.RekallBaseUnitTestCase):
    """Test the Json encode/decoder."""

    PLUGIN = "json_render"

    def setUp(self):
        """Create a user session and a JsonRenderer for each test.

        The renderer's encoder and decoder are stored on the instance so the
        individual tests can call them directly.
        """
        self.session = self.MakeUserSession()
        self.renderer = json_renderer.JsonRenderer(session=self.session)
        self.encoder = self.renderer.encoder
        self.decoder = self.renderer.decoder

    def testEncoderCache(self):
        """Check that a long hexdump is exported as the expected 16 byte rows."""
        # Make the string long enough so that parts of it are garbage
        # collected. If the encoder uses id() to deduplicate it will fail
        # since id() might be reused across GCed objects.
        test_string = "this_is_a_very_long_sentence" * 10
        parts = [test_string[x:x + 16]
                 for x in xrange(0, len(test_string), 16)]

        with data_export.DataExportRenderer(
                session=self.session,
                output=StringIO.StringIO()).start() as renderer:
            utils.WriteHexdump(renderer, test_string)

            # Only entries tagged "r" are collected; the chunk that was
            # written is read back from the "data" key of their payload.
            rows = [row[1]["data"] for row in renderer.data
                    if row[0] == "r"]

            self.assertEqual(rows, parts)

    def testObjectRenderer(self):
        """Check the encoder output for a table of (value, expected) pairs."""
        cases = [
            ('\xff\xff\x00\x00', {'mro': u'str:basestring:object',
                                  'b64': u'//8AAA=='}),

            ("hello", u'hello'),  # A string is converted into unicode if
                                  # possible.

            (1, 1),  # Ints are already JSON serializable.
            (dict(foo=2), {'foo': 2}),
            (set([1, 2, 3]), {'mro': u'set:object', 'data': [1, 2, 3]}),
            ([1, 2, 3], [1, 2, 3]),

            ([1, "\xff\xff\x00\x00", 3], [1, {'mro': u'str:basestring:object',
                                              'b64': u'//8AAA=='}, 3]),
        ]

        for value, expected in cases:
            encoded = self.encoder.Encode(value)
            self.assertEqual(encoded, expected)

    def testProperSerialization(self):
        """Test that serializing simple python objects with json works.

        NOTE: Json is not intrinsically a fully functional serialization
        format - it is unable to serialize many common python primitives
        (e.g. strings, dicts with numeric keys etc). This tests that our
        wrapping around the json format allows the correct serialization of
        python primitives.
        """
        cases = [
            [1, 2],
            [1, "hello"],
            ["1", "2"],
            ["hello", u'Gr\xfcetzi'],
            "hello",
            u'Gr\xfcetzi',
            dict(a="hello"),
            dict(b=dict(a="hello")),  # Nested dict.
        ]

        for case in cases:
            data = self.encoder.Encode(case)
            # Pass the values as logging arguments so the message is only
            # formatted when debug logging is actually enabled.
            logging.debug("%s->%s", case, data)

            # Make sure the data is JSON serializable.
            self.assertEqual(data, json.loads(json.dumps(data)))
            self.assertEqual(case, self.decoder.Decode(data))

    def testObjectSerization(self):
        """Serialize _EPROCESS objects.

        We check that the deserialized object is an exact replica of the
        original - this includes the same address spaces, profile and offset.

        Having the objects identical allows us to dereference object members
        seamlessly.
        """
        for task in self.session.plugins.pslist().filter_processes():
            data = self.encoder.Encode(task)
            logging.debug("%r->%s", task, data)

            # Make sure the data is JSON serializable.
            self.assertEqual(data, json.loads(json.dumps(data)))

            decoded_task = self.decoder.Decode(data)

            self.assertEqual(task.obj_offset, decoded_task.obj_offset)
            self.assertEqual(task.obj_name, decoded_task.obj_name)
            self.assertEqual(task.obj_vm.name, decoded_task.obj_vm.name)

            # Check the process name is the same - this tests subfield
            # dereferencing.
            self.assertEqual(task.name, decoded_task.name)
            self.assertEqual(task.pid, decoded_task.pid)

    # NOTE(review): the "def" line of this method (listing line 135) was
    # missing from the source listing; the name below is reconstructed and
    # must be confirmed against the upstream file.
    def testAllObjectSerialization(self):
        """Run CheckObjectSerization over profile types and session objects."""
        for vtype in self.session.profile.vtypes:
            obj = self.session.profile.Object(vtype)
            self.CheckObjectSerization(obj)

        # NOTE(review): "!= None" is kept deliberately instead of
        # "is not None" - presumably the profile may be a None-like sentinel
        # that overloads equality; confirm before changing. The listing lost
        # its indentation, so the extent of this "if" is also reconstructed
        # (only the profile check is guarded here) - TODO confirm.
        if self.session.profile != None:
            self.CheckObjectSerization(self.session.profile)

        self.CheckObjectSerization(self.session.kernel_address_space)
        self.CheckObjectSerization(self.session.physical_address_space)

        # Some native types.
        self.CheckObjectSerization(set([1, 2, 3]))
        self.CheckObjectSerization(dict(a=1, b=dict(a=1)))

    def CheckObjectSerization(self, obj):
        """Check that obj survives both renderers' encoders.

        The object is encoded with a fresh JsonRenderer, dumped with json,
        decoded again and compared to the original. It is then encoded with
        a fresh DataExportRenderer and only dumped with json.

        Args:
          obj: The object to serialize.
        """
        json_renderer_obj = json_renderer.JsonRenderer(session=self.session)
        data_export_renderer_obj = data_export.DataExportRenderer(
            session=self.session)

        # First test json encodings.
        encoded = json_renderer_obj.encode(obj)

        # Make sure it is json safe.
        json.dumps(encoded)

        # Now decode it.
        decoded = json_renderer_obj.decode(encoded)
        self.assertEqual(decoded, obj)

        # Now check the DataExportRenderer.
        encoded = data_export_renderer_obj.encode(obj)

        # Make sure it is json safe.
        json.dumps(encoded)

        # Data Export is not decodable.