Package rekall :: Package plugins :: Package tools :: Module disassembler
[frames] | no frames]

Source Code for Module rekall.plugins.tools.disassembler

  1  # Rekall Memory Forensics 
  2  # Copyright (C) 2012 Michael Cohen <scudette@gmail.com> 
  3  # Copyright 2013 Google Inc. All Rights Reserved. 
  4  # 
  5  # This program is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU General Public License as published by 
  7  # the Free Software Foundation; either version 2 of the License, or (at 
  8  # your option) any later version. 
  9  # 
 10  # This program is distributed in the hope that it will be useful, but 
 11  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
 13  # General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License 
 16  # along with this program; if not, write to the Free Software 
 17  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 18  # 
 19   
 20  """ 
 21  Provides the primitives needed to disassemble code using capstone. 
 22  """ 
 23   
 24  import binascii 
 25  import capstone 
 26  import re 
 27  import struct 
 28   
 29  from capstone import x86_const 
 30  from rekall import addrspace 
 31  from rekall import plugin 
 32  from rekall import obj 
 33  from rekall import testlib 
 34  from rekall_lib import utils 
class Disassembler(object):
    """Common base class for the disassembler back ends of this module.

    It only stores the disassembly mode, the session and the address space;
    every other method is a neutral placeholder meant to be overridden.
    """

    __abstract = True

    def __init__(self, mode, session=None, address_space=None):
        """Remember the mode, session and address space to work with.

        Args:
          mode: The disassembly mode, stored as is.
          session: The session, stored as is and handed to the fallback
            address space.
          address_space: The address space to use. When it is missing (or
            otherwise falsy) a "DummyAddressSpace" is instantiated instead.
        """
        if not address_space:
            dummy_cls = addrspace.BaseAddressSpace.classes["DummyAddressSpace"]
            address_space = dummy_cls(session=session)

        self.address_space = address_space
        self.session = session
        self.mode = mode

    def disassemble(self, data, offset):
        """Starts disassembly of data (placeholder, does nothing here)."""
        return None

    def is_return(self):
        """Placeholder answer: never a return."""
        return False

    def is_branch(self):
        """Placeholder answer: never a branch."""
        return False

    def target(self):
        """Placeholder answer: no known target."""
        return None
class Instruction(object):
    """Base class of all decoded instructions."""

    __abstract = True
class CapstoneInstruction(Instruction):
    """A capstone decoded instruction.

    Wraps one capstone instruction object and exposes a "canonical" dict
    description of it (mnemonic, text, operands and comment). The canonical
    form is used for rendering and for matching signature rules.
    """

    # We need to build reverse maps to properly interpret capstone
    # instructions. They map capstone's numeric x86 constants to the name
    # fragment following the X86_INS_ / X86_REG_ / X86_OP_ prefix and are
    # filled lazily by _init_class().
    INSTRUCTIONS = {}
    REGISTERS = {}
    OP = {}

    # struct formats for the operand sizes _read_target() can decode.
    _TARGET_FORMATS = {4: "<I", 8: "<Q"}

    # Profile type read by format_indirect() for a given operand size. Any
    # other size is read as an "address".
    _INDIRECT_TYPES = {1: "byte", 2: "unsigned short", 4: "unsigned int"}

    @classmethod
    def _init_class(cls):
        """Fill the reverse maps from the constants of capstone.x86_const."""
        for constant in dir(x86_const):
            components = constant.split("_")
            # Only X86_<KIND>_<NAME> constants are of interest.
            if len(components) < 3 or components[0] != "X86":
                continue

            value = getattr(x86_const, constant)
            kind = components[1]
            if kind == "INS":
                cls.INSTRUCTIONS[value] = components[2]
            elif kind == "REG":
                cls.REGISTERS[value] = components[2]
            elif kind == "OP":
                cls.OP[value] = components[2]

        # Register id 0 stands for "no register" in memory operands.
        cls.REGISTERS[0] = None

    def __init__(self, insn, session=None, address_space=None):
        """Wrap a capstone instruction.

        Args:
          insn: The capstone instruction object.
          session: The session. Although it defaults to None it is required:
            its address_resolver is fetched right away.
          address_space: The address space memory operands are read from.
        """
        self.address_space = address_space
        self.insn = insn
        self.address = insn.address
        self.size = insn.size
        self.mnemonic = insn.mnemonic
        self._comment = ""
        self._operands = None  # Cache the operands.
        self.session = session
        self.resolver = session.address_resolver
        if not self.REGISTERS:
            self._init_class()

    @utils.safe_property
    def operands(self):
        """The list of operand dicts of this instruction (cached).

        Every operand has a "type" ("REG", "MEM", "IMM", ...) and a "size".
        Depending on the type it also carries "reg", or "base"/"disp"/
        "index"/"scale", and - when the address is statically known -
        "address" and "target". As a side effect the instruction comment is
        set from the last resolvable operand.
        """
        if self._operands is not None:
            return self._operands

        result = []
        # For invalid instructions there are no operands
        if self.insn.id == 0:
            self._operands = result
            return result

        for op in self.insn.operands:
            operand = dict(type=self.OP[op.type], size=op.size)
            if operand["type"] == "REG":
                operand["reg"] = self.REGISTERS[op.reg]

            elif operand["type"] == "MEM":
                # This is of the form: [base_reg + disp + index_reg * scale]
                mem = op.mem
                operand["base"] = self.REGISTERS[mem.base]
                operand["disp"] = mem.disp
                operand["index"] = self.REGISTERS[mem.index]
                operand["scale"] = mem.scale

                if operand["base"] == "RIP":
                    # RIP relative: the displacement is relative to the end
                    # of this instruction.
                    target = self.insn.address + mem.disp + self.insn.size
                    operand["address"] = target
                    operand["target"] = self._read_target(target, operand)

                    self._comment = self.format_indirect(target, op.size)

                # Simple indirect address.
                if not operand["base"] and not operand["index"]:
                    operand["address"] = mem.disp
                    operand["target"] = self._read_target(mem.disp, operand)
                    self._comment = self.format_indirect(mem.disp, op.size)

            elif operand["type"] == "IMM":
                operand["target"] = operand["address"] = op.imm.real
                self._comment = ", ".join(self.resolver.format_address(
                    op.imm.real))

            result.append(operand)

        # Cache for next time.
        self._operands = result
        return result

    def _read_target(self, target, operand):
        """Read the little endian integer a memory operand points to.

        Args:
          target: The address to read from.
          operand: The operand dict; its "size" selects how much is read.

        Returns:
          The unsigned integer stored at target for 4 and 8 byte operands,
          None for every other operand size.
        """
        size = operand["size"]
        fmt = self._TARGET_FORMATS.get(size)
        if fmt is None:
            # Nothing we know how to decode - do not bother reading.
            return None

        data = self.address_space.read(target, size)
        return struct.unpack(fmt, data)[0]

    def GetCanonical(self):
        """Returns the canonical model of the instruction."""
        # NOTE: self.operands must be evaluated before self._comment is read
        # because building the operands is what sets the comment.
        result = dict(mnemonic=self.INSTRUCTIONS[self.insn.id],
                      str="%s %s" % (self.insn.mnemonic, self.insn.op_str),
                      operands=self.operands)

        result["comment"] = self._comment
        return result

    @utils.safe_property
    def comment(self):
        """The comment of the canonical form."""
        return self.GetCanonical()["comment"]

    @utils.safe_property
    def op_str(self):
        """The "<mnemonic> <operands>" text of the instruction."""
        return self.GetCanonical()["str"]

    @utils.safe_property
    def text(self):
        """The instruction text, followed by the comment in parentheses."""
        canonical = self.GetCanonical()
        if canonical["comment"]:
            return "%s (%s)" % (canonical["str"], canonical["comment"])
        return canonical["str"]

    @utils.safe_property
    def hexbytes(self):
        """The raw bytes of the instruction as a hex string."""
        return unicode(binascii.hexlify(self.insn.bytes))

    def format_indirect(self, operand, size):
        """Describe the value stored at an address for use as a comment.

        Args:
          operand: The address the instruction refers to.
          size: The operand size; selects the type that is read.

        Returns:
          "0x<value> <names of operand>" followed by " -> <names of value>"
          when the value itself resolves to a name.
        """
        type_name = self._INDIRECT_TYPES.get(size, "address")

        target = self.session.profile.Object(
            type_name, offset=operand, vm=self.address_space).v()

        target_name = ", ".join(self.resolver.format_address(target))
        operand_name = ", ".join(self.resolver.format_address(operand))

        if target_name:
            return "0x%x %s -> %s" % (target, operand_name, target_name)

        return "0x%x %s" % (target, operand_name)

    def is_return(self):
        """Is this instruction a return (mnemonic starts with "ret")?"""
        return self.mnemonic.startswith("ret")

    # https://en.wikibooks.org/wiki/X86_Assembly/Control_Flow
    def is_branch(self):
        """Is this instruction a branch?

        e.g. JNE JE JG JLE JL JGE JMP JA JAE JB JBE JO JNO JZ JNZ JS JNS
        """
        return self.mnemonic.startswith("j")

    @utils.safe_property
    def target(self):
        """The statically known address this branch jumps to, or None."""
        if not self.is_branch():
            return None

        operands = self.operands
        if not operands:
            return None

        operand = operands[0]
        if operand["type"] in ("IMM", "MEM"):
            return operand.get("address")

        # We can not determine the target of REG jumps without the
        # registers.
        return None

    def match_rule(self, rule, context):
        """Match the rule against this instruction.

        Args:
          rule: A dict describing the expected canonical instruction.
          context: A dict receiving the values of "$" capture variables.

        Returns:
          A true value when the rule matches, a false value otherwise.
        """
        # Speed optimization. Most of the time the rule matches the mnemonic.
        mnemonic = rule.get("mnemonic")
        if mnemonic and mnemonic != self.INSTRUCTIONS[self.insn.id]:
            return False

        return self._MatchRule(rule, self.GetCanonical(), context)

    def _MatchRule(self, rule, instruction, context):
        """Recursively match rule against (a part of) the canonical form.

        Dicts are matched key by key, lists and tuples element by element
        (falsy elements of the rule match anything). Strings starting with
        "$" capture the value into context, strings starting with "~" are
        regular expressions applied to string values. Everything else is
        compared for equality.
        """
        if isinstance(rule, dict):
            if instruction is None:
                # The field is missing: only an empty rule can match.
                return not rule

            for k, v in rule.iteritems():
                expected = instruction.get(k)
                if not self._MatchRule(v, expected, context):
                    return False
            return True

        if isinstance(rule, (list, tuple)):
            if instruction is None:
                # The field is missing: there is nothing to walk over.
                return False

            for subrule, subinst in zip(rule, instruction):
                if subrule and not self._MatchRule(subrule, subinst, context):
                    return False

            return True

        if isinstance(rule, basestring):
            # Rules starting with $ are capture variables. startswith() is
            # used so that an empty rule does not raise.
            if rule.startswith("$"):
                context[rule] = instruction
                return True

            # Rules starting with ~ are regular expressions.
            if isinstance(instruction, basestring) and rule.startswith("~"):
                return re.match(rule[1:], instruction)

        return rule == instruction
class Capstone(Disassembler):
    """Disassembler implemented on top of the capstone engine."""

    def __init__(self, mode, **kwargs):
        """Create the capstone engine which corresponds to mode.

        Raises:
          NotImplementedError: When mode is none of I386, AMD64, MIPS, ARM.
        """
        super(Capstone, self).__init__(mode, **kwargs)

        selected = self.mode
        if selected == "I386":
            arch = capstone.CS_ARCH_X86
            cs_mode = capstone.CS_MODE_32
        elif selected == "AMD64":
            arch = capstone.CS_ARCH_X86
            cs_mode = capstone.CS_MODE_64
        elif selected == "MIPS":
            arch = capstone.CS_ARCH_MIPS
            cs_mode = capstone.CS_MODE_32 + capstone.CS_MODE_BIG_ENDIAN
        elif selected == "ARM":
            # This is not really supported yet.
            arch = capstone.CS_ARCH_ARM
            cs_mode = capstone.CS_MODE_ARM
        else:
            raise NotImplementedError(
                "No disassembler available for this arch.")

        self.cs = capstone.Cs(arch, cs_mode)

        # Ask for operand details and emit undecodable bytes as "db"
        # pseudo instructions.
        self.cs.detail = True
        self.cs.skipdata_setup = ("db", None, None)
        self.cs.skipdata = True

    def disassemble(self, data, offset):
        """Generate a CapstoneInstruction for everything decoded from data.

        Args:
          data: The raw bytes to decode.
          offset: The address of the first byte of data.
        """
        start = int(offset)
        for decoded in self.cs.disasm(data, start):
            yield CapstoneInstruction(decoded,
                                      session=self.session,
                                      address_space=self.address_space)
class Disassemble(plugin.TypedProfileCommand, plugin.Command):
    """Disassemble the given offset."""

    __name = "dis"

    __args = [
        dict(name="offset", type="SymbolAddress", positional=True,
             help="An offset to disassemble. This can also be the name of "
             "a symbol with an optional offset. For example: "
             "tcpip!TcpCovetNetBufferList."),

        dict(name="address_space", type="AddressSpace",
             help="The address space to use."),

        dict(name="length", type="IntParser",
             help="The number of instructions (lines) to disassemble."),

        dict(name="end", type="IntParser",
             help="The end address to disassemble up to."),

        dict(name="mode", default=None,
             choices=["I386", "AMD64", "MIPS"], type="Choices",
             help="Disassemble Mode (AMD64 or I386). Defaults to 'auto'."),

        dict(name="branch", default=False, type="Boolean",
             help="If set we follow all branches to cover all code."),

        dict(name="canonical", default=False, type="Boolean",
             help="If set emit canonical instructions. These can be used to "
             "develop signatures."),
    ]

    table_header = [
        dict(type="TreeNode", name="address",
             width=20, child=dict(style="address")),
        dict(name="rel", style="address", width=5),
        dict(name="opcode", width=20),
        dict(name="instruction", width=40),
        dict(name="comment"),
    ]

    # Used when the session has no "paging_limit" parameter.
    _DEFAULT_PAGING_LIMIT = 50

    def __init__(self, *args, **kwargs):
        super(Disassemble, self).__init__(*args, **kwargs)

        # If length is not specified only disassemble one pager of output.
        self.length = self.plugin_args.length
        if self.length is None:
            self.length = self.session.GetParameter(
                "paging_limit", self._DEFAULT_PAGING_LIMIT)

        # If end is specified, keep going until we hit the end.
        if self.plugin_args.end is not None:
            self.length = 2**62

        # If we are doing branch analysis we can not suspend this plugin. We
        # must do everything all the time.
        if self.plugin_args.branch:
            self.length = 2**62

        # All the visited addresses (for branch analysis).
        self._visited = set()

        self.offset = self.plugin_args.offset

    def disassemble(self, offset, depth=0):
        """Disassemble the number of instructions required.

        Args:
          offset: The address to start disassembling from.
          depth: The branch nesting level; it is passed back unchanged with
            every instruction and incremented for followed branches.

        Yields:
          Tuples of (depth, instruction).
        """
        # Disassemble the data one page at the time.
        func = Function(offset=offset, vm=self.plugin_args.address_space,
                        session=self.session, mode=self.plugin_args.mode)

        for instruction in func.disassemble(self.length):
            offset = instruction.address

            # Only populated during branch analysis: do not walk the same
            # code twice.
            if offset in self._visited:
                return

            # Exit condition can be specified by length.
            if (self.length is not None and
                    len(self._visited) > self.length):
                return

            # Exit condition can be specified by end address.
            if self.plugin_args.end and offset > self.plugin_args.end:
                return

            # Yield this data.
            yield depth, instruction

            # If the user asked for full branch analysis we follow all
            # branches. This gives us full code coverage for a function - we
            # just disassemble until the function exits from all branches.
            if self.plugin_args.branch:
                self._visited.add(offset)

                # A return stops this branch.
                if instruction.is_return():
                    return

                target = instruction.target
                if target:
                    # Start disassembling the branch. When the branch is
                    # exhausted we resume disassembling the continued
                    # branch.
                    for x in self.disassemble(target, depth=depth+1):
                        yield x

                    # A JMP stops disassembling this branch. This happens with
                    # tail end optimization where a JMP would meet a RET which
                    # unwinds past the JMP.
                    if instruction.mnemonic.startswith("jmp"):
                        return

    def render_canonical(self, renderer, **options):
        """Renders a canonical description of each instruction.

        Canonical descriptions are machine readable representations of the
        instruction which can be used to write disassembler signatures.

        Args:
          renderer: The renderer to write to.
          **options: Accepted because render() forwards its own options
            here; they are not used.
        """
        # Start from a clean slate, just like collect() does.
        self._visited.clear()

        # If length nor end are specified only disassemble one pager output.
        if self.plugin_args.end is None and self.plugin_args.length is None:
            self.length = self.session.GetParameter(
                "paging_limit", self._DEFAULT_PAGING_LIMIT) - 5

        renderer.table_header([
            ('Instruction', "instruction", ''),
        ], suppress_headers=True)

        for _, instruction in self.disassemble(self.offset):
            renderer.table_row(instruction.GetCanonical())

    def render(self, renderer, **options):
        """Disassemble code at a given address.

        Disassembles code starting at the offset for the number of
        instructions given by the length parameter (default: one page of
        output), up to the end address if one was given.

        Note: This feature requires capstone, available at
        http://www.capstone-engine.org/

        The mode is one of the choices of the mode argument. If not
        supplied, the disassembler mode is detected automatically.
        """
        if self.plugin_args.canonical:
            return self.render_canonical(renderer, **options)

        return super(Disassemble, self).render(renderer, **options)

    def collect(self):
        """Generate one row per disassembled instruction.

        When an address resolver is available an annotation row is emitted
        whenever an instruction sits exactly on a known symbol, and the "rel"
        column holds the distance to the nearest symbol if it is below
        0x1000.
        """
        self._visited.clear()

        offset = None
        for depth, instruction in self.disassemble(self.offset):
            offset = instruction.address

            relative = None
            resolver = self.session.address_resolver
            if resolver:
                (f_offset, f_names) = resolver.get_nearest_constant_by_address(
                    offset)

                f_name = ", ".join(f_names)
                self.session.report_progress(
                    "Disassembled %s: 0x%x", f_name, offset)

                if offset - f_offset == 0:
                    yield dict(
                        address="------ %s ------\n" % f_name,
                        annotation=True)

                if offset - f_offset < 0x1000:
                    relative = offset - f_offset

            yield dict(address=instruction.address,
                       rel=relative,
                       opcode=instruction.hexbytes,
                       instruction=instruction.op_str,
                       comment=instruction.comment, depth=depth)

        # Continue from where we left off when the user calls us again with the
        # v() plugin. Keep the previous offset if nothing was disassembled.
        if offset is not None:
            self.offset = offset
class TestDisassemble(testlib.SimpleTestCase):
    # Symbol discovery through export table detection is what we want to
    # exercise here, so that name resolution strategy is switched on.
    PARAMETERS = {
        "commandline": ("dis --length %(length)s %(func)s "
                        "--name_resolution_strategies Export"),
        "func": 0x805031be,
        "length": 20,
    }
class Function(obj.BaseAddressComparisonMixIn, obj.BaseObject):
    """A base object representing code snippets."""

    def __init__(self, mode=None, args=None, **kwargs):
        """Create the code object and the disassembler it uses.

        Args:
          mode: The disassembly mode. When None it is taken from the object
            context, then autodetected from the address, and finally from
            the profile's "arch" metadata (falling back to "I386").
          args: Stored as is.
          **kwargs: Passed to the base object.
        """
        super(Function, self).__init__(**kwargs)
        self.args = args
        if mode is None:
            mode = self.obj_context.get("mode")

        if mode is None:
            # Autodetect disassembling mode
            highest_usermode_address = self.obj_session.GetParameter(
                "highest_usermode_address")

            # We are disassembling user space.
            if self.obj_offset < highest_usermode_address:
                mode = self.obj_session.GetParameter(
                    "process_context").address_mode

        # fall back to the kernel's mode.
        if not mode:
            mode = self.obj_session.profile.metadata("arch") or "I386"

        self.dis = Capstone(mode, address_space=self.obj_vm,
                            session=self.obj_session)
        self.mode = mode

    def __int__(self):
        return self.obj_offset

    def __hash__(self):
        return self.obj_offset + hash(self.obj_vm)

    def __unicode__(self):
        """One "<address> <text>" line per disassembled instruction."""
        if self.mode == "AMD64":
            format_string = "%0#14x %s"
        else:
            format_string = "%0#10x %s"

        result = []
        for instruction in self.disassemble():
            result.append(format_string % (
                instruction.address, instruction.text))

        return "\n".join(result)

    def __iter__(self):
        return iter(self.disassemble())

    def __getitem__(self, item):
        """The item'th instruction of the default disassembly.

        Returns None when item is not reached by a default disassemble()
        call.
        """
        for i, x in enumerate(self.disassemble()):
            if i == item:
                return x

        return None

    def Rewind(self, length=0, align=True):
        """Returns another function which starts before this function.

        If align is specified, we increase the length repeatedly until the
        new function disassembles exactly to the same offset of this
        function.
        """
        while True:
            offset = self.obj_offset - length
            result = self.obj_profile.Function(vm=self.obj_vm, offset=offset)
            if not align:
                return result

            # disassemble() yields up to length + 1 instructions of at least
            # one byte each, so it normally reaches or passes our offset.
            for instruction in result.disassemble(instructions=length):
                # An exact match.
                if instruction.address == self.obj_offset:
                    return result

                # We overshot ourselves, try again.
                if instruction.address > self.obj_offset:
                    length += 1
                    break
            else:
                # The disassembly stopped before reaching our offset so
                # retrying can not help; give up instead of looping forever.
                return result

    def disassemble(self, instructions=10):
        """Generate some instructions.

        Args:
          instructions: The instruction budget. Generation stops once more
            than this many instructions have been produced, i.e. up to
            instructions + 1 of them are yielded.

        Yields:
          The decoded instructions, starting at this object's offset.
        """
        count = 0
        buffer_offset = self.obj_offset
        while True:
            # By default read 2 pages.
            data = self.obj_vm.read(buffer_offset, 0x2000)

            last_yielded = None
            for instruction in self.dis.disassemble(data, buffer_offset):
                offset = instruction.address

                # If we disassemble past one page, we read another two
                # pages. This guarantees that we have enough data for full
                # instructions.
                if offset - buffer_offset > 0x1000:
                    # This instruction was not yielded: restart from it.
                    buffer_offset = offset
                    break

                yield instruction
                last_yielded = instruction
                count += 1

                if count > instructions:
                    return
            else:
                # The whole buffer was consumed. Without any decoded
                # instruction there is no way to make progress, so stop
                # rather than spin on the same offset.
                if last_yielded is None:
                    return

                # Resume right after the last instruction instead of
                # decoding (and yielding) it a second time.
                buffer_offset = last_yielded.address + last_yielded.size


# Register the Function class in all profiles. NOTE(review): presumably this
# is what makes `profile.Function(...)` (as used by Function.Rewind)
# available - confirm against obj.Profile.
obj.Profile.COMMON_CLASSES["Function"] = Function