Package rekall :: Package plugins :: Package common :: Module sigscan
[frames] | no frames]

Source Code for Module rekall.plugins.common.sigscan

  1  # Rekall Memory Forensics 
  2  # Copyright 2015 Google Inc. All Rights Reserved. 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify 
  5  # it under the terms of the GNU General Public License as published by 
  6  # the Free Software Foundation; either version 2 of the License, or (at 
  7  # your option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, but 
 10  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 12  # General Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License 
 15  # along with this program; if not, write to the Free Software 
 16  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 17  # 
 18   
 19  __author__ = "Andreas Moser <grrrrrrrrr@surfsup.at>" 
 20   
 21   
 22  import re 
 23   
 24  from rekall import plugin 
 25  from rekall import scan 
 26  from rekall import testlib 
27 28 29 -class SignatureScannerCheck(scan.ScannerCheck):
30 """A scanner that searches for a signature. 31 32 The signature is given as a list of strings and this scanner checks that 33 each part of the signature is present in memory in ascending order. 34 """ 35
36 - def __init__(self, needles=None, **kwargs):
37 """Init. 38 39 Args: 40 needles: A list of strings we search for. 41 **kwargs: passthrough. 42 Raises: 43 RuntimeError: No needles provided. 44 """ 45 super(SignatureScannerCheck, self).__init__(**kwargs) 46 47 # It is an error to not provide something to search for. 48 if not needles: 49 raise RuntimeError("No needles provided to search.") 50 51 self.needles = needles 52 self.current_needle = 0
53
54 - def check(self, buffer_as, offset):
55 if self.current_needle >= len(self.needles): 56 # We have found all parts already. 57 return False 58 59 # Just check the buffer without needing to copy it on slice. 60 buffer_offset = buffer_as.get_buffer_offset(offset) 61 next_part = self.needles[self.current_needle] 62 if buffer_as.data.startswith(next_part, buffer_offset): 63 self.current_needle += 1 64 return next_part 65 else: 66 return False
67
68 - def skip(self, buffer_as, offset):
69 if self.current_needle >= len(self.needles): 70 # We have found all parts already, just skip the whole buffer. 71 return buffer_as.end() - offset 72 73 # Search the rest of the buffer for the needle. 74 buffer_offset = buffer_as.get_buffer_offset(offset) 75 next_part = self.needles[self.current_needle] 76 correction = 0 77 if self.current_needle: 78 # If this is not the very first hit we need to increase the offset 79 # or we might report identical parts only once. 80 correction = len(self.needles[self.current_needle - 1]) 81 dindex = buffer_as.data.find(next_part, buffer_offset + correction) 82 if dindex > -1: 83 return dindex - buffer_offset 84 85 # Skip entire region. 86 return buffer_as.end() - offset
87
88 89 -class SignatureScanner(scan.BaseScanner):
90
91 - def __init__(self, needles=None, **kwargs):
92 super(SignatureScanner, self).__init__(**kwargs) 93 self.needles = needles 94 95 self.check = SignatureScannerCheck( 96 profile=self.profile, address_space=self.address_space, 97 needles=self.needles)
98
99 - def check_addr(self, offset, buffer_as=None):
100 # Ask the check if this offset is possible. 101 val = self.check.check(buffer_as, offset) 102 if val: 103 return offset, val
104
105 - def skip(self, buffer_as, offset):
106 return self.check.skip(buffer_as, offset)
107
108 - def scan(self, **kwargs):
109 for hit in super(SignatureScanner, self).scan(**kwargs): 110 yield hit 111 112 # If a single hit is found, we are done. 113 if self.check.current_needle >= len(self.check.needles): 114 break
115
116 117 -class SigScanMixIn(object):
118 """Scan memory for signatures.""" 119 120 name = "sigscan" 121 122 @classmethod
123 - def args(cls, parser):
124 super(SigScanMixIn, cls).args(parser) 125 parser.add_argument("signature", default=None, nargs="*", 126 help="The signature(s) to scan for. Format is " 127 "000102*0506*AAFF") 128 129 parser.add_argument( 130 "--scan_physical", default=False, type="Boolean", 131 help="If specified we scan the physcial address space.") 132 133 parser.add_argument( 134 "--scan_kernel", default=False, type="Boolean", 135 help="If specified we scan the kernel address space.")
136
137 - def __init__(self, signature=None, scan_kernel=False, scan_physical=False, 138 **kwargs):
139 """Scan using custom signatures.""" 140 super(SigScanMixIn, self).__init__(**kwargs) 141 # If nothing is specified just scan the physical address space. 142 if not self.filtering_requested and not scan_kernel: 143 scan_physical = True 144 145 if not signature: 146 raise plugin.PluginError("No signature given.") 147 sig_re = re.compile("^[0-9A-F*]*$") 148 149 if isinstance(signature, basestring): 150 signature = [signature] 151 152 self.signatures = [] 153 for sig in signature: 154 sig = sig.upper() 155 if not sig_re.match(sig): 156 raise plugin.PluginError( 157 "Signature %s has invalid format. Format is eg. " 158 "000102*0506*AAFF" % sig) 159 parts = sig.split("*") 160 decoded_parts = [] 161 for p in parts: 162 try: 163 decoded_parts.append(p.decode("hex")) 164 except TypeError: 165 raise plugin.PluginError( 166 "Signature %s has invalid format." % sig) 167 self.signatures.append(decoded_parts) 168 self.scan_physical = scan_physical 169 self.scan_kernel = scan_kernel
170
171 - def render(self, renderer):
172 """Render output.""" 173 174 if self.scan_physical: 175 self.render_physical_scan(renderer) 176 if self.scan_kernel: 177 self.render_kernel_scan(renderer) 178 if self.filtering_requested: 179 for task in self.filter_processes(): 180 self.render_task_scan(renderer, task)
181
182 - def generate_hits(self, address_space, end=2**64):
183 for sig in self.signatures: 184 scanner = SignatureScanner( 185 session=self.session, profile=self.profile, needles=sig, 186 address_space=address_space) 187 188 results = list(scanner.scan(maxlen=end)) 189 if len(results) == len(sig): 190 yield results
191
192 - def _scan(self, renderer, hit_msg, address_space, end=2**64):
193 for hit in self.generate_hits(address_space, end=end): 194 renderer.format(hit_msg) 195 196 # A hit is a list of pairs (offset, signature part). 197 renderer.table_header([("Offset", "offset", "[addrpad]"), 198 ("Matching part", "part", "")]) 199 for offset, part in hit: 200 renderer.table_row(offset, part.encode("hex"))
201
202 - def render_physical_scan(self, renderer):
203 """This method scans the physical memory.""" 204 self.session.logging.debug("sigscanning against physical memory: %s.", 205 self.physical_address_space) 206 return self._scan(renderer, "Hit in physical AS:\n", 207 self.physical_address_space)
208
209 - def render_kernel_scan(self, renderer):
210 """This method scans the kernel memory.""" 211 self.session.logging.debug("sigscanning against the kernel.") 212 return self._scan(renderer, "Hit in kernel AS:\n", 213 self.kernel_address_space)
214
215 - def render_task_scan(self, renderer, task):
216 """This method scans the AS of a single task.""" 217 self.session.logging.debug("sigscanning task %s", task.name) 218 return self._scan( 219 renderer, "Hit in task %s (%s):\n" % (task.name, task.pid), 220 task.get_process_address_space(), 221 end=self.session.GetParameter("highest_usermode_address"))
222
223 224 -class TestSigScanPhysical(testlib.SimpleTestCase):
225 """Runs sigscan against physical memory.""" 226 227 PARAMETERS = dict( 228 commandline="sigscan --signature %(signature)s --scan_physical", 229 signature="")
230
231 232 -class TestSigScanKernel(testlib.SimpleTestCase):
233 """Runs sigscan against the kernel.""" 234 235 PARAMETERS = dict( 236 commandline="sigscan --signature %(signature)s --scan_kernel", 237 signature="")
238
239 240 -class TestSigScanProcess(testlib.SimpleTestCase):
241 """Runs sigscan against processes.""" 242 243 PARAMETERS = dict( 244 commandline=("sigscan --signature %(signature)s " 245 "--proc_regex %(proc_regex)s"), 246 signature="", 247 proc_regex=".")
248