Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyvex/lifting/lift_function.py: 75%
113 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:15 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:15 +0000
1import logging
2from collections import defaultdict
4import archinfo
6from pyvex import const
7from pyvex.block import IRSB
8from pyvex.const import vex_int_class
9from pyvex.errors import LiftingException, NeedStatementsNotification, PyVEXError, SkipStatementsError
10from pyvex.expr import Const
11from pyvex.native import ffi
13from .lifter import Lifter
14from .post_processor import Postprocessor
# Module-level logger used for lifting diagnostics.
log = logging.getLogger(__name__)

# Registries of lifter / postprocessor classes, keyed by architecture name.
# `defaultdict(list)` means an architecture nobody registered for simply yields an empty list.
lifters = defaultdict(list)
postprocessors = defaultdict(list)
def lift(
    data,
    addr,
    arch,
    max_bytes=None,
    max_inst=None,
    bytes_offset=0,
    opt_level=1,
    traceflags=0,
    strict_block_end=True,
    inner=False,
    skip_stmts=False,
    collect_data_refs=False,
    cross_insn_opt=True,
    load_from_ro_regions=False,
):
    """
    Recursively lifts blocks using the registered lifters and postprocessors. Tries each lifter in the order in
    which they are registered on the data to lift.

    If a lifter raises a LiftingException on the data, it is skipped.
    If it succeeds and returns a block with a jumpkind of Ijk_NoDecode, all of the lifters are tried on the rest
    of the data and if they work, their output is appended to the first block.
    If no lifter succeeds at all, an empty block with a jumpkind of Ijk_NoDecode is returned.

    :param arch:            The arch to lift the data as.
    :type arch:             :class:`archinfo.Arch`
    :param addr:            The starting address of the block. Affects the IMarks.
    :param data:            The bytes to lift as either a python string of bytes or a cffi buffer object.
    :param max_bytes:       The maximum number of bytes to lift. If set to None, no byte limit is used. Required
                            when `data` is a cffi pointer.
    :param max_inst:        The maximum number of instructions to lift. If set to None, no instruction limit is used.
    :param bytes_offset:    The offset into `data` to start lifting at.
    :param opt_level:       The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                            unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                            optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                            which honestly doesn't make much sense in the context of pyvex. The default is 1.
    :param traceflags:      The libVEX traceflags, controlling VEX debug prints.
    :param strict_block_end: Forwarded unchanged to the lifter.
    :param inner:           True when this call lifts the continuation of an outer block. Postprocessors are only
                            run when this is False.
    :param skip_stmts:      Forwarded to the lifter. If the lifter raises SkipStatementsError, or a block
                            continuation / postprocessor needs statements, the block is lifted again with
                            statements enabled.
    :param collect_data_refs:    Forwarded unchanged to the lifter.
    :param cross_insn_opt:       Forwarded unchanged to the lifter.
    :param load_from_ro_regions: Forwarded unchanged to the lifter.
    :return:                The lifted block, after postprocessing unless `inner` is True.
    :raises PyVEXError:     If there is no data to lift, or a cffi pointer is given without `max_bytes`.
    :raises TypeError:      If `data` is a unicode string, or a postprocessor asks for statements that are
                            already available.
    :raises RuntimeError:   If a registered lifter declares neither REQUIRE_DATA_C nor REQUIRE_DATA_PY.

    .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
              exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
              instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
              fewer instructions than requested. Specifically, this means that lifting a branch or jump
              on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
              attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

    .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
              ends properly or until it runs out of data to lift.
    """
    if max_bytes is not None and max_bytes <= 0:
        raise PyVEXError("Cannot lift block with no data (max_bytes <= 0)")

    if not data:
        raise PyVEXError("Cannot lift block with no data (data is empty)")

    if isinstance(data, str):
        raise TypeError("Cannot pass unicode string as data to lifter")

    if isinstance(data, (bytes, bytearray, memoryview)):
        py_data = data
        c_data = None
        allow_arch_optimizations = False
    else:
        if max_bytes is None:
            raise PyVEXError("Cannot lift block with ffi pointer and no size (max_bytes is None)")
        c_data = data
        py_data = None
        allow_arch_optimizations = True

    # Options that every recursive call to lift() must inherit from this call. They are captured *before*
    # opt_level is clamped below, so that a request for opt_level == -1 keeps arch optimizations disabled in
    # continuations and re-lifts as well. Recursive calls always lift with statements.
    recursive_options = dict(
        opt_level=opt_level,
        traceflags=traceflags,
        strict_block_end=strict_block_end,
        skip_stmts=False,
        collect_data_refs=collect_data_refs,
        cross_insn_opt=cross_insn_opt,
        load_from_ro_regions=load_from_ro_regions,
    )

    # In order to attempt to preserve the property that
    # VEX lifts the same bytes to the same IR at all times when optimizations are disabled
    # we hack off all of VEX's non-IROpt optimizations when opt_level == -1.
    # This is intended to enable comparisons of the lifted IR between code that happens to be
    # found in different contexts.
    if opt_level < 0:
        allow_arch_optimizations = False
        opt_level = 0

    # Keyword options handed to every lifter invocation.
    lifter_options = dict(
        collect_data_refs=collect_data_refs,
        cross_insn_opt=cross_insn_opt,
        load_from_ro_regions=load_from_ro_regions,
    )

    for lifter in lifters[arch.name]:
        try:
            u_data = data
            if lifter.REQUIRE_DATA_C:
                if c_data is None:
                    u_data = ffi.from_buffer(ffi.BVoidP, py_data + b"\0" * 8 if isinstance(py_data, bytes) else py_data)
                    max_bytes = min(len(py_data), max_bytes) if max_bytes is not None else len(py_data)
                else:
                    u_data = c_data
                skip = 0
            elif lifter.REQUIRE_DATA_PY:
                if bytes_offset and archinfo.arch_arm.is_arm_arch(arch) and (addr & 1) == 1:
                    skip = bytes_offset - 1
                else:
                    skip = bytes_offset
                if py_data is None:
                    if max_bytes is None:
                        log.debug("Cannot create py_data from c_data when no max length is given")
                        continue
                    u_data = ffi.buffer(c_data + skip, max_bytes)[:]
                else:
                    if max_bytes is None:
                        u_data = py_data[skip:]
                    else:
                        u_data = py_data[skip : skip + max_bytes]
            else:
                # `lifter` is already a class, so format it directly rather than its metaclass.
                raise RuntimeError("Incorrect lifter configuration. What type of data does %s expect?" % lifter)

            # Positional arguments shared by the first attempt and the retry with statements.
            lifter_args = (
                u_data,
                bytes_offset - skip,
                max_bytes,
                max_inst,
                opt_level,
                traceflags,
                allow_arch_optimizations,
                strict_block_end,
            )
            try:
                final_irsb = lifter(arch, addr).lift(*lifter_args, skip_stmts, **lifter_options)
            except SkipStatementsError:
                # The lifter cannot honour skip_stmts for this block: lift again with statements.
                assert skip_stmts is True
                final_irsb = lifter(arch, addr).lift(*lifter_args, skip_stmts=False, **lifter_options)
            break
        except LiftingException as ex:
            log.debug("Lifting Exception: %s", ex)
            continue
    else:
        # No lifter could handle the data: report an empty, undecodable block at `addr`.
        final_irsb = IRSB.empty_block(
            arch,
            addr,
            size=0,
            nxt=Const(const.vex_int_class(arch.bits)(addr)),
            jumpkind="Ijk_NoDecode",
        )
        final_irsb.invalidate_direct_next()
        return final_irsb

    if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
        # We have decoded a few bytes before we hit an undecodeable instruction.

        # Determine if this is an intentional NoDecode, like the ud2 instruction on AMD64
        nodecode_addr_expr = final_irsb.next
        if type(nodecode_addr_expr) is Const:
            nodecode_addr = nodecode_addr_expr.con.value
            next_irsb_start_addr = addr + final_irsb.size
            if nodecode_addr != next_irsb_start_addr:
                # The last instruction of the IRSB has a non-zero length. This is an intentional NoDecode.
                # The very last instruction has been decoded
                # NOTE(review): the two assignments below re-store the current values; kept as in the original.
                final_irsb.jumpkind = "Ijk_NoDecode"
                final_irsb.next = final_irsb.next
                final_irsb.invalidate_direct_next()
                return final_irsb

        # Decode more bytes
        if skip_stmts:
            # When gymrat will be invoked, we will merge future basic blocks to the current basic block. In this case,
            # statements are usually required.
            # TODO: In the future, we may further optimize it to handle cases where getting statements in gymrat is not
            # TODO: required.
            return lift(
                data,
                addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                inner=inner,
                **recursive_options,
            )

        next_addr = addr + final_irsb.size
        if max_bytes is not None:
            max_bytes -= final_irsb.size
        if isinstance(data, (bytes, bytearray, memoryview)):
            data_left = data[final_irsb.size :]
        else:
            data_left = data + final_irsb.size
        if max_inst is not None:
            max_inst -= final_irsb.instructions
        if (max_bytes is None or max_bytes > 0) and (max_inst is None or max_inst > 0) and data_left:
            more_irsb = lift(
                data_left,
                next_addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                inner=True,
                **recursive_options,
            )
            if more_irsb.size:
                # Successfully decoded more bytes
                final_irsb.extend(more_irsb)
        elif max_bytes == 0:
            # We have no more bytes left. Mark the jumpkind of the IRSB as Ijk_Boring
            if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
                final_irsb.jumpkind = "Ijk_Boring"
                final_irsb.next = Const(vex_int_class(arch.bits)(final_irsb.addr + final_irsb.size))

    if not inner:
        for postprocessor in postprocessors[arch.name]:
            try:
                postprocessor(final_irsb).postprocess()
            except NeedStatementsNotification as e:
                # The post-processor cannot work without statements. Re-lift the current block with skip_stmts=False
                if not skip_stmts:
                    # sanity check
                    # Why does the post-processor raise NeedStatementsNotification when skip_stmts is False?
                    # `postprocessor` is already a class, so format it directly rather than its metaclass.
                    raise TypeError(
                        "Bad post-processor %s: "
                        "NeedStatementsNotification is raised when statements are available." % postprocessor
                    ) from e

                # Re-lift the current IRSB
                return lift(
                    data,
                    addr,
                    arch,
                    max_bytes=max_bytes,
                    max_inst=max_inst,
                    bytes_offset=bytes_offset,
                    inner=inner,
                    **recursive_options,
                )
            except LiftingException:
                continue

    return final_irsb
def register(lifter, arch_name):
    """
    Register a Lifter or Postprocessor class with pyvex for one architecture.

    Lifters are given priority based on the order in which they are registered. Postprocessors are run in
    registration order. A class deriving from both base classes is added to both registries; a class deriving
    from neither is left unregistered.

    :param lifter:      The Lifter or Postprocessor class to register.
    :type lifter:       :class:`Lifter` or :class:`Postprocessor`
    :param arch_name:   The architecture name to file the class under.
    """
    registries = (
        (Lifter, lifters, "Registering lifter %s for architecture %s."),
        (Postprocessor, postprocessors, "Registering postprocessor %s for architecture %s."),
    )
    for base_class, registry, message in registries:
        if not issubclass(lifter, base_class):
            continue
        log.debug(message, lifter.__name__, arch_name)
        registry[arch_name].append(lifter)