Package rekall :: Package plugins :: Module yarascanner
[frames] | no frames]

Source Code for Module rekall.plugins.yarascanner

  1  # Rekall Memory Forensics 
  2  # Copyright (c) 2012, Michael Cohen <scudette@gmail.com> 
  3  # Copyright (c) 2010, 2011, 2012 Michael Ligh <michael.ligh@mnin.org> 
  4  # Copyright 2013 Google Inc. All Rights Reserved. 
  5  # 
  6  # This program is free software; you can redistribute it and/or modify 
  7  # it under the terms of the GNU General Public License as published by 
  8  # the Free Software Foundation; either version 2 of the License, or (at 
  9  # your option) any later version. 
 10  # 
 11  # This program is distributed in the hope that it will be useful, but 
 12  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 14  # General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU General Public License 
 17  # along with this program; if not, write to the Free Software 
 18  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 19  # 
 20   
 21   
 22  """A Rekall Memory Forensics scanner which uses yara.""" 
 23  import yara 
 24   
 25  from rekall import scan 
 26  from rekall import testlib 
 27  from rekall import plugin 
 28   
 29  from rekall.plugins.common import pfn 
 30  from rekall_lib import utils 
 31   
 32   
33 -class YaraScanMixin(object):
34 """A common implementation of yara scanner. 35 36 This should be mixed with the OS specific Scanner (e.g. WinScanner) and 37 plugin.TypedProfileCommand. 38 """ 39 40 name = "yarascan" 41 42 table_header = [ 43 dict(name="Owner", width=20), 44 dict(name="Rule", width=10), 45 dict(name="Match", hidden=True), 46 dict(name="Offset", style="address"), 47 dict(name="hexdump", hex_width=16, width=67), 48 dict(name="run", hidden=True), 49 dict(name="address_space", hidden=True), 50 dict(name="Context"), 51 ] 52 53 __args = [ 54 dict(name="hits", default=10, type="IntParser", 55 help="Quit after finding this many hits."), 56 57 dict(name="string", default=None, 58 help="A verbatim string to search for."), 59 60 dict(name="binary_string", default=None, 61 help="A binary string (encoded as hex) to search " 62 "for. e.g. 000102[1-200]0506"), 63 64 dict(name="yara_file", default=None, 65 help="The yara signature file to read."), 66 67 dict(name="yara_expression", default=None, 68 help="If provided we scan for this yara " 69 "expression."), 70 71 dict(name="context", default=0x40, type="IntParser", 72 help="Context to print after the hit."), 73 74 dict(name="pre_context", default=0, type="IntParser", 75 help="Context to print before the hit."), 76 ] 77 78 scanner_defaults = dict( 79 scan_physical=True 80 ) 81
82 - def __init__(self, *args, **kwargs):
83 """Scan using yara signatures.""" 84 super(YaraScanMixin, self).__init__(*args, **kwargs) 85 86 # Compile the yara rules in advance. 87 if self.plugin_args.yara_expression: 88 self.rules_source = self.plugin_args.yara_expression 89 self.rules = yara.compile(source=self.rules_source) 90 91 elif self.plugin_args.binary_string: 92 self.compile_rule( 93 'rule r1 {strings: $a = {%s} condition: $a}' % 94 self.plugin_args.binary_string) 95 96 elif self.plugin_args.string: 97 self.compile_rule( 98 'rule r1 {strings: $a = "%s" condition: $a}' % 99 self.plugin_args.string) 100 101 elif self.plugin_args.yara_file: 102 self.compile_rule(open(self.plugin_args.yara_file).read()) 103 104 elif not self.ignore_required: 105 raise plugin.PluginError("You must specify a yara rule file or " 106 "string to match.")
107
108 - def compile_rule(self, rule):
109 self.rules_source = rule 110 try: 111 self.rules = yara.compile(source=rule) 112 except Exception as e: 113 raise plugin.PluginError( 114 "Failed to compile yara expression: %s" % e)
115
116 - def generate_hits(self, run):
117 for buffer_as in scan.BufferASGenerator( 118 self.session, run.address_space, run.start, run.end): 119 self.session.report_progress( 120 "Scanning buffer %#x->%#x (length %#x)", 121 buffer_as.base_offset, buffer_as.end(), 122 buffer_as.end() - buffer_as.base_offset) 123 124 for match in self.rules.match(data=buffer_as.data): 125 for buffer_offset, name, value in match.strings: 126 hit_offset = buffer_offset + buffer_as.base_offset 127 yield match, hit_offset
128
129 - def collect(self):
130 """Render output.""" 131 count = 0 132 for run in self.generate_memory_ranges(): 133 for match, address in self.generate_hits(run): 134 count += 1 135 if count >= self.plugin_args.hits: 136 break 137 138 # Result hit the physical memory - Get some context on this hit. 139 if run.data.get("type") == "PhysicalAS": 140 symbol = pfn.PhysicalAddressContext(self.session, address) 141 else: 142 symbol = utils.FormattedAddress( 143 self.session.address_resolver, address, 144 max_distance=2**64) 145 146 yield dict( 147 Owner=run.data.get("task") or run.data.get("type"), 148 Match=match, 149 Rule=match.rule, 150 Offset=address, 151 hexdump=utils.HexDumpedString( 152 run.address_space.read( 153 address - self.plugin_args.pre_context, 154 self.plugin_args.context + 155 self.plugin_args.pre_context)), 156 Context=symbol, 157 # Provide the address space where the hit is reported. 158 address_space=run.address_space, 159 run=run)
160 161
162 -class SimpleYaraScan(YaraScanMixin, plugin.TypedProfileCommand, 163 plugin.PhysicalASMixin, plugin.ProfileCommand):
164 """A Simple plugin which only yarascans the physical Address Space. 165 166 This plugin should not trigger profile autodetection and therefore should be 167 usable on any file at all. 168 """ 169 170 name = "simple_yarascan" 171 __args = [ 172 plugin.CommandOption("start", default=0, type="IntParser", 173 help="Start searching from this offset."), 174 175 plugin.CommandOption("limit", default=2**64, type="IntParser", 176 help="The length of data to search."), 177 ] 178 179 table_header = [ 180 dict(name="Rule", width=10), 181 dict(name="Match", hidden=True), 182 dict(name="Offset", style="address"), 183 dict(name="hexdump", hex_width=16, width=67), 184 ] 185 186 PROFILE_REQUIRED = False 187
188 - def collect(self):
189 """Render output.""" 190 count = 0 191 address_space = self.session.physical_address_space 192 for buffer_as in scan.BufferASGenerator( 193 self.session, address_space, 194 self.plugin_args.start, 195 self.plugin_args.start + self.plugin_args.limit): 196 self.session.report_progress( 197 "Scanning buffer %#x->%#x (%#x)", 198 buffer_as.base_offset, buffer_as.end(), 199 buffer_as.end() - buffer_as.base_offset) 200 201 for match in self.rules.match(data=buffer_as.data): 202 for buffer_offset, _, _ in match.strings: 203 hit_offset = buffer_offset + buffer_as.base_offset 204 count += 1 205 if count >= self.plugin_args.hits: 206 break 207 208 yield dict( 209 Match=match, 210 Rule=match.rule, 211 Offset=hit_offset, 212 hexdump=utils.HexDumpedString( 213 self.session.physical_address_space.read( 214 hit_offset - self.plugin_args.pre_context, 215 self.plugin_args.context + 216 self.plugin_args.pre_context)))
217 218
219 -class TestYara(testlib.SimpleTestCase):
220 """Test the yarascan module.""" 221 222 PARAMETERS = dict(commandline="yarascan --string %(string)s --hits 10")
223