Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyvex/lifting/lift_function.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1import logging 

2from collections import defaultdict 

3from typing import DefaultDict 

4 

5from pyvex import const 

6from pyvex.block import IRSB 

7from pyvex.const import vex_int_class 

8from pyvex.errors import LiftingException, NeedStatementsNotification, PyVEXError, SkipStatementsError 

9from pyvex.expr import Const 

10from pyvex.native import ffi 

11from pyvex.types import LiftSource, PyLiftSource 

12 

13from .lifter import Lifter 

14from .post_processor import Postprocessor 

15 

16log = logging.getLogger(__name__) 

17 

18lifters: DefaultDict[str, list[type[Lifter]]] = defaultdict(list) 

19postprocessors: DefaultDict[str, list[type[Postprocessor]]] = defaultdict(list) 

20 

21 

def lift(
    data: LiftSource,
    addr,
    arch,
    max_bytes=None,
    max_inst=None,
    bytes_offset=0,
    opt_level=1,
    traceflags=0,
    strict_block_end=True,
    inner=False,
    skip_stmts=False,
    collect_data_refs=False,
    cross_insn_opt=True,
    load_from_ro_regions=False,
    const_prop=False,
):
    """
    Recursively lifts blocks using the registered lifters and postprocessors. Tries each lifter in the order in
    which they are registered on the data to lift.

    If a lifter raises a LiftingException on the data, it is skipped.
    If it succeeds and returns a block with a jumpkind of Ijk_NoDecode, all of the lifters are tried on the rest
    of the data and if they work, their output is appended to the first block.

    :param data:            The bytes to lift as either a python bytes-like object (bytes, bytearray, memoryview)
                            or a cffi buffer object.
    :param addr:            The starting address of the block. Affects the IMarks.
    :param arch:            The arch to lift the data as.
    :param max_bytes:       The maximum number of bytes to lift. If set to None, no byte limit is used. Must be
                            given when `data` is a cffi object.
    :param max_inst:        The maximum number of instructions to lift. If set to None, no instruction limit is used.
    :param bytes_offset:    The offset into `data` to start lifting at.
    :param opt_level:       The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                            unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                            optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                            which honestly doesn't make much sense in the context of pyvex. The default is 1.
    :param traceflags:      The libVEX traceflags, controlling VEX debug prints.
    :param strict_block_end: Forwarded unchanged to the selected lifter.
    :param inner:           True when this call lifts the remainder of a block on behalf of an outer `lift`
                            call. Postprocessors are only run when this is False.
    :param skip_stmts:      Forwarded to the selected lifter. If the lifter raises SkipStatementsError, or a
                            postprocessor raises NeedStatementsNotification, the block is lifted again with
                            `skip_stmts=False`.
    :param collect_data_refs: Forwarded unchanged to the selected lifter.
    :param cross_insn_opt:  Forwarded unchanged to the selected lifter.
    :param load_from_ro_regions: Forwarded unchanged to the selected lifter.
    :param const_prop:      Forwarded unchanged to the selected lifter.
    :return:                The lifted IRSB. If no registered lifter succeeds, an empty block with jumpkind
                            Ijk_NoDecode is returned.
    :raises PyVEXError:     If `max_bytes <= 0`, if `data` is empty, or if `data` is a cffi object and
                            `max_bytes` is None.
    :raises TypeError:      If `data` is a `str`, or if a postprocessor raises NeedStatementsNotification even
                            though statements were not skipped.
    :raises RuntimeError:   If a registered lifter sets neither REQUIRE_DATA_C nor REQUIRE_DATA_PY.

    .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
              exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
              instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
              fewer instructions than requested. Specifically, this means that lifting a branch or jump
              on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
              attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

    .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
              ends properly or until it runs out of data to lift.
    """
    if max_bytes is not None and max_bytes <= 0:
        raise PyVEXError("Cannot lift block with no data (max_bytes <= 0)")

    if not data:
        raise PyVEXError("Cannot lift block with no data (data is empty)")

    if isinstance(data, str):
        raise TypeError("Cannot pass unicode string as data to lifter")

    py_data: PyLiftSource | None
    if isinstance(data, (bytes, bytearray, memoryview)):
        py_data = data
        c_data = None
    else:
        if max_bytes is None:
            raise PyVEXError("Cannot lift block with ffi pointer and no size (max_bytes is None)")
        c_data = data
        py_data = None

    # Remember what the caller asked for: opt_level is normalised below, but every recursive call must receive
    # the original value so that opt_level == -1 keeps disabling the arch optimizations further down.
    requested_opt_level = opt_level

    allow_arch_optimizations = True
    # In order to attempt to preserve the property that
    # VEX lifts the same bytes to the same IR at all times when optimizations are disabled
    # we hack off all of VEX's non-IROpt optimizations when opt_level == -1.
    # This is intended to enable comparisons of the lifted IR between code that happens to be
    # found in different contexts.
    if opt_level < 0:
        allow_arch_optimizations = False
        opt_level = 0

    for lifter in lifters[arch.name]:
        try:
            u_data: LiftSource = data
            if lifter.REQUIRE_DATA_C:
                if c_data is None:
                    assert py_data is not None
                    if isinstance(py_data, (bytearray, memoryview)):
                        u_data = ffi.from_buffer(ffi.BVoidP, py_data)
                    else:
                        # bytes input: hand the lifter a copy with 8 zero bytes appended
                        u_data = ffi.from_buffer(ffi.BVoidP, py_data + b"\0" * 8)
                    # Never let the lifter read beyond the end of the python buffer
                    max_bytes = min(len(py_data), max_bytes) if max_bytes is not None else len(py_data)
                else:
                    u_data = c_data
                skip = 0
            elif lifter.REQUIRE_DATA_PY:
                if bytes_offset and arch.name.startswith("ARM") and (addr & 1) == 1:
                    # Odd address on ARM: keep one byte in front of the requested offset
                    skip = bytes_offset - 1
                else:
                    skip = bytes_offset
                if py_data is None:
                    assert c_data is not None
                    if max_bytes is None:
                        log.debug("Cannot create py_data from c_data when no max length is given")
                        continue
                    u_data = ffi.buffer(c_data + skip, max_bytes)[:]
                else:
                    if max_bytes is None:
                        u_data = py_data[skip:]
                    else:
                        u_data = py_data[skip : skip + max_bytes]
            else:
                # `lifter` is itself the class, so report its own name rather than its metaclass
                raise RuntimeError(
                    "Incorrect lifter configuration. What type of data does %s expect?" % lifter.__name__
                )

            try:
                final_irsb = lifter(arch, addr).lift(
                    u_data,
                    bytes_offset - skip,
                    max_bytes,
                    max_inst,
                    opt_level,
                    traceflags,
                    allow_arch_optimizations,
                    strict_block_end,
                    skip_stmts,
                    collect_data_refs=collect_data_refs,
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                    const_prop=const_prop,
                )
            except SkipStatementsError:
                # The lifter cannot work without statements: retry with statements enabled
                assert skip_stmts is True
                final_irsb = lifter(arch, addr).lift(
                    u_data,
                    bytes_offset - skip,
                    max_bytes,
                    max_inst,
                    opt_level,
                    traceflags,
                    allow_arch_optimizations,
                    strict_block_end,
                    skip_stmts=False,
                    collect_data_refs=collect_data_refs,
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                    const_prop=const_prop,
                )
            break
        except LiftingException as ex:
            log.debug("Lifting Exception: %s", str(ex))
            continue
    else:
        # No lifter succeeded (or none is registered): return an empty NoDecode block
        final_irsb = IRSB.empty_block(
            arch,
            addr,
            size=0,
            nxt=Const(const.vex_int_class(arch.bits)(addr)),
            jumpkind="Ijk_NoDecode",
        )
        final_irsb.invalidate_direct_next()
        return final_irsb

    if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
        # We have decoded a few bytes before we hit an undecodeable instruction.

        # Determine if this is an intentional NoDecode, like the ud2 instruction on AMD64
        nodecode_addr_expr = final_irsb.next
        if type(nodecode_addr_expr) is Const:
            nodecode_addr = nodecode_addr_expr.con.value
            next_irsb_start_addr = addr + final_irsb.size
            if nodecode_addr != next_irsb_start_addr:
                # The last instruction of the IRSB has a non-zero length. This is an intentional NoDecode.
                # The very last instruction has been decoded
                # NOTE(review): the two assignments below store values that are already set; they are kept
                # in case the IRSB attribute setters have side effects -- confirm before removing.
                final_irsb.jumpkind = "Ijk_NoDecode"
                final_irsb.next = final_irsb.next
                final_irsb.invalidate_direct_next()
                return final_irsb

        # Decode more bytes
        if skip_stmts:
            # When gymrat will be invoked, we will merge future basic blocks to the current basic block. In this case,
            # statements are usually required.
            # TODO: In the future, we may further optimize it to handle cases where getting statements in gymrat is not
            # TODO: required.
            return lift(
                data,
                addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                opt_level=requested_opt_level,
                traceflags=traceflags,
                strict_block_end=strict_block_end,
                skip_stmts=False,
                collect_data_refs=collect_data_refs,
                cross_insn_opt=cross_insn_opt,
                load_from_ro_regions=load_from_ro_regions,
                const_prop=const_prop,
            )

        next_addr = addr + final_irsb.size
        if max_bytes is not None:
            max_bytes -= final_irsb.size
        if isinstance(data, (bytes, bytearray, memoryview)):
            data_left = data[final_irsb.size :]
        else:
            data_left = data + final_irsb.size
        if max_inst is not None:
            max_inst -= final_irsb.instructions
        if (max_bytes is None or max_bytes > 0) and (max_inst is None or max_inst > 0) and data_left:
            more_irsb = lift(
                data_left,
                next_addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                opt_level=requested_opt_level,
                traceflags=traceflags,
                strict_block_end=strict_block_end,
                inner=True,
                skip_stmts=False,
                collect_data_refs=collect_data_refs,
                cross_insn_opt=cross_insn_opt,
                load_from_ro_regions=load_from_ro_regions,
                const_prop=const_prop,
            )
            if more_irsb.size:
                # Successfully decoded more bytes
                final_irsb.extend(more_irsb)
        elif max_bytes == 0:
            # We have no more bytes left. Mark the jumpkind of the IRSB as Ijk_Boring
            if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
                final_irsb.jumpkind = "Ijk_Boring"
                final_irsb.next = Const(vex_int_class(arch.bits)(final_irsb.addr + final_irsb.size))

    if not inner:
        for postprocessor in postprocessors[arch.name]:
            try:
                postprocessor(final_irsb).postprocess()
            except NeedStatementsNotification as e:
                # The post-processor cannot work without statements. Re-lift the current block with skip_stmts=False
                if not skip_stmts:
                    # sanity check
                    # Why does the post-processor raise NeedStatementsNotification when skip_stmts is False?
                    # `postprocessor` is itself the class, so report its own name rather than its metaclass
                    raise TypeError(
                        "Bad post-processor %s: "
                        "NeedStatementsNotification is raised when statements are available." % postprocessor.__name__
                    ) from e

                # Re-lift the current IRSB
                return lift(
                    data,
                    addr,
                    arch,
                    max_bytes=max_bytes,
                    max_inst=max_inst,
                    bytes_offset=bytes_offset,
                    opt_level=requested_opt_level,
                    traceflags=traceflags,
                    strict_block_end=strict_block_end,
                    inner=inner,
                    skip_stmts=False,
                    collect_data_refs=collect_data_refs,
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                    const_prop=const_prop,
                )
            except LiftingException:
                continue

    return final_irsb

289 

290 

def register(lifter, arch_name):
    """
    Add a Lifter or Postprocessor class to the registry of the given architecture.

    Classes are appended, so lifters are tried, and postprocessors are run, in the order in which they were
    registered. A class deriving from both Lifter and Postprocessor is added to both registries.

    :param lifter:      The Lifter or Postprocessor to register
    :vartype lifter:    :class:`Lifter` or :class:`Postprocessor`
    :param arch_name:   The architecture name under which the class is registered.
    """
    # (base class, registry to append to, debug message), checked in this order
    registration_table = (
        (Lifter, lifters, "Registering lifter %s for architecture %s."),
        (Postprocessor, postprocessors, "Registering postprocessor %s for architecture %s."),
    )
    for base_cls, registry, message in registration_table:
        if not issubclass(lifter, base_cls):
            continue
        log.debug(message, lifter.__name__, arch_name)
        registry[arch_name].append(lifter)