Package rekall :: Package plugins :: Package common :: Module pas2kas
[frames | no frames]

Source Code for Module rekall.plugins.common.pas2kas

  1  # Rekall Memory Forensics 
  2  # 
  3  # Copyright 2013 Google Inc. All Rights Reserved. 
  4  # 
  5  # Authors: 
  6  # Michael Cohen <scudette@users.sourceforge.net> 
  7  # 
  8  # This program is free software; you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public License as published by 
 10  # the Free Software Foundation; either version 2 of the License, or (at 
 11  # your option) any later version. 
 12  # 
 13  # This program is distributed in the hope that it will be useful, but 
 14  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 16  # General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU General Public License 
 19  # along with this program; if not, write to the Free Software 
 20  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 21  # 
 22   
 23  # pylint: disable=protected-access 
 24   
 25  import bisect 
 26   
 27  from rekall import testlib 
 28  from rekall.ui import json_renderer 
 29   
 30   
31 -class Pas2VasResolver(object):
32 """An object which resolves physical addresses to virtual addresses."""
def __init__(self, session):
    """Set up the resolver and index every known DTB by its owner.

    Args:
      session: The session this resolver works on. It supplies the "dtb"
        parameter and the pslist plugin used to enumerate processes.
    """
    self.session = session

    # A freshly built resolver starts out flagged as dirty.
    self.dirty = True

    # Lookup caches, all keyed by DTB.
    self.dtb2task = {}
    self.dtb2maps = {}
    self.dtb2userspace = {}

    # The session's own dtb parameter is registered as the kernel.
    kernel_dtb = self.session.GetParameter("dtb")
    self.dtb2task[kernel_dtb] = "Kernel"

    # Every process with a usable DTB is recorded by its object offset.
    process_lister = self.session.plugins.pslist()
    for process in process_lister.filter_processes():
        process_dtb = process.dtb
        # NOTE(review): equality (not identity) against None is kept on
        # purpose - presumably dtb can be a placeholder object that only
        # compares equal to None; confirm before changing to "is not".
        if process_dtb != None:
            self.dtb2task[process_dtb] = process.obj_offset
50
52 return 2**64-1
53
def GetTaskStruct(self, address):
    """Turn a value stored in dtb2task into the task reported to callers.

    This base version is the identity function: the address is handed back
    untouched. OS specific implementations should override it.

    Args:
      address: The value recorded in dtb2task for a DTB.

    Returns:
      The same address, unchanged.
    """
    return address
60
def PA2VA_for_DTB(self, physical_address, dtb, userspace=None):
    """Translate a physical address into a virtual address under one DTB.

    The per-DTB lookup map is built lazily with build_address_map() and
    cached in dtb2maps; dtb2userspace remembers whether the cached map
    covers userspace only.

    Args:
      physical_address: The physical offset to resolve.
      dtb: The DTB selecting the address space to search.
      userspace: True to index only userspace mappings, False to index the
        whole address space. When None the mode is chosen automatically:
        userspace-only for every DTB except the kernel's.

    Returns:
      A (virtual_address, task) tuple, or (None, None) if the address is
      not covered by any run mapped under this DTB. task is the result of
      GetTaskStruct() on the value recorded in dtb2task, or "Kernel" when
      the DTB has no entry there.
    """
    # NOTE(review): equality (not identity) against None is kept on
    # purpose - presumably dtb can be a placeholder object that only
    # compares equal to None; confirm before changing to "is".
    if dtb == None:
        return None, None

    # Choose the userspace mode automatically.
    if userspace is None:
        userspace = dtb != self.session.kernel_address_space.dtb

    lookup_map = self.dtb2maps.get(dtb)

    # If we want the full resolution and the cached version was built for
    # userspace only, discard it and rebuild.
    if not userspace and self.dtb2userspace.get(dtb):
        lookup_map = None

    if lookup_map is None:
        lookup_map = self.dtb2maps[dtb] = self.build_address_map(
            dtb, userspace=userspace)
        self.dtb2userspace[dtb] = userspace

    # Nothing is mapped, or the address lies strictly below the lowest
    # mapped run. An address exactly at the start of the first run is
    # valid and must fall through to the lookup below.
    if not lookup_map or physical_address < lookup_map[0][0]:
        return None, None

    # The map is sorted by physical address. The oversized second field
    # makes the probe sort after any real entry starting at the same
    # physical address, so the slot before the insertion point is the run
    # starting at or just below physical_address.
    index = bisect.bisect(lookup_map, (physical_address, 2**64, 0, 0)) - 1
    lookup_pa, length, lookup_va = lookup_map[index]

    if not lookup_pa <= physical_address < lookup_pa + length:
        return None, None

    task = self.dtb2task.get(dtb)
    if task is not None:
        task = self.GetTaskStruct(task)
    else:
        task = "Kernel"

    return lookup_va + physical_address - lookup_pa, task
102
def build_address_map(self, dtb, userspace=True):
    """Collect the mappings visible under a DTB, ordered by physical address.

    Args:
      dtb: The DTB to build an address space for. When it compares equal
        to None no address space is created and the map is empty.
      userspace: If True, enumeration stops at the first run that starts
        above the session's "highest_usermode_address" parameter.

    Returns:
      A sorted list of (physical offset, length, virtual start) tuples,
      suitable for bisect lookups on the physical offset.
    """
    self.dirty = True
    entries = []

    # NOTE(review): equality (not identity) against None is kept on
    # purpose - presumably dtb can be a placeholder object that only
    # compares equal to None; confirm before changing to "is not".
    if dtb != None:
        session = self.session

        # Same address space implementation as the kernel's, but rooted at
        # the requested dtb.
        address_space = session.kernel_address_space.__class__(
            base=session.physical_address_space,
            session=session,
            dtb=dtb)

        user_limit = session.GetParameter("highest_usermode_address")

        for run in address_space.get_mappings():
            # Only consider userspace addresses for processes.
            if userspace and run.start > user_limit:
                break

            entries.append((run.file_offset, run.length, run.start))
            session.report_progress(
                "Enumerating memory for dtb %#x (%#x)", dtb, run.start)

    # Order by physical offset so callers can bisect.
    entries.sort()
    return entries
132 133
class Pas2VasMixin(object):
    """Resolves a physical address to a virtual address in a process.

    The host class is expected to provide self.session, self.plugin_args
    and filter_processes(); the resolver itself is fetched from the
    session's "physical_address_resolver" parameter.
    """

    name = "pas2vas"

    __args = [
        dict(name="offsets", type="ArrayIntParser",
             help="A list of physical offsets to resolve."),
    ]

    def get_virtual_address(self, physical_address, tasks=None):
        """Yield every (virtual_address, owner) mapping of a physical address.

        Args:
          physical_address: The physical offset to resolve.
          tasks: The processes to search. Defaults to everything returned
            by filter_processes().

        Yields:
          (virtual_address, owner) tuples. The kernel mapping, if any, comes
          first with the owner "Kernel"; process mappings follow with
          whatever owner the resolver reports.
        """
        resolver = self.session.GetParameter("physical_address_resolver")

        if tasks is None:
            tasks = list(self.filter_processes())

        # First try the kernel.
        virtual_address, _ = resolver.PA2VA_for_DTB(
            physical_address, dtb=self.session.kernel_address_space.dtb,
            userspace=False)

        if virtual_address:
            yield virtual_address, "Kernel"

        # Find which process owns it. The owner returned by the resolver
        # gets its own name so the loop variable is not rebound mid-loop.
        for task in tasks:
            virtual_offset, owner = resolver.PA2VA_for_DTB(
                physical_address, task.dtb, userspace=True)
            if virtual_offset is not None:
                yield virtual_offset, owner

    def render(self, renderer):
        """Emit one table row per mapping of each requested physical offset.

        Args:
          renderer: The renderer receiving the table header and rows.
        """
        # NOTE(review): the column keys look swapped relative to the row
        # data (the 'Physical' column is keyed 'virtual_offset' and vice
        # versa). Left untouched because consumers may depend on the keys.
        renderer.table_header([('Physical', 'virtual_offset', '[addrpad]'),
                               ('Virtual', 'physical_offset', '[addrpad]'),
                               ('Pid', 'pid', '>6'),
                               ('Name', 'name', '')])

        tasks = list(self.filter_processes())
        for physical_address in self.plugin_args.offsets:
            for virtual_address, task in self.get_virtual_address(
                    physical_address, tasks):
                # Compare by value: identity against a string literal only
                # works when both strings happen to be the same interned
                # object, and a miss would fall through to task.pid.
                if task == 'Kernel':
                    renderer.table_row(physical_address, virtual_address,
                                       0, 'Kernel')
                else:
                    renderer.table_row(
                        physical_address, virtual_address,
                        task.pid, task.name)
182 183
class Pas2VasResolverJsonObjectRenderer(json_renderer.StateBasedObjectRenderer):
    """JSON (de)serialisation of a Pas2VasResolver and its lookup maps."""

    renders_type = "Pas2VasResolver"

    def EncodeToJsonSafe(self, item, **_):
        """Return the resolver's three maps plus its MRO string as a dict."""
        return {
            "dtb2task": item.dtb2task,
            "dtb2maps": item.dtb2maps,
            "dtb2userspace": item.dtb2userspace,
            "mro": ":".join(self.get_mro(item)),
        }

    def DecodeFromJsonSafe(self, value, _):
        """Rebuild a resolver from the dict made by EncodeToJsonSafe."""
        # Pick the concrete class recorded at encode time and instantiate it
        # against the current session.
        resolver_cls = self.GetImplementationFromMRO(Pas2VasResolver, value)
        resolver = resolver_cls(session=self.session)

        # Overwrite the freshly built maps with any stored ones.
        for key in ("dtb2maps", "dtb2userspace", "dtb2task"):
            if key in value:
                setattr(resolver, key, value[key])

        # A resolver restored from storage is flagged as not dirty.
        resolver.dirty = False
        return resolver
209 210 211
class TestPas2Vas(testlib.SimpleTestCase):
    """Test case parameters for the pas2vas plugin."""

    # NOTE(review): the command line interpolates "offset" and "pids" while
    # only "pid" is defined here - presumably the others are supplied
    # elsewhere; confirm that "pids"/"pid" is not a mismatch.
    PARAMETERS = {
        "commandline": "pas2vas --offsets %(offset)s - %(pids)s ",
        "pid": 0,
    }
217