Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pyvex/lifting/lift_function.py: 75%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1import logging 

2from collections import defaultdict 

3from typing import DefaultDict, List, Optional, Type 

4 

5from pyvex import const 

6from pyvex.block import IRSB 

7from pyvex.const import vex_int_class 

8from pyvex.errors import LiftingException, NeedStatementsNotification, PyVEXError, SkipStatementsError 

9from pyvex.expr import Const 

10from pyvex.native import ffi 

11from pyvex.types import LiftSource, PyLiftSource 

12 

13from .lifter import Lifter 

14from .post_processor import Postprocessor 

15 

log = logging.getLogger(__name__)

# Registered Lifter classes, keyed by architecture name. lift() looks up lifters[arch.name] and tries the
# classes in list order, i.e. in the order in which register() appended them.
lifters: DefaultDict[str, List[Type[Lifter]]] = defaultdict(list)
# Registered Postprocessor classes, keyed by architecture name. lift() runs every entry of
# postprocessors[arch.name] in list order on the final block of a non-inner lift.
postprocessors: DefaultDict[str, List[Type[Postprocessor]]] = defaultdict(list)

20 

21 

def lift(
    data: LiftSource,
    addr,
    arch,
    max_bytes=None,
    max_inst=None,
    bytes_offset=0,
    opt_level=1,
    traceflags=0,
    strict_block_end=True,
    inner=False,
    skip_stmts=False,
    collect_data_refs=False,
    cross_insn_opt=True,
    load_from_ro_regions=False,
):
    """
    Recursively lifts blocks using the registered lifters and postprocessors. Tries each lifter in the order in
    which they are registered on the data to lift.

    If a lifter raises a LiftingException on the data, it is skipped.
    If it succeeds and returns a block with a jumpkind of Ijk_NoDecode, all of the lifters are tried on the rest
    of the data and if they work, their output is appended to the first block.

    :param arch:            The arch to lift the data as.
    :param addr:            The starting address of the block. Affects the IMarks.
    :param data:            The bytes to lift as either a python string of bytes or a cffi buffer object.
    :param max_bytes:       The maximum number of bytes to lift. If set to None, no byte limit is used.
                            Mandatory when `data` is not a bytes/bytearray/memoryview object.
    :param max_inst:        The maximum number of instructions to lift. If set to None, no instruction limit is used.
    :param bytes_offset:    The offset into `data` to start lifting at.
    :param opt_level:       The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                            unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                            optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                            which honestly doesn't make much sense in the context of pyvex. The default is 1.
    :param traceflags:      The libVEX traceflags, controlling VEX debug prints.
    :param strict_block_end:    Passed through unchanged to every lifter.
    :param inner:           True when this call continues an outer lift past an undecodable instruction. The
                            registered postprocessors are only run when this is False.
    :param skip_stmts:      Passed to the lifter. If the lifter raises SkipStatementsError, a postprocessor raises
                            NeedStatementsNotification, or lifting has to continue past an Ijk_NoDecode, the block
                            is lifted again with skip_stmts=False.
    :param collect_data_refs:   Passed through unchanged to every lifter and every recursive lift.
    :param cross_insn_opt:      Passed through unchanged to every lifter and every recursive lift.
    :param load_from_ro_regions:    Passed through unchanged to every lifter and every recursive lift.

    :return:    The lifted block. If no registered lifter succeeds, an empty block at `addr` with jumpkind
                Ijk_NoDecode is returned instead.
    :raises PyVEXError:     If `max_bytes` is not positive, `data` is empty, or `data` is not a Python buffer and
                            `max_bytes` is None.
    :raises TypeError:      If `data` is a `str`, or if a postprocessor raises NeedStatementsNotification although
                            statements were not skipped.
    :raises RuntimeError:   If a lifter sets neither REQUIRE_DATA_C nor REQUIRE_DATA_PY.

    .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
              exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
              instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
              fewer instructions than requested. Specifically, this means that lifting a branch or jump
              on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
              attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

    .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
              ends properly or until it runs out of data to lift.
    """
    if max_bytes is not None and max_bytes <= 0:
        raise PyVEXError("Cannot lift block with no data (max_bytes <= 0)")

    if not data:
        raise PyVEXError("Cannot lift block with no data (data is empty)")

    if isinstance(data, str):
        raise TypeError("Cannot pass unicode string as data to lifter")

    # Keep the input in whichever representation it arrived in; the other one is created on demand per lifter.
    py_data: Optional[PyLiftSource]
    if isinstance(data, (bytes, bytearray, memoryview)):
        py_data = data
        c_data = None
        allow_arch_optimizations = False
    else:
        # Anything else is treated as an ffi pointer, whose length cannot be discovered from the object itself.
        if max_bytes is None:
            raise PyVEXError("Cannot lift block with ffi pointer and no size (max_bytes is None)")
        c_data = data
        py_data = None
        allow_arch_optimizations = True

    # In order to attempt to preserve the property that
    # VEX lifts the same bytes to the same IR at all times when optimizations are disabled
    # we hack off all of VEX's non-IROpt optimizations when opt_level == -1.
    # This is intended to enable comparisons of the lifted IR between code that happens to be
    # found in different contexts.
    if opt_level < 0:
        allow_arch_optimizations = False
        opt_level = 0

    for lifter in lifters[arch.name]:
        try:
            u_data: LiftSource = data
            if lifter.REQUIRE_DATA_C:
                if c_data is None:
                    assert py_data is not None
                    if isinstance(py_data, (bytearray, memoryview)):
                        u_data = ffi.from_buffer(ffi.BVoidP, py_data)
                    else:
                        # bytes input is copied with 8 zero bytes appended
                        # NOTE(review): presumably padding so native code may read slightly past the end - confirm
                        u_data = ffi.from_buffer(ffi.BVoidP, py_data + b"\0" * 8)
                    max_bytes = min(len(py_data), max_bytes) if max_bytes is not None else len(py_data)
                else:
                    u_data = c_data
                skip = 0
            elif lifter.REQUIRE_DATA_PY:
                # `skip` bytes are cut off the front of the data and subtracted from the offset given to the lifter
                # NOTE(review): the odd-address ARM case presumably keeps one byte for Thumb mode - confirm
                if bytes_offset and arch.name.startswith("ARM") and (addr & 1) == 1:
                    skip = bytes_offset - 1
                else:
                    skip = bytes_offset
                if py_data is None:
                    assert c_data is not None
                    if max_bytes is None:
                        log.debug("Cannot create py_data from c_data when no max length is given")
                        continue
                    u_data = ffi.buffer(c_data + skip, max_bytes)[:]
                else:
                    if max_bytes is None:
                        u_data = py_data[skip:]
                    else:
                        u_data = py_data[skip : skip + max_bytes]
            else:
                # `lifter` is already a class, so report its own name rather than the name of its metaclass
                raise RuntimeError(
                    "Incorrect lifter configuration. What type of data does %s expect?" % lifter.__name__
                )

            try:
                final_irsb = lifter(arch, addr).lift(
                    u_data,
                    bytes_offset - skip,
                    max_bytes,
                    max_inst,
                    opt_level,
                    traceflags,
                    allow_arch_optimizations,
                    strict_block_end,
                    skip_stmts,
                    collect_data_refs=collect_data_refs,
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                )
            except SkipStatementsError:
                # The lifter cannot work without statements: run it again with statements enabled
                assert skip_stmts is True
                final_irsb = lifter(arch, addr).lift(
                    u_data,
                    bytes_offset - skip,
                    max_bytes,
                    max_inst,
                    opt_level,
                    traceflags,
                    allow_arch_optimizations,
                    strict_block_end,
                    skip_stmts=False,
                    collect_data_refs=collect_data_refs,
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                )
            break
        except LiftingException as ex:
            log.debug("Lifting Exception: %s", str(ex))
            continue
    else:
        # No lifter succeeded (or none is registered for this architecture)
        final_irsb = IRSB.empty_block(
            arch,
            addr,
            size=0,
            nxt=Const(const.vex_int_class(arch.bits)(addr)),
            jumpkind="Ijk_NoDecode",
        )
        final_irsb.invalidate_direct_next()
        return final_irsb

    if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
        # We have decoded a few bytes before we hit an undecodeable instruction.

        # Determine if this is an intentional NoDecode, like the ud2 instruction on AMD64
        nodecode_addr_expr = final_irsb.next
        if type(nodecode_addr_expr) is Const:
            nodecode_addr = nodecode_addr_expr.con.value
            next_irsb_start_addr = addr + final_irsb.size
            if nodecode_addr != next_irsb_start_addr:
                # The last instruction of the IRSB has a non-zero length. This is an intentional NoDecode.
                # The very last instruction has been decoded
                final_irsb.jumpkind = "Ijk_NoDecode"
                final_irsb.next = final_irsb.next
                final_irsb.invalidate_direct_next()
                return final_irsb

        # Decode more bytes
        if skip_stmts:
            # When gymrat will be invoked, we will merge future basic blocks to the current basic block. In this case,
            # statements are usually required.
            # TODO: In the future, we may further optimize it to handle cases where getting statements in gymrat is not
            # TODO: required.
            return lift(
                data,
                addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                opt_level=opt_level,
                traceflags=traceflags,
                strict_block_end=strict_block_end,
                skip_stmts=False,
                collect_data_refs=collect_data_refs,
                # Forward the caller's settings; otherwise the re-lift silently falls back to the defaults
                cross_insn_opt=cross_insn_opt,
                load_from_ro_regions=load_from_ro_regions,
            )

        next_addr = addr + final_irsb.size
        if max_bytes is not None:
            max_bytes -= final_irsb.size
        if isinstance(data, (bytes, bytearray, memoryview)):
            data_left = data[final_irsb.size :]
        else:
            data_left = data + final_irsb.size
        if max_inst is not None:
            max_inst -= final_irsb.instructions
        if (max_bytes is None or max_bytes > 0) and (max_inst is None or max_inst > 0) and data_left:
            # inner=True: postprocessors must only run once, on the fully merged block
            more_irsb = lift(
                data_left,
                next_addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                opt_level=opt_level,
                traceflags=traceflags,
                strict_block_end=strict_block_end,
                inner=True,
                skip_stmts=False,
                collect_data_refs=collect_data_refs,
                # Forward the caller's settings so the continuation is lifted like the first part
                cross_insn_opt=cross_insn_opt,
                load_from_ro_regions=load_from_ro_regions,
            )
            if more_irsb.size:
                # Successfully decoded more bytes
                final_irsb.extend(more_irsb)
        elif max_bytes == 0:
            # We have no more bytes left. Mark the jumpkind of the IRSB as Ijk_Boring
            if final_irsb.size > 0 and final_irsb.jumpkind == "Ijk_NoDecode":
                final_irsb.jumpkind = "Ijk_Boring"
                final_irsb.next = Const(vex_int_class(arch.bits)(final_irsb.addr + final_irsb.size))

    if not inner:
        for postprocessor in postprocessors[arch.name]:
            try:
                postprocessor(final_irsb).postprocess()
            except NeedStatementsNotification as e:
                # The post-processor cannot work without statements. Re-lift the current block with skip_stmts=False
                if not skip_stmts:
                    # sanity check
                    # Why does the post-processor raise NeedStatementsNotification when skip_stmts is False?
                    # `postprocessor` is already a class, so report its own name rather than its metaclass
                    raise TypeError(
                        "Bad post-processor %s: "
                        "NeedStatementsNotification is raised when statements are available." % postprocessor.__name__
                    ) from e

                # Re-lift the current IRSB
                return lift(
                    data,
                    addr,
                    arch,
                    max_bytes=max_bytes,
                    max_inst=max_inst,
                    bytes_offset=bytes_offset,
                    opt_level=opt_level,
                    traceflags=traceflags,
                    strict_block_end=strict_block_end,
                    inner=inner,
                    skip_stmts=False,
                    collect_data_refs=collect_data_refs,
                    # Forward the caller's settings; otherwise the re-lift silently falls back to the defaults
                    cross_insn_opt=cross_insn_opt,
                    load_from_ro_regions=load_from_ro_regions,
                )
            except LiftingException:
                continue

    return final_irsb

281 

282 

def register(lifter, arch_name):
    """
    Adds a Lifter or Postprocessor class to the registry that pyvex consults when lifting for an architecture.

    Lifters are prioritised by registration order and postprocessors are run in registration order, because
    each class is appended to the end of the list kept for `arch_name`. A class deriving from both Lifter and
    Postprocessor is added to both registries; a class deriving from neither is not registered at all.

    :param lifter:      The Lifter or Postprocessor class to register
    :type lifter:       :class:`Lifter` or :class:`Postprocessor`
    :param arch_name:   The architecture name under which the class is stored.
    """
    # (base class, registry to append to, debug message) - checked in this order
    registration_targets = (
        (Lifter, lifters, "Registering lifter %s for architecture %s."),
        (Postprocessor, postprocessors, "Registering postprocessor %s for architecture %s."),
    )
    for base_cls, registry, message in registration_targets:
        if not issubclass(lifter, base_cls):
            continue
        log.debug(message, lifter.__name__, arch_name)
        registry[arch_name].append(lifter)