1import logging
2from collections import defaultdict
3from typing import DefaultDict, List, Optional, Type
4
5from pyvex import const
6from pyvex.block import IRSB
7from pyvex.const import vex_int_class
8from pyvex.errors import LiftingException, NeedStatementsNotification, PyVEXError, SkipStatementsError
9from pyvex.expr import Const
10from pyvex.native import ffi
11from pyvex.types import LiftSource, PyLiftSource
12
13from .lifter import Lifter
14from .post_processor import Postprocessor
15
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Registries keyed by architecture name. Each value is the list of classes registered for that architecture,
# in registration order: register() appends to these lists and lift() iterates over them.
lifters: DefaultDict[str, List[Type[Lifter]]] = defaultdict(list)
postprocessors: DefaultDict[str, List[Type[Postprocessor]]] = defaultdict(list)
20
21
def lift(
    data: LiftSource,
    addr,
    arch,
    max_bytes=None,
    max_inst=None,
    bytes_offset=0,
    opt_level=1,
    traceflags=0,
    strict_block_end=True,
    inner=False,
    skip_stmts=False,
    collect_data_refs=False,
    cross_insn_opt=True,
    load_from_ro_regions=False,
):
    """
    Recursively lifts blocks using the registered lifters and postprocessors. Tries each lifter in the order in
    which they are registered on the data to lift.

    If a lifter raises a LiftingException on the data, it is skipped.
    If it succeeds and returns a block with a jumpkind of Ijk_NoDecode, all of the lifters are tried on the rest
    of the data and if they work, their output is appended to the first block.

    :param data:            The bytes to lift as either a python string of bytes or a cffi buffer object.
    :param addr:            The starting address of the block. Affects the IMarks.
    :param arch:            The arch to lift the data as.
    :param max_bytes:       The maximum number of bytes to lift. If set to None, no byte limit is used. Required
                            when `data` is a cffi pointer.
    :param max_inst:        The maximum number of instructions to lift. If set to None, no instruction limit is used.
    :param bytes_offset:    The offset into `data` to start lifting at.
    :param opt_level:       The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                            unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                            optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                            which honestly doesn't make much sense in the context of pyvex. The default is 1.
    :param traceflags:      The libVEX traceflags, controlling VEX debug prints.
    :param strict_block_end: Passed through unchanged to each lifter.
    :param inner:           True when this call lifts the remainder of a block on behalf of an outer `lift` call.
                            Postprocessors are skipped in that case; they run once, in the outermost call.
    :param skip_stmts:      Passed to each lifter. The block is lifted again with `skip_stmts=False` when a lifter
                            raises SkipStatementsError, when more bytes must be decoded after a NoDecode, or when a
                            postprocessor raises NeedStatementsNotification.
    :param collect_data_refs: Passed through unchanged to each lifter and to every recursive call.
    :param cross_insn_opt:  Passed through unchanged to each lifter and to every recursive call.
    :param load_from_ro_regions: Passed through unchanged to each lifter and to every recursive call.

    :return: The lifted IRSB. If no registered lifter can handle the data, an empty block of size 0 with jumpkind
             Ijk_NoDecode is returned.
    :raises PyVEXError:     If `max_bytes <= 0`, if `data` is empty, or if `data` is a cffi pointer and `max_bytes`
                            is None.
    :raises TypeError:      If `data` is a unicode string, or if a postprocessor raises NeedStatementsNotification
                            although statements were not skipped.
    :raises RuntimeError:   If a lifter declares neither REQUIRE_DATA_C nor REQUIRE_DATA_PY.

    .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
              exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
              instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
              fewer instructions than requested. Specifically, this means that lifting a branch or jump
              on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
              attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

    .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
              ends properly or until it runs out of data to lift.
    """
    if max_bytes is not None and max_bytes <= 0:
        raise PyVEXError("Cannot lift block with no data (max_bytes <= 0)")

    if not data:
        raise PyVEXError("Cannot lift block with no data (data is empty)")

    if isinstance(data, str):
        raise TypeError("Cannot pass unicode string as data to lifter")

    # Options that every recursive lift() call must receive exactly as the caller supplied them. They are captured
    # here, before opt_level is normalised below, so that a recursive call sees the caller's opt_level (e.g. -1) and
    # applies the same normalisation itself.
    forwarded_options = {
        "opt_level": opt_level,
        "traceflags": traceflags,
        "strict_block_end": strict_block_end,
        "collect_data_refs": collect_data_refs,
        "cross_insn_opt": cross_insn_opt,
        "load_from_ro_regions": load_from_ro_regions,
    }

    py_data: Optional[PyLiftSource]
    if isinstance(data, (bytes, bytearray, memoryview)):
        py_data = data
        c_data = None
        allow_arch_optimizations = False
    else:
        # Anything else is treated as a cffi pointer, whose length cannot be discovered from the object itself.
        if max_bytes is None:
            raise PyVEXError("Cannot lift block with ffi pointer and no size (max_bytes is None)")
        c_data = data
        py_data = None
        allow_arch_optimizations = True

    # In order to attempt to preserve the property that
    # VEX lifts the same bytes to the same IR at all times when optimizations are disabled
    # we hack off all of VEX's non-IROpt optimizations when opt_level == -1.
    # This is intended to enable comparisons of the lifted IR between code that happens to be
    # found in different contexts.
    if opt_level < 0:
        allow_arch_optimizations = False
        opt_level = 0

    for lifter in lifters[arch.name]:
        try:
            u_data: LiftSource = data
            if lifter.REQUIRE_DATA_C:
                if c_data is None:
                    assert py_data is not None
                    if isinstance(py_data, (bytearray, memoryview)):
                        u_data = ffi.from_buffer(ffi.BVoidP, py_data)
                    else:
                        # bytes input is handed over with 8 extra zero bytes appended
                        u_data = ffi.from_buffer(ffi.BVoidP, py_data + b"\0" * 8)
                    max_bytes = min(len(py_data), max_bytes) if max_bytes is not None else len(py_data)
                else:
                    u_data = c_data
                # C lifters receive the whole buffer and the full bytes_offset
                skip = 0
            elif lifter.REQUIRE_DATA_PY:
                # Python lifters receive a slice starting at `skip`; for ARM with an odd address one byte of the
                # offset is left for the lifter itself (bytes_offset - skip == 1).
                if bytes_offset and arch.name.startswith("ARM") and (addr & 1) == 1:
                    skip = bytes_offset - 1
                else:
                    skip = bytes_offset
                if py_data is None:
                    assert c_data is not None
                    if max_bytes is None:
                        log.debug("Cannot create py_data from c_data when no max length is given")
                        continue
                    u_data = ffi.buffer(c_data + skip, max_bytes)[:]
                else:
                    if max_bytes is None:
                        u_data = py_data[skip:]
                    else:
                        u_data = py_data[skip : skip + max_bytes]
            else:
                # `lifter` is already a class, so format it directly rather than its metaclass.
                raise RuntimeError(
                    "Incorrect lifter configuration. What type of data does %s expect?" % (lifter,)
                )

            try:
                final_irsb = lifter(arch, addr).lift(
                    u_data,
                    bytes_offset - skip,
                    max_bytes,
                    max_inst,
                    opt_level,
                    traceflags,
                    allow_arch_optimizations,
                    strict_block_end,
                    skip_stmts,
                    collect_data_refs=collect_data_refs,
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                )
            except SkipStatementsError:
                # The lifter cannot honour skip_stmts for this block; lift it again with statements.
                assert skip_stmts is True
                final_irsb = lifter(arch, addr).lift(
                    u_data,
                    bytes_offset - skip,
                    max_bytes,
                    max_inst,
                    opt_level,
                    traceflags,
                    allow_arch_optimizations,
                    strict_block_end,
                    skip_stmts=False,
                    collect_data_refs=collect_data_refs,
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                )
            break
        except LiftingException as ex:
            log.debug("Lifting Exception: %s", str(ex))
            continue
    else:
        # No lifter succeeded: report an empty, undecodable block at `addr`.
        final_irsb = IRSB.empty_block(
            arch,
            addr,
            size=0,
            nxt=Const(const.vex_int_class(arch.bits)(addr)),
            jumpkind="Ijk_NoDecode",
        )
        final_irsb.invalidate_direct_next()
        return final_irsb

    if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
        # We have decoded a few bytes before we hit an undecodeable instruction.

        # Determine if this is an intentional NoDecode, like the ud2 instruction on AMD64
        nodecode_addr_expr = final_irsb.next
        if type(nodecode_addr_expr) is Const:
            nodecode_addr = nodecode_addr_expr.con.value
            next_irsb_start_addr = addr + final_irsb.size
            if nodecode_addr != next_irsb_start_addr:
                # The last instruction of the IRSB has a non-zero length. This is an intentional NoDecode.
                # The very last instruction has been decoded
                final_irsb.jumpkind = "Ijk_NoDecode"
                final_irsb.next = final_irsb.next
                final_irsb.invalidate_direct_next()
                return final_irsb

        # Decode more bytes
        if skip_stmts:
            # When gymrat will be invoked, we will merge future basic blocks to the current basic block. In this case,
            # statements are usually required.
            # TODO: In the future, we may further optimize it to handle cases where getting statements in gymrat is not
            # TODO: required.
            return lift(
                data,
                addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                skip_stmts=False,
                **forwarded_options,
            )

        next_addr = addr + final_irsb.size
        if max_bytes is not None:
            max_bytes -= final_irsb.size
        if isinstance(data, (bytes, bytearray, memoryview)):
            data_left = data[final_irsb.size :]
        else:
            # cffi pointer: advance it past the bytes that were already lifted
            data_left = data + final_irsb.size
        if max_inst is not None:
            max_inst -= final_irsb.instructions
        if (max_bytes is None or max_bytes > 0) and (max_inst is None or max_inst > 0) and data_left:
            more_irsb = lift(
                data_left,
                next_addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                inner=True,
                skip_stmts=False,
                **forwarded_options,
            )
            if more_irsb.size:
                # Successfully decoded more bytes
                final_irsb.extend(more_irsb)
        elif max_bytes == 0:
            # We have no more bytes left. Mark the jumpkind of the IRSB as Ijk_Boring
            if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
                final_irsb.jumpkind = "Ijk_Boring"
                final_irsb.next = Const(vex_int_class(arch.bits)(final_irsb.addr + final_irsb.size))

    if not inner:
        for postprocessor in postprocessors[arch.name]:
            try:
                postprocessor(final_irsb).postprocess()
            except NeedStatementsNotification as e:
                # The post-processor cannot work without statements. Re-lift the current block with skip_stmts=False
                if not skip_stmts:
                    # sanity check
                    # Why does the post-processor raise NeedStatementsNotification when skip_stmts is False?
                    # `postprocessor` is already a class, so format it directly rather than its metaclass.
                    raise TypeError(
                        "Bad post-processor %s: "
                        "NeedStatementsNotification is raised when statements are available." % (postprocessor,)
                    ) from e

                # Re-lift the current IRSB
                return lift(
                    data,
                    addr,
                    arch,
                    max_bytes=max_bytes,
                    max_inst=max_inst,
                    bytes_offset=bytes_offset,
                    inner=inner,
                    skip_stmts=False,
                    **forwarded_options,
                )
            except LiftingException:
                continue

    return final_irsb
281
282
def register(lifter, arch_name):
    """
    Registers a Lifter or Postprocessor to be used by pyvex. Lifters are given priority based on the order
    in which they are registered. Postprocessors will be run in registration order.

    A class that derives from both Lifter and Postprocessor is added to both registries. A class that derives
    from neither is not registered anywhere.

    :param lifter:       The Lifter or Postprocessor class to register
    :type lifter:        :class:`Lifter` or :class:`Postprocessor`
    :param arch_name:    The name of the architecture the class is registered for
    """
    # (base class, registry to append to, debug message) for each kind of plugin, checked in this order
    registration_table = (
        (Lifter, lifters, "Registering lifter %s for architecture %s."),
        (Postprocessor, postprocessors, "Registering postprocessor %s for architecture %s."),
    )
    for base_class, registry, debug_message in registration_table:
        if not issubclass(lifter, base_class):
            continue
        log.debug(debug_message, lifter.__name__, arch_name)
        registry[arch_name].append(lifter)