1import logging
2from typing import TYPE_CHECKING, List, Type
3
4import bitstring
5
6from pyvex.const import vex_int_class
7from pyvex.errors import LiftingException
8from pyvex.lifting.lifter import Lifter
9
10from .vex_helper import IRSBCustomizer, JumpKind
11
12if TYPE_CHECKING:
13 from .instr_helper import Instruction
14
# Module-level logger, named after this module; used for all debug and error output below.
log = logging.getLogger(__name__)
16
17
def is_empty(bitstrm):
    """
    Tell whether a bit stream has nothing left to read.

    The stream is probed by peeking a single bit, so its read position is not advanced.

    :param bitstrm: The stream to probe; it must provide ``peek``.
    :return:        True if peeking raised ``bitstring.ReadError``, False otherwise.
    """
    try:
        bitstrm.peek(1)
    except bitstring.ReadError:
        # Not even one bit could be peeked: the stream is exhausted.
        return True
    else:
        return False
24
25
class ParseError(Exception):
    """
    Signals that a candidate instruction did not match the bits being decoded.

    The lifter's decoding loop catches this and moves on to the next candidate instruction class.
    """
28
29
class GymratLifter(Lifter):
    """
    This is a base class for lifters that use Gymrat.
    For most architectures, all you need to do is subclass this, and set the class attribute "instrs"
    to be a list of classes that define each instruction.
    By default, a lifter will decode instructions by attempting to instantiate every class until one works.
    This will use an IRSBCustomizer, which will, if it succeeds, add the appropriate VEX instructions to a pyvex IRSB.
    pyvex, when lifting a block of code for this architecture, will call the method "lift", which will produce the IRSB
    of the lifted code.
    """

    __slots__ = (
        "bitstrm",
        "errors",
        "thedata",
        "disassembly",
    )

    # NOTE(review): presumably consumed by the Lifter base class to request Python-level data; confirm there.
    REQUIRE_DATA_PY = True
    # Candidate instruction classes, tried in order when decoding; subclasses are expected to set this.
    instrs: List[Type["Instruction"]]

    def __init__(self, arch, addr):
        """
        Initialise the lifter with no bit stream, error text, data or disassembly yet.

        :param arch: Forwarded unchanged to ``Lifter.__init__``.
        :param addr: Forwarded unchanged to ``Lifter.__init__``.
        """
        super().__init__(arch, addr)
        self.bitstrm = None
        self.errors = None
        self.thedata = None
        self.disassembly = None

    def create_bitstrm(self):
        """
        Wrap ``self.thedata`` in a ``bitstring.ConstBitStream`` and store it as ``self.bitstrm``.
        """
        self.bitstrm = bitstring.ConstBitStream(bytes=self.thedata)

    def _decode_next_instruction(self, addr):
        """
        Try each class in ``self.instrs`` against the current position of ``self.bitstrm``.

        :param addr: Address passed to the instruction constructor.
        :return:     The first instruction object that constructs successfully, or None if none matched.
        """
        # Try every instruction until one works
        for possible_instr in self.instrs:
            try:
                log.debug("Trying %s", possible_instr.name)
                return possible_instr(self.bitstrm, self.irsb.arch, addr)
            # a ParseError signals that this instruction did not match
            # we need to try other instructions, so we ignore this error
            except ParseError:
                continue
            # if we are out of input, ignore.
            # there may be other, shorter instructions that still match,
            # so we continue with the loop
            except (bitstring.ReadError, bitstring.InterpretError):
                continue

        # If no instruction matches, log it and let the caller decide what to do
        log.debug("Unknown instruction at bit position %d", self.bitstrm.bitpos)
        log.debug("Address: %#08x", addr)
        return None

    def decode(self):
        """
        Decode ``self.thedata`` into a list of instruction objects.

        Decoding starts at ``self.irsb.addr`` and continues until the bit stream is exhausted or no instruction
        class matches. After each match the address advances by the number of bytes the stream consumed.

        :return: The decoded instruction objects, in stream order.
        :raises Exception: Any exception raised while decoding is recorded as text in ``self.errors``, logged,
                           and re-raised unchanged.
        """
        # Bind these before the try block: the handler below reports them, and a failure before their first
        # assignment would otherwise surface as an UnboundLocalError that hides the real exception.
        addr = None
        bytepos = 0
        try:
            addr = self.irsb.addr
            self.create_bitstrm()
            disas = []
            log.debug("Starting block at address: %s", hex(addr))
            bytepos = self.bitstrm.bytepos

            while not is_empty(self.bitstrm):
                instr = self._decode_next_instruction(addr)
                if not instr:
                    break
                disas.append(instr)
                log.debug("Matched %s", instr.name)
                # Advance the address by however many bytes this instruction consumed
                new_bytepos = self.bitstrm.bytepos
                addr += new_bytepos - bytepos
                bytepos = new_bytepos
            return disas
        except Exception as e:
            self.errors = str(e)
            addr_text = "unknown" if addr is None else f"{addr:#x}"
            log.exception("Error decoding block at offset %#x (address %s):", bytepos, addr_text)
            raise

    def _lift(self):
        """
        Decode the block's data and emit IR for each decoded instruction into ``self.irsb``.

        The data is truncated to ``self.max_bytes`` and encoded to bytes if it is not already bytes-like.
        When ``self.disasm`` is set, the per-instruction disassembly is stored in ``self.disassembly``.

        :return: ``self.irsb`` after lifting.
        :raises LiftingException: If the instructions ran out without ending the block and the IRSB has no
                                  statements.
        """
        self.thedata = (
            self.data[: self.max_bytes]
            if isinstance(self.data, (bytes, bytearray, memoryview))
            else self.data[: self.max_bytes].encode()
        )
        log.debug("%r", self.thedata)
        instructions = self.decode()

        if self.disasm:
            self.disassembly = [instr.disassemble() for instr in instructions]
        # Invalid acts as the "no instruction has ended the block yet" marker checked in the loop below
        self.irsb.jumpkind = JumpKind.Invalid
        irsb_c = IRSBCustomizer(self.irsb)
        log.debug("Decoding complete.")
        for i, instr in enumerate(instructions[: self.max_inst]):
            log.debug("Lifting instruction %s", instr.name)
            # Each instruction is called with the instructions before and after it
            instr(irsb_c, instructions[:i], instructions[i + 1 :])
            if irsb_c.irsb.jumpkind != JumpKind.Invalid:
                # The instruction set a jumpkind, so the block ends here
                break
            if (i + 1) == self.max_inst:  # if we are on our last iteration
                instr.jump(None, irsb_c.irsb.addr + irsb_c.irsb.size)
                break
        else:
            # Ran out of decoded instructions without any of them ending the block
            if len(irsb_c.irsb.statements) == 0:
                raise LiftingException("Could not decode any instructions")
            irsb_c.irsb.jumpkind = JumpKind.NoDecode
            dst = irsb_c.irsb.addr + irsb_c.irsb.size
            dst_ty = vex_int_class(irsb_c.irsb.arch.bits).type
            irsb_c.irsb.next = irsb_c.mkconst(dst, dst_ty)
        log.debug("%s", self.irsb)
        if self.dump_irsb:
            self.irsb.pp()
        return self.irsb

    def pp_disas(self):
        """
        Print the block's disassembly, one ``address:<TAB>name args`` line per instruction.
        """
        listing = "".join(
            f"{addr:#08x}:\t{name} {','.join(str(a) for a in args)}\n" for addr, name, args in self.disassemble()
        )
        print(listing)

    def error(self):
        """
        Return the text of the exception recorded by the last failed ``decode``, or None if none was recorded.
        """
        return self.errors

    def disassemble(self):
        """
        Return the stored disassembly, lifting ``self.data`` with ``disasm=True`` first if none is stored yet.
        """
        if self.disassembly is None:
            self.lift(self.data, disasm=True)
        return self.disassembly