Package rekall :: Package plugins :: Module core
[frames | no frames]

Source Code for Module rekall.plugins.core

   1  # Rekall Memory Forensics 
   2  # Copyright (C) 2012 Michael Cohen 
   3  # Copyright 2013 Google Inc. All Rights Reserved. 
   4  # 
   5  # This program is free software; you can redistribute it and/or modify 
   6  # it under the terms of the GNU General Public License as published by 
   7  # the Free Software Foundation; either version 2 of the License, or (at 
   8  # your option) any later version. 
   9  # 
  10  # This program is distributed in the hope that it will be useful, but 
  11  # WITHOUT ANY WARRANTY; without even the implied warranty of 
  12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 
  13  # General Public License for more details. 
  14  # 
  15  # You should have received a copy of the GNU General Public License 
  16  # along with this program; if not, write to the Free Software 
  17  # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
  18  # 
  19   
  20  """This module implements core plugins.""" 
  21   
  22  __author__ = "Michael Cohen <scudette@gmail.com>" 
  23   
  24  import exceptions 
  25  import inspect 
  26  import pdb 
  27  import math 
  28  import re 
  29  import os 
  30  import textwrap 
  31   
  32  from rekall import addrspace 
  33  from rekall import args 
  34  from rekall import config 
  35  from rekall import constants 
  36  from rekall import plugin 
  37  from rekall import obj 
  38  from rekall import scan 
  39  from rekall import testlib 
  40  from rekall_lib import registry 
  41  from rekall_lib import utils 
class Info(plugin.Command):
    """Print information about various subsystems."""

    __name = "info"

    standard_options = []

    def __init__(self, item=None, verbosity=0, **kwargs):
        """Display information about a plugin.

        Args:
          item: The plugin class to examine.
          verbosity: How much information to display.
        """
        super(Info, self).__init__(**kwargs)
        self.item = item
        self.verbosity = verbosity

    def plugins(self):
        """Yield (registry name, plugin name, first docstring line) tuples."""
        for name, cls in plugin.Command.classes.items():
            if name:
                # A missing docstring is replaced by " " so that
                # splitlines()[0] below always has a line to return.
                doc = cls.__doc__ or " "
                yield name, cls.name, doc.splitlines()[0]

    def profiles(self):
        """Yield (name, first docstring line) for the registered profiles.

        At verbosity 0 profiles without "os" metadata are skipped.
        """
        for name, cls in obj.Profile.classes.items():
            if self.verbosity == 0 and not cls.metadata("os"):
                continue

            if name:
                # Guard against profile classes without a docstring, the
                # same way plugins() does, instead of failing on None.
                doc = cls.__doc__ or " "
                yield name, doc.splitlines()[0].strip()

    def address_spaces(self):
        """Yield a dict describing each registered address space class."""
        for name, cls in addrspace.BaseAddressSpace.classes.items():
            yield dict(name=name, function=cls.name, definition=cls.__module__)

    def render(self, renderer):
        """Render the general overview, or the details of self.item."""
        if self.item is None:
            return self.render_general_info(renderer)
        else:
            return self.render_item_info(self.item, renderer)

    def _split_into_paragraphs(self, string, dedent):
        """Split a string into paragraphs.

        A paragraph is defined as lines of text having the same indentation. An
        empty new line breaks the paragraph.

        The first line in each paragraph is allowed to be indented more than the
        second line.

        Args:
          string: The text to split.
          dedent: Number of leading characters dropped from every line.

        Yields:
          Tuples of (list of stripped lines, indentation of the first line).
        """
        paragraph = []
        last_leading_space = 0
        first_line_indent = 0

        for line in string.splitlines():
            line = line[dedent:]

            m = re.match(r"\s*", line)
            leading_space = len(m.group(0))

            text = line[leading_space:]

            # First line is always included.
            if not paragraph:
                paragraph = [text]
                first_line = True
                first_line_indent = leading_space
                continue

            # The second line of a paragraph fixes the indentation that the
            # remaining lines are compared against.
            if first_line and last_leading_space != leading_space:
                if text:
                    paragraph.append(text)

                last_leading_space = leading_space
                first_line = False

            # A change of indentation closes the current paragraph.
            elif leading_space != last_leading_space:
                if paragraph:
                    yield paragraph, first_line_indent

                paragraph = []
                if text:
                    paragraph.append(text)
                last_leading_space = leading_space
                first_line_indent = leading_space
                first_line = True
            else:
                if text:
                    paragraph.append(text)

                first_line = False

        if paragraph:
            yield paragraph, first_line_indent

    def split_into_paragraphs(self, string, dedent=0, wrap=50):
        """Yield each paragraph of string re-wrapped to wrap columns.

        Every wrapped line is prefixed with the paragraph's first line
        indentation.
        """
        for paragraph, leading_space in self._split_into_paragraphs(
                string, dedent):
            paragraph = textwrap.wrap("\n".join(paragraph), wrap)
            yield "\n".join([(" " * leading_space + x) for x in paragraph])

    def parse_args_string(self, arg_string):
        """Parses a standard docstring into args and docs for each arg."""
        parameter = None
        doc = ""

        for line in arg_string.splitlines():
            # An indented "name: text" line starts a new parameter; any other
            # line continues the documentation collected so far.
            m = re.match(r"\s+([^\s]+):(.+)", line)
            if m:
                if parameter:
                    yield parameter, doc

                parameter = m.group(1)
                doc = m.group(2)
            else:
                doc += "\n" + line

        if parameter:
            yield parameter, doc

    def get_default_args(self, item=None):
        """Yield (argument name, cleaned up help text) for a plugin class.

        Args:
          item: The plugin to describe. Defaults to self.item.
        """
        if item is None:
            item = self.item

        metadata = config.CommandMetadata(item)
        for x, y in metadata.args.items():
            # Normalize the option name to use _.
            x = x.replace("-", "_")

            yield x, self._clean_up_doc(y.get("help", ""))

    def render_item_info(self, item, renderer):
        """Render information about the specific item."""
        cls_doc = inspect.cleandoc(item.__doc__ or " ")
        # Only the part of the constructor docstring before "Args:" is shown;
        # the arguments themselves are listed in the table below.
        init_doc = inspect.cleandoc(
            (item.__init__.__doc__ or " ").split("Args:")[0])

        if isinstance(item, registry.MetaclassRegistry):
            # show the args it takes. Relies on the docstring to be formatted
            # properly.
            doc_string = cls_doc + init_doc
            doc_string += (
                "\n\nLink:\n"
                "http://www.rekall-forensic.com/epydocs/%s.%s-class.html"
                "\n\n" % (item.__module__, item.__name__))

            renderer.write(doc_string)

            renderer.table_header([('Parameter', 'parameter', '30'),
                                   ('Documentation', 'doc', '70')])
            for parameter, doc in self.get_default_args(item):
                renderer.table_row(parameter, doc)

            # Add the standard help options.
            for parameter, descriptor in self.standard_options:
                renderer.table_row(parameter, self._clean_up_doc(descriptor))

        else:
            # For normal objects just write their docstrings.
            renderer.write(item.__doc__ or " ")

        renderer.write("\n")

    def _clean_up_doc(self, doc, dedent=0):
        """Return doc re-flowed into paragraphs wrapped at 70 columns."""
        clean_doc = []
        for paragraph in self.split_into_paragraphs(
                " " * dedent + doc, dedent=dedent, wrap=70):
            clean_doc.append(paragraph)

        return "\n".join(clean_doc)

    def render_general_info(self, renderer):
        """Render the banner followed by a table of all known plugins."""
        renderer.write(constants.BANNER)
        renderer.section()
        renderer.table_header([('Command', 'function', "20"),
                               ('Provider Class', 'provider', '20'),
                               ('Docs', 'docs', '50')])

        # plugins() yields (registry name, plugin name, doc); rows are sorted
        # by the plugin name.
        for cls, name, doc in sorted(self.plugins(), key=lambda x: x[1]):
            renderer.table_row(name, cls, doc)
class TestInfo(testlib.DisabledTest):
    """Disable the Info test."""

    # Command line associated with this (disabled) test case.
    PARAMETERS = {"commandline": "info"}
class FindDTB(plugin.PhysicalASMixin, plugin.ProfileCommand):
    """A base class to be used by all the FindDTB implementation."""
    __abstract = True

    def dtb_hits(self):
        """Yields hits for the DTB offset."""
        # The base class has no hits to offer.
        return []

    def VerifyHit(self, hit):
        """Verify the hit for correctness, yielding an address space."""
        # No extra checks here: just try to build an address space.
        return self.CreateAS(hit)

    def address_space_hits(self):
        """Finds DTBs and yields virtual address spaces that expose kernel.

        Yields:
          BaseAddressSpace-derived instances, validated using the VerifyHit()
          method.
        """
        for candidate in self.dtb_hits():
            verified = self.VerifyHit(candidate)
            if verified is None:
                continue

            yield verified

    def CreateAS(self, dtb):
        """Creates an address space from this hit."""
        implementation = self.GetAddressSpaceImplementation()
        try:
            result = implementation(
                base=self.physical_address_space,
                dtb=dtb, session=self.session,
                profile=self.profile)
        except IOError:
            # The address space could not be built on top of this DTB.
            result = None

        return result

    def GetAddressSpaceImplementation(self, simple=False):
        """Returns the correct address space class for this profile."""
        known_classes = addrspace.BaseAddressSpace.classes

        # The virtual address space implementation is chosen by the profile.
        arch = self.profile.metadata("arch")
        if arch == "AMD64":
            return known_classes["AMD64PagedMemory"]

        # PAE profiles go with the pae address space.
        if arch == "I386" and self.profile.metadata("pae"):
            return known_classes['IA32PagedMemoryPae']

        if arch == "MIPS":
            return known_classes["MIPS32PagedMemory"]

        # Everything else falls back to plain 32 bit paging.
        return known_classes['IA32PagedMemory']
class LoadAddressSpace(plugin.Command):
    """Load address spaces into the session if its not already loaded."""

    __name = "load_as"

    def __init__(self, pas_spec="auto", **kwargs):
        """Tries to create the address spaces and assigns them to the session.

        An address space specification is a colon delimited list of AS
        constructors which will be stacked. For example:

        FileAddressSpace:EWF

        if the specification is "auto" we guess by trying every combination
        until a virtual AS is obtained.

        The virtual address space is chosen based on the profile.

        Args:
          pas_spec: A Physical address space specification.
        """
        super(LoadAddressSpace, self).__init__(**kwargs)
        self.pas_spec = pas_spec

    # Parse Address spaces from this specification. TODO: Support EPT
    # specification and nesting.
    ADDRESS_SPACE_RE = re.compile(r"([a-zA-Z0-9]+)@((0x)?[0-9a-zA-Z]+)")

    def ResolveAddressSpace(self, name=None):
        """Resolve the name into an address space.

        This function is intended to be called from plugins which allow an
        address space to be specified on the command line. We implement a simple
        way for the user to specify the address space using a string. The
        following formats are supported:

        Kernel, K : Represents the kernel address space.
        Physical, P: Represents the physical address space.

        as_type@dtb_address: Instantiates the address space at the specified
          DTB. For example: amd64@0x18700

        pid@pid_number: Use the process address space for the specified pid.

        Args:
          name: The specification string, an address space instance (returned
            unchanged) or None to use the session's default address space,
            falling back to the kernel address space.

        Raises:
          AttributeError: If the pid is not found or the specification does
            not resolve to an address space.
        """
        if name is None:
            result = self.session.GetParameter("default_address_space")
            if result:
                return result

            name = "K"

        # We can already specify a proper address space here.
        if isinstance(name, addrspace.BaseAddressSpace):
            return name

        if name == "K" or name == "Kernel":
            return (self.session.kernel_address_space or
                    self.GetVirtualAddressSpace())

        if name == "P" or name == "Physical":
            return (self.session.physical_address_space or
                    self.GetPhysicalAddressSpace())

        m = self.ADDRESS_SPACE_RE.match(name)
        if m:
            # Base 0 lets int() accept both decimal and 0x-prefixed values.
            arg = int(m.group(2), 0)
            if m.group(1) == "pid":
                for task in self.session.plugins.pslist(
                        pids=arg).filter_processes():
                    self.session.plugins.cc().SwitchProcessContext(task)
                    return task.get_process_address_space()
                raise AttributeError("Process pid %s not found" % arg)

            as_cls = addrspace.BaseAddressSpace.classes.get(m.group(1))
            if as_cls:
                return as_cls(session=self.session, dtb=arg,
                              base=self.GetPhysicalAddressSpace())

        # The name must be interpolated with %; passing it as a second
        # argument would leave the "%r" placeholder unformatted.
        raise AttributeError(
            "Address space specification %r invalid." % (name,))

    def GetPhysicalAddressSpace(self):
        """Create the physical address space and store it in the session.

        Returns:
          The session's physical address space. When creation fails with an
          ASAssertionError the error is logged and whatever the session
          currently holds is returned.
        """
        try:
            # Try to get a physical address space.
            if self.pas_spec == "auto":
                self.session.physical_address_space = self.GuessAddressSpace()
            else:
                self.session.physical_address_space = self.AddressSpaceFactory(
                    specification=self.pas_spec)

            return self.session.physical_address_space

        except addrspace.ASAssertionError as e:
            self.session.logging.error(
                "Could not create address space: %s", e)

        return self.session.physical_address_space

    # TODO: Deprecate this method completely since it is rarely used.
    def GetVirtualAddressSpace(self, dtb=None):
        """Load the Kernel Virtual Address Space.

        Note that this function is usually not used since the Virtual AS is now
        loaded from guess_profile.ApplyFindDTB() when profiles are guessed. This
        function is only used when the profile is directly provided by the user.

        Args:
          dtb: The DTB to use. Defaults to the session's "dtb" parameter; when
            that is unset too the find_dtb plugin is asked for candidates.

        Raises:
          plugin.PluginError: If there is no physical address space, no
            profile, or no candidate DTB produced an address space.
        """
        if not self.session.physical_address_space:
            self.GetPhysicalAddressSpace()

        if not self.session.physical_address_space:
            raise plugin.PluginError("Unable to find physical address space.")

        self.profile = self.session.profile
        # NOTE: "== None" (rather than "is None") is kept deliberately here
        # and below - presumably these values can be NoneObject-style
        # placeholders that only compare equal to None.
        if self.profile == None:
            raise plugin.PluginError(
                "Must specify a profile to load virtual AS.")

        # If we know the DTB, just build the address space.
        # Otherwise, delegate to a find_dtb plugin.
        if dtb is None:
            dtb = self.session.GetParameter("dtb")

        find_dtb = self.session.plugins.find_dtb()
        if find_dtb == None:
            return find_dtb

        if dtb:
            self.session.kernel_address_space = find_dtb.CreateAS(dtb)

        else:
            self.session.logging.debug("DTB not specified. Delegating to "
                                       "find_dtb.")
            # Only the first verified hit is used.
            for address_space in find_dtb.address_space_hits():
                with self.session:
                    self.session.kernel_address_space = address_space
                    self.session.SetCache("dtb", address_space.dtb,
                                          volatile=False)
                break

            if self.session.kernel_address_space is None:
                self.session.logging.info(
                    "A DTB value was found but failed to verify. "
                    "Some troubleshooting steps to consider: "
                    "(1) Is the profile correct? (2) Is the KASLR correct? "
                    "Try running the find_kaslr plugin on systems that "
                    "use KASLR and see if there are more possible values. "
                    "You can specify which offset to use using "
                    "--vm_kernel_slide. (3) If you know the DTB, for "
                    "example from knowing the value of the CR3 register "
                    "at time of acquisition, you can set it using --dtb. "
                    "On most 64-bit systems, you can use the DTB of any "
                    "process, not just the kernel!")
                raise plugin.PluginError(
                    "A DTB value was found but failed to verify. "
                    "See logging messages for more information.")

        # Set the default address space for plugins like disassemble and dump.
        if not self.session.HasParameter("default_address_space"):
            self.session.SetCache(
                "default_address_space", self.session.kernel_address_space,
                volatile=False)

        return self.session.kernel_address_space

    def GuessAddressSpace(self, base_as=None, **kwargs):
        """Loads an address space by stacking valid ASes on top of each other
        (priority order first).

        Args:
          base_as: The address space to start stacking on (may be None).
          **kwargs: Passed to every address space constructor.

        Returns:
          The top of the resulting stack, or base_as if nothing could be
          stacked on it.
        """
        error = addrspace.AddrSpaceError()

        address_spaces = sorted(addrspace.BaseAddressSpace.classes.values(),
                                key=lambda x: x.order)

        while 1:
            self.session.logging.debug("Voting round with base: %s", base_as)
            found = False
            for cls in address_spaces:
                # Only try address spaces which claim to support images.
                if not cls.metadata("image"):
                    continue

                self.session.logging.debug("Trying %s ", cls)
                try:
                    base_as = cls(base=base_as, session=self.session,
                                  **kwargs)
                    self.session.logging.debug("Succeeded instantiating %s",
                                               base_as)
                    found = True
                    break
                except (AssertionError,
                        addrspace.ASAssertionError) as e:
                    # This class does not apply to the current base; record
                    # why and try the next one.
                    self.session.logging.debug("Failed instantiating %s: %s",
                                               cls.__name__, e)
                    error.append_reason(cls.__name__, e)
                    continue
                except Exception as e:
                    self.session.logging.info("Error: %s", e)
                    if self.session.GetParameter("debug"):
                        pdb.post_mortem()

                    raise

            # A full iteration through all the classes without anyone
            # selecting us means we are done:
            if not found:
                break

        if base_as:
            self.session.logging.info("Autodetected physical address space %s",
                                      base_as)

        return base_as

    def AddressSpaceFactory(self, specification='', **kwargs):
        """Build the address space from the specification.

        Args:
          specification: A colon separated list of AS class names to be
            stacked.
          **kwargs: Passed to every address space constructor.

        Raises:
          addrspace.Error: If a name in the specification is not registered.
        """
        base_as = None
        for as_name in specification.split(":"):
            as_cls = addrspace.BaseAddressSpace.classes.get(as_name)
            if as_cls is None:
                raise addrspace.Error("No such address space %s" % as_name)

            base_as = as_cls(base=base_as, session=self.session, **kwargs)

        return base_as

    def render(self, renderer):
        """Load whichever of the two address spaces the session is missing."""
        if not self.session.physical_address_space:
            self.GetPhysicalAddressSpace()

        if not self.session.kernel_address_space:
            self.GetVirtualAddressSpace()
class DirectoryDumperMixin(object):
    """A mixin for plugins that want to dump files to a directory."""

    # Set this to False if the dump_dir parameter is mandatory.
    dump_dir_optional = True
    default_dump_dir = "."

    __args = [
        dict(name="dump_dir",
             help="Path suitable for dumping files.")
    ]

    def __init__(self, *args_, **kwargs):
        """Dump to a directory.

        Args:
          dump_dir: The directory where files should be dumped.
        """
        super(DirectoryDumperMixin, self).__init__(*args_, **kwargs)
        # NOTE(review): with the class default of "." the session's
        # "dump_dir" parameter is never consulted - confirm the intended
        # precedence.
        self.dump_dir = (self.plugin_args.dump_dir or
                         self.default_dump_dir or
                         self.session.GetParameter("dump_dir"))

        self.check_dump_dir(self.dump_dir)

    def check_dump_dir(self, dump_dir=None):
        """Validate a dump directory.

        Args:
          dump_dir: The path to check.

        Raises:
          plugin.PluginError: If the directory is mandatory but missing, or
            the given path is not an existing directory.
        """
        # If the dump_dir parameter is not optional insist its there.
        if not self.dump_dir_optional and not dump_dir:
            raise plugin.PluginError(
                "Please specify a dump directory.")

        if dump_dir and not os.path.isdir(dump_dir):
            # Report the path that was actually checked, not self.dump_dir,
            # which may differ from (or not yet exist next to) the argument.
            raise plugin.PluginError("%s is not a directory" % dump_dir)

    def CopyToFile(self, address_space, start, end, outfd):
        """Copy a part of the address space to the output file.

        This utility function allows the writing of sparse files correctly. We
        pass over the address space, automatically skipping regions which are
        not valid. For file systems which support sparse files (e.g. in Linux),
        no additional disk space will be used for unmapped regions.

        If a region has no mapped pages, the resulting file will be of 0 bytes
        long.

        Args:
          address_space: The address space to read from.
          start: First address to copy; it maps to offset 0 in outfd.
          end: End of the range passed to get_address_ranges().
          outfd: A seekable, writable file like object.
        """
        BUFFSIZE = 1024 * 1024

        for run in address_space.get_address_ranges(start=start, end=end):
            out_offset = run.start - start
            self.session.report_progress("Dumping %s Mb", out_offset / BUFFSIZE)
            # Seeking past the gap (instead of writing padding) is what
            # leaves the unmapped regions unwritten.
            outfd.seek(out_offset)
            i = run.start

            # Now copy the region in fixed size buffers.
            while i < run.end:
                to_read = min(BUFFSIZE, run.end - i)

                data = address_space.read(i, to_read)
                outfd.write(data)

                i += to_read
class Null(plugin.Command):
    """This plugin does absolutely nothing.

    It is used to measure startup overheads.
    """
    __name = "null"

    def render(self, renderer):
        # Intentionally empty: the renderer is accepted and ignored.
        del renderer
class LoadPlugins(plugin.Command):
    """Load user provided plugins.

    This probably is only useful after the interactive shell started since you
    can already use the --plugin command line option.
    """

    __name = "load_plugin"
    interactive = True

    def __init__(self, path, **kwargs):
        super(LoadPlugins, self).__init__(**kwargs)

        # A single path is wrapped in a list; anything else is passed on
        # unchanged.
        paths = [path] if isinstance(path, basestring) else path
        args.LoadPlugins(paths)
class Printer(plugin.Command):
    """A plugin to print an object."""

    __name = "p"
    interactive = True

    def __init__(self, target=None, **kwargs):
        """Prints an object to the screen."""
        super(Printer, self).__init__(**kwargs)
        self.target = target

    def render(self, renderer):
        # Emit the string form of the target one line at a time.
        text = utils.SmartStr(self.target)
        for row in text.splitlines():
            renderer.format("{0}\n", row)
class Lister(Printer):
    """A plugin to list objects."""

    __name = "l"
    interactive = True

    def render(self, renderer):
        if self.target is not None:
            # Delegate each element to the "p" plugin.
            for element in self.target:
                printer = self.session.plugins.p(target=element)
                printer.render(renderer)
        else:
            self.session.logging.error("You must list something.")
class DT(plugin.TypedProfileCommand, plugin.ProfileCommand):
    """Print a struct or other symbol.

    Really just a convenience function for instantiating the object and printing
    all its members.
    """

    __name = "dt"

    __args = [
        dict(name="target", positional=True, required=True,
             help="Name of a struct definition."),

        # The help text used to be a copy of the "target" help above.
        dict(name="offset", type="IntParser", default=0,
             required=False,
             help="Offset at which to instantiate the struct."),

        dict(name="address_space", type="AddressSpace",
             help="The address space to use."),

        dict(name="member_offset", type="IntParser",
             help="If specified we only show the member at this "
             "offset.")
    ]

    def render_Struct(self, renderer, struct):
        """Render a heading and a table of all the members of struct."""
        renderer.format(
            "[{0} {1}] @ {2:addrpad} \n",
            struct.obj_type, struct.obj_name or '',
            self.plugin_args.offset or struct.obj_offset)

        # Size the offset column by the number of hex digits needed for the
        # struct's end address.
        end_address = struct.obj_size + struct.obj_offset
        width = int(math.ceil(math.log(end_address + 1, 16)))
        renderer.table_header([
            dict(name="Offset", type="TreeNode", max_depth=5,
                 child=dict(style="address", width=width+5),
                 align="l"),
            ("Field", "field", "30"),
            dict(name="content", style="typed")])

        self._render_Struct(renderer, struct)

    def _render_Struct(self, renderer, struct, depth=0):
        """Emit one table row per member, recursing into nested structs.

        Args:
          renderer: The renderer to write rows to.
          struct: The struct whose members are listed.
          depth: Nesting level passed to table_row().
        """
        fields = []
        # Print all the fields sorted by offset within the struct.
        for k in set(struct.members).union(struct.callable_members):
            member = getattr(struct, k)
            base_member = struct.m(k)

            offset = base_member.obj_offset
            # "== None" is deliberate: the offset may be a NoneObject.
            if offset == None:  # NoneObjects screw up sorting order here.
                offset = -1

            fields.append((offset, k, member))

        for offset, k, v in sorted(fields):
            if self.plugin_args.member_offset is not None:
                if offset == self.plugin_args.member_offset:
                    renderer.table_row(offset, k, v, depth=depth)
            else:
                renderer.table_row(offset, k, v, depth=depth)

            if isinstance(v, obj.Struct):
                self._render_Struct(renderer, v, depth=depth + 1)

    def render(self, renderer):
        """Render the target: structs as a table, anything else via "p"."""
        # A target given by name is instantiated from the profile first.
        if isinstance(self.plugin_args.target, basestring):
            self.plugin_args.target = self.profile.Object(
                type_name=self.plugin_args.target,
                offset=self.plugin_args.offset,
                vm=self.plugin_args.address_space)

        item = self.plugin_args.target

        if isinstance(item, obj.Pointer):
            item = item.deref()

        if isinstance(item, obj.Struct):
            return self.render_Struct(renderer, item)

        self.session.plugins.p(self.plugin_args.target).render(renderer)
class AddressMap(object):
    """Label memory ranges."""
    _COLORS = u"BLACK RED GREEN YELLOW BLUE MAGENTA CYAN WHITE".split()

    # All color combinations except those with the same foreground and
    # background colors, since these will be invisible.
    COLORS = []
    UNREADABLE = [
        ("CYAN", "GREEN"),
        ("GREEN", "CYAN"),
        ("MAGENTA", "YELLOW"),
        ("YELLOW", "MAGENTA"),

    ]
    for x in _COLORS:
        for y in _COLORS:
            if x == y or (x, y) in UNREADABLE:
                continue
            COLORS.append((x, y))

    def __init__(self):
        self.label_color_map = {}
        self.idx = 0
        self.collection = utils.RangedCollection()

    def AddRange(self, start, end, label, color_index=None):
        """Label [start, end) and give the label a (fg, bg) color pair.

        A label seen before keeps the colors it was first assigned;
        otherwise color_index (or the next unused index) picks the pair.
        """
        colors = self.label_color_map.get(label)
        if colors is None:
            if color_index is None:
                color_index = self.idx
                self.idx += 1

            colors = self.COLORS[color_index % len(self.COLORS)]
            self.label_color_map[label] = colors

        fg, bg = colors
        self.collection.insert(start, end, (label, fg, bg))

    def HighlightRange(self, start, end, relative=True):
        """Returns a highlighting list from start address to end.

        If relative is True the highlighting list is relative to the start
        offset.
        """
        shift = start if relative else 0
        spans = []
        for address in range(start, end):
            _, _, hit = self.collection.get_containing_range(address)
            if not hit:
                continue

            _, fg, bg = hit
            position = address - shift
            spans.append([position, position + 1, fg, bg])

        return spans

    def GetComment(self, start, end):
        """Returns a tuple of labels and their highlights."""
        # Collect each distinct hit once, in order of first appearance.
        seen = []
        for address in range(start, end):
            _, _, hit = self.collection.get_containing_range(address)
            if hit and hit not in seen:
                seen.append(hit)

        # The labels are joined with ", "; every highlight covers exactly
        # its own label inside the joined text.
        names = []
        highlights = []
        cursor = 0
        for label, fg, bg in seen:
            highlights.append((cursor, cursor + len(label), fg, bg))
            names.append(label)
            cursor += len(label) + 2

        return utils.AttributedString(", ".join(names), highlights=highlights)
class Dump(plugin.TypedProfileCommand, plugin.Command):
    """Hexdump an object or memory location.

    You can use this plugin repeatedly to keep dumping more data using the
    "p _" (print last result) operation:

    In [2]: dump 0x814b13b0, address_space="K"
    ------> dump(0x814b13b0, address_space="K")
    Offset     Hex                                              Data
    ---------- ------------------------------------------------ ----------------
    0x814b13b0 03 00 1b 00 00 00 00 00 b8 13 4b 81 b8 13 4b 81  ..........K...K.

    Out[3]: <rekall.plugins.core.Dump at 0x2967510>

    In [4]: p _
    ------> p(_)
    Offset     Hex                                              Data
    ---------- ------------------------------------------------ ----------------
    0x814b1440 70 39 00 00 54 1b 01 00 18 0a 00 00 32 59 00 00  p9..T.......2Y..
    0x814b1450 6c 3c 01 00 81 0a 00 00 18 0a 00 00 00 b0 0f 06  l<..............
    0x814b1460 00 10 3f 05 64 77 ed 81 d4 80 21 82 00 00 00 00  ..?.dw....!.....
    """

    __name = "dump"

    __args = [
        dict(name="offset", type="SymbolAddress", positional=True,
             default=0, help="An offset to hexdump."),

        dict(name="address_space", type="AddressSpace", positional=True,
             required=False, help="The address space to use."),

        dict(name="data",
             help="Dump this string instead."),

        dict(name="length", type="IntParser",
             help="Maximum length to dump."),

        dict(name="width", type="IntParser",
             help="Number of bytes per row"),

        # The help text used to be a copy of the "width" help above.
        dict(name="rows", type="IntParser",
             help="Number of rows to dump"),
    ]

    table_header = [
        dict(name="offset", style="address"),
        dict(name="hexdump", width=65),
        dict(name="comment", width=40)
    ]

    def column_types(self):
        """Return a sample value for each column of the output table."""
        return dict(offset=int,
                    hexdump=utils.HexDumpedString(""),
                    comment=utils.AttributedString(""))

    def __init__(self, *args, **kwargs):
        """Set up the dump.

        Args:
          address_map: Optional keyword only AddressMap used to label and
            highlight ranges. A fresh one is created when omitted.
        """
        address_map = kwargs.pop("address_map", None)
        super(Dump, self).__init__(*args, **kwargs)
        self.offset = self.plugin_args.offset

        # default width can be set in the session.
        self.width = (self.plugin_args.width or
                      self.session.GetParameter("hexdump_width", 16))

        self.rows = (self.plugin_args.rows or
                     self.session.GetParameter("paging_limit", 30))

        self.address_map = address_map or AddressMap()

        # Literal data is dumped through a buffer backed address space.
        if self.plugin_args.data:
            self.plugin_args.address_space = addrspace.BufferAddressSpace(
                data=self.plugin_args.data, session=self.session)

            if self.plugin_args.length is None:
                self.plugin_args.length = len(self.plugin_args.data)

    def collect(self):
        """Yield one row dict per line of the hexdump.

        At most width * rows bytes are dumped, further limited by the end of
        the address space and by the length argument. Afterwards self.offset
        points just past the dumped data so that collecting again continues
        from there.
        """
        # Measure what is left from the current position so that a continued
        # dump does not read past the end of the address space.
        to_read = min(
            self.width * self.rows,
            self.plugin_args.address_space.end() - self.offset)

        if self.plugin_args.length is not None:
            to_read = min(to_read, self.plugin_args.length)

        # Label every address which resolves exactly to a known name.
        resolver = self.session.address_resolver
        for offset in range(self.offset, self.offset + to_read):
            comment = resolver.format_address(offset, max_distance=0)
            if comment:
                self.address_map.AddRange(offset, offset + 1, ",".join(comment))

        end_of_range = self.offset + to_read
        for offset in range(self.offset, end_of_range, self.width):
            # The last row may be shorter than a full row.
            end_of_line = min(self.width, end_of_range - offset)
            # Add a symbol name for the start of each row.
            hex_data = utils.HexDumpedString(
                self.plugin_args.address_space.read(offset, end_of_line),
                highlights=self.address_map.HighlightRange(
                    offset, offset + end_of_line, relative=True))

            comment = self.address_map.GetComment(offset, offset + self.width)

            yield dict(offset=offset,
                       hexdump=hex_data,
                       comment=comment,
                       nowrap=True, hex_width=self.width)

        # Advance the offset so we can continue from this offset next time we
        # get called. Using the end of the dumped range (rather than the start
        # of the last row) avoids showing the last row twice.
        if to_read > 0:
            self.offset = end_of_range
class Grep(plugin.TypedProfileCommand, plugin.ProfileCommand):
    """Search an address space for keywords."""

    __name = "grep"

    PROFILE_REQUIRED = False

    __args = [
        dict(name="keyword", type="ArrayString", positional=True,
             help="The binary strings to find."),

        dict(name="offset", default=0, type="IntParser",
             help="Start searching from this offset."),

        dict(name="address_space", type="AddressSpace",
             help="Name of the address_space to search."),

        dict(name="context", default=20, type="IntParser",
             help="Context to print around the hit."),

        # Parsed as an integer like the other numeric arguments, so that a
        # user supplied limit is not handed to the scanner as a string.
        dict(name="limit", default=2**64, type="IntParser",
             help="The length of data to search."),
    ]

    def render(self, renderer):
        """Hexdump the surroundings of every keyword hit."""
        scanner = scan.MultiStringScanner(
            needles=self.plugin_args.keyword,
            address_space=self.plugin_args.address_space,
            session=self.session)

        for hit, _ in scanner.scan(offset=self.plugin_args.offset,
                                   maxlen=self.plugin_args.limit):
            # Show 16 bytes before the hit followed by "context" bytes.
            hexdumper = self.session.plugins.dump(
                offset=hit - 16, length=self.plugin_args.context + 16,
                address_space=self.plugin_args.address_space)

            hexdumper.render(renderer)
class SetProcessContextMixin(object):
    """Set the current process context.

    The basic functionality of all platforms' cc plugin.
    """

    name = "cc"
    interactive = True
    process_context = None

    def __enter__(self):
        """Use this plugin as a context manager.

        When used as a context manager we save the state of the address resolver
        and then restore it on exit. This prevents the address resolver from
        losing its current state and makes switching contexts much faster.
        """
        self.process_context = self.session.GetParameter("process_context")
        return self

    def __exit__(self, unused_type, unused_value, unused_traceback):
        # Restore the process context.
        self.SwitchProcessContext(self.process_context)

    def SwitchProcessContext(self, process=None):
        """Make process the session's current context.

        Args:
          process: The process to switch to, or None for the kernel context.

        Returns:
          A message describing the switch.
        """
        # "== None" is deliberate: presumably process can be a NoneObject
        # style placeholder that only compares equal to None.
        if process == None:
            message = "Switching to Kernel context"
            self.session.SetCache("default_address_space",
                                  self.session.kernel_address_space,
                                  volatile=False)

        else:
            message = ("Switching to process context: {0} "
                       "(Pid {1}@{2:#x})").format(
                           process.name, process.pid, process)

            self.session.SetCache(
                "default_address_space",
                process.get_process_address_space() or None,
                volatile=False)

        # Reset the address resolver for the new context.
        self.session.SetCache("process_context", process, volatile=False)
        self.session.logging.debug(message)

        return message

    def SwitchContext(self):
        """Switch to the first process selected by the filter, if any.

        Without any process filtering the kernel context is selected.

        Returns:
          A message describing the outcome.
        """
        if not self.filtering_requested:
            return self.SwitchProcessContext(process=None)

        # Only the first matching process is used.
        for process in self.filter_processes():
            return self.SwitchProcessContext(process=process)

        return "Process not found!\n"

    def render(self, renderer):
        """Switch context and print the resulting message."""
        message = self.SwitchContext()
        # The message embeds the process name, so it is passed as an argument
        # rather than used as the format string: a name containing braces
        # would otherwise be interpreted as a format field.
        renderer.format("{0}\n", message)
def MethodWithAddressSpace(process=None):
    """A decorator to do an operation in another address space.

    The decorated method runs inside the session's "cc" plugin used as a
    context manager, after switching to the given process context.

    Args:
      process: The process whose context the method should run in. None is
        passed through to SwitchProcessContext().

    Returns:
      A decorator for methods of objects which have a session attribute.
    """
    import functools

    def wrap(f):
        # Preserve the name and docstring of the decorated method.
        @functools.wraps(f)
        def wrapped_f(self, *_args, **_kwargs):
            with self.session.plugins.cc() as cc:
                cc.SwitchProcessContext(process=process)

                return f(self, *_args, **_kwargs)
        return wrapped_f

    return wrap
class VtoPMixin(object):
    """Prints information about the virtual to physical translation."""

    name = "vtop"

    PAGE_SIZE = 0x1000

    @classmethod
    def args(cls, parser):
        super(VtoPMixin, cls).args(parser)
        parser.add_argument(
            "virtual_address",
            type="SymbolAddress",
            required=True,
            help="The Virtual Address to examine.")

    def __init__(self, virtual_address=(), **kwargs):
        """Prints information about the virtual to physical translation.

        This is similar to windbg's !vtop extension.

        Args:
          virtual_address: The virtual address to describe.
          address_space: The address space to use (default the
            kernel_address_space).
        """
        super(VtoPMixin, self).__init__(**kwargs)

        # Accept either a single address or a sequence of them.
        if isinstance(virtual_address, (tuple, list)):
            requested = virtual_address
        else:
            requested = [virtual_address]

        resolved = []
        for item in requested:
            resolved.append(
                self.session.address_resolver.get_address_by_name(item))

        self.addresses = resolved

    def render(self, renderer):
        if not self.filtering_requested:
            # Use current process context.
            for vaddr in self.addresses:
                self.render_address(renderer, vaddr)
            return

        # Otherwise describe every address once per selected process.
        with self.session.plugins.cc() as cc:
            for task in self.filter_processes():
                cc.SwitchProcessContext(task)

                for vaddr in self.addresses:
                    self.render_address(renderer, vaddr)

    def render_address(self, renderer, vaddr):
        renderer.section(name="{0:#08x}".format(vaddr))
        self.address_space = self.session.GetParameter("default_address_space")
        address_space = self.address_space

        renderer.format("Virtual {0:addrpad} Page Directory {1:addr}\n",
                        vaddr, address_space.dtb)

        # Render each step in the translation process.
        for step in address_space.describe_vtop(vaddr):
            step.render(renderer)

        # The below re-does all the analysis using the address space. It should
        # agree!
        renderer.format("\nDeriving physical address from runtime "
                        "physical address space:\n")

        physical_address = address_space.vtop(vaddr)
        if physical_address is not None:
            description = self.physical_address_space.describe(
                physical_address)
            renderer.format("Physical Address {0}\n", description)
        else:
            renderer.format("Physical Address Unavailable.\n")
class RaisingTheRoof(plugin.Command):
    """A plugin that exists to break your tests and make you cry."""

    # Can't call this raise, because it aliases the keyword.
    name = "raise_the_roof"

    @classmethod
    def args(cls, parser):
        super(RaisingTheRoof, cls).args(parser)
        parser.add_argument(
            "--exception_class", required=False,
            help="The exception class to raise.")
        parser.add_argument(
            "--exception_text", required=False,
            help="The text to initialize the exception with.")

    def __init__(self, exception_class=None, exception_text=None, **kwargs):
        super(RaisingTheRoof, self).__init__(**kwargs)

        # Fall back to the defaults for any value that was not provided.
        if not exception_class:
            exception_class = "ValueError"
        if not exception_text:
            exception_text = "Default exception"

        self.exception_class = exception_class
        self.exception_text = exception_text

    def render(self, renderer):
        # Names not found in the exceptions module fall back to ValueError.
        exception_type = getattr(exceptions, self.exception_class, ValueError)
        raise exception_type(self.exception_text)
class TestRaisingTheRoof(testlib.DisabledTest):
    # Disabled test case registered for the "raise_the_roof" plugin.
    PLUGIN = "raise_the_roof"