Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyvex/block.py: 66%
351 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:15 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:15 +0000
1import copy
2import itertools
3import logging
4from typing import List, Optional
6from . import expr, stmt
7from .const import get_type_size
8from .data_ref import DataRef
9from .enums import VEXObject
10from .errors import SkipStatementsError
11from .expr import RdTmp
12from .native import pvc
13from .stmt import CAS, LLSC, Dirty, Exit, IMark, IRExpr, IRStmt, LoadG, WrTmp, get_enum_from_int, get_int_from_enum
15log = logging.getLogger("pyvex.block")
class IRSB(VEXObject):
    """
    The IRSB is the primary interface to pyvex. Constructing one of these will make a call into LibVEX to perform a
    translation.

    IRSB stands for *Intermediate Representation Super-Block*. An IRSB in VEX is a single-entry, multiple-exit code
    block.

    :ivar arch:             The architecture this block is lifted under
    :vartype arch:          :class:`archinfo.Arch`
    :ivar statements:       The statements in this block
    :vartype statements:    list of :class:`IRStmt`
    :ivar next:             The expression for the default exit target of this block
    :vartype next:          :class:`IRExpr`
    :ivar int offsIP:       The offset of the instruction pointer in the VEX guest state
    :ivar int stmts_used:   The number of statements in this IRSB
    :ivar str jumpkind:     The type of this block's default jump (call, boring, syscall, etc) as a VEX enum string
    :ivar bool direct_next: Whether this block ends with a direct (not indirect) jump or branch
    :ivar int size:         The size of this block in bytes
    :ivar int addr:         The address of this basic block, i.e. the address in the first IMark
    """

    __slots__ = (
        "addr",
        "arch",
        "statements",
        "next",
        "_tyenv",
        "jumpkind",
        "_direct_next",
        "_size",
        "_instructions",
        "_exit_statements",
        "default_exit_target",
        "_instruction_addresses",
        "data_refs",
    )

    # The following constants shall match the defs in pyvex.h
    MAX_EXITS = 400
    MAX_DATA_REFS = 2000

    def __init__(
        self,
        data,
        mem_addr,
        arch,
        max_inst=None,
        max_bytes=None,
        bytes_offset=0,
        traceflags=0,
        opt_level=1,
        num_inst=None,
        num_bytes=None,
        strict_block_end=False,
        skip_stmts=False,
        collect_data_refs=False,
        cross_insn_opt=True,
    ):
        """
        :param data:                The bytes to lift. Can be either a string of bytes or a cffi buffer object.
                                    You may also pass None to initialize an empty IRSB.
        :type data:                 str or bytes or cffi.FFI.CData or None
        :param int mem_addr:        The address to lift the data at.
        :param arch:                The architecture to lift the data as.
        :type arch:                 :class:`archinfo.Arch`
        :param max_inst:            The maximum number of instructions to lift. (See note below)
        :param max_bytes:           The maximum number of bytes to use.
        :param num_inst:            Replaces max_inst if max_inst is None. If set to None as well, no instruction limit
                                    is used.
        :param num_bytes:           Replaces max_bytes if max_bytes is None. If set to None as well, no byte limit is
                                    used.
        :param bytes_offset:        The offset into `data` to start lifting at. Note that for ARM THUMB mode, both
                                    `mem_addr` and `bytes_offset` must be odd (typically `bytes_offset` is set to 1).
        :param traceflags:          The libVEX traceflags, controlling VEX debug prints.
        :param opt_level:           The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                                    unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                                    optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                                    which honestly doesn't make much sense in the context of pyvex. The default is 1.
        :param strict_block_end:    Should the LibVEX arm-thumb split block at some instructions, for example CB{N}Z.
        :param skip_stmts:          Forwarded unchanged to `lift()`. Presumably asks the lifter not to materialise
                                    the statement list -- confirm against `pyvex.lifting.lift`.
        :param collect_data_refs:   Forwarded unchanged to `lift()`. Presumably enables collection of data
                                    references into `data_refs` -- confirm against `pyvex.lifting.lift`.
        :param cross_insn_opt:      Forwarded unchanged to `lift()`. Presumably toggles optimizations that span
                                    instruction boundaries -- confirm against `pyvex.lifting.lift`.

        .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
                  exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
                  instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
                  fewer instructions than requested. Specifically, this means that lifting a branch or jump
                  on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
                  attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

        .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
                  ends properly or until it runs out of data to lift.
        """
        # num_inst / num_bytes are fallback spellings of max_inst / max_bytes.
        if max_inst is None:
            max_inst = num_inst
        if max_bytes is None:
            max_bytes = num_bytes
        VEXObject.__init__(self)
        self.addr = mem_addr
        self.arch = arch

        self.statements: List[IRStmt] = []
        self.next: Optional[IRExpr] = None
        self._tyenv = None
        self.jumpkind: Optional[str] = None
        self._direct_next = None
        self._size = None
        self._instructions = None
        self._exit_statements = None
        self.default_exit_target = None
        self.data_refs = ()
        self._instruction_addresses = ()

        if data is not None:
            # This is the slower path (because we need to call _from_py() to copy the content in the returned IRSB to
            # the current IRSB instance. You should always call `lift()` directly. This method is kept for compatibility
            # concerns.
            from pyvex.lifting import lift

            irsb = lift(
                data,
                mem_addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                opt_level=opt_level,
                traceflags=traceflags,
                strict_block_end=strict_block_end,
                skip_stmts=skip_stmts,
                collect_data_refs=collect_data_refs,
                cross_insn_opt=cross_insn_opt,
            )
            self._from_py(irsb)

    @staticmethod
    def empty_block(arch, addr, statements=None, nxt=None, tyenv=None, jumpkind=None, direct_next=None, size=None):
        """
        Build an IRSB without lifting anything and populate it from the given attributes.

        :return: The newly created :class:`IRSB`.
        """
        block = IRSB(None, addr, arch)
        block._set_attributes(statements, nxt, tyenv, jumpkind, direct_next, size=size)
        return block

    @property
    def tyenv(self):
        """
        The type environment of this block. An empty :class:`IRTypeEnv` is created on first access if none is set.
        """
        if self._tyenv is None:
            self._tyenv = IRTypeEnv(self.arch)
        return self._tyenv

    @tyenv.setter
    def tyenv(self, v):
        self._tyenv = v

    @property
    def has_statements(self):
        """
        True if this block carries a non-empty statement list, False if the list is empty or None.
        """
        return bool(self.statements)

    @property
    def exit_statements(self):
        """
        A tuple of `(instruction address, statement index, Exit statement)` triples, one for each Exit statement.

        The value is computed from `statements` on first access and cached. Without statements, an empty tuple is
        returned and nothing is cached.
        """
        if self._exit_statements is not None:
            return self._exit_statements

        # Delayed process
        if not self.has_statements:
            return ()

        collected = []

        ins_addr = None
        for idx, stmt_ in enumerate(self.statements):
            if type(stmt_) is IMark:
                # Remember the address of the instruction the following statements belong to
                ins_addr = stmt_.addr + stmt_.delta
            elif type(stmt_) is Exit:
                collected.append((ins_addr, idx, stmt_))

        self._exit_statements = tuple(collected)
        return self._exit_statements

    def copy(self):
        """
        Return a deep copy of this block.
        """
        return copy.deepcopy(self)

    def extend(self, extendwith):
        """
        Appends an irsb to the current irsb. The irsb that is appended is invalidated. The appended irsb's jumpkind and
        default exit are used.

        :param extendwith:     The IRSB to append to this IRSB
        :vartype extendwith:   :class:`IRSB`
        """
        if self.stmts_used == 0:
            self._from_py(extendwith)
            return

        conversion_dict = {}
        invalid_vals = (0xFFFFFFFF, -1)

        # Capture everything derived from the two blocks before the statements are merged.
        new_size = self.size + extendwith.size
        new_instructions = self.instructions + extendwith.instructions
        new_direct_next = extendwith.direct_next
        new_instruction_addresses = tuple(self.instruction_addresses) + tuple(extendwith.instruction_addresses)

        def convert_tmp(tmp):
            """
            Converts a tmp from the appended-block into one in the appended-to-block. Creates a new tmp if it does not
            already exist. Prevents collisions in tmp numbers between the two blocks.

            :param tmp:       The tmp number to convert
            """
            if tmp not in conversion_dict:
                tmp_type = extendwith.tyenv.lookup(tmp)
                conversion_dict[tmp] = self.tyenv.add(tmp_type)
            return conversion_dict[tmp]

        def convert_expr(expr_):
            """
            Converts a VEX expression to use tmps of the appended-to-block instead of the appended-block. Used to
            prevent collisions in tmp numbers between the two blocks. Expressions other than RdTmp are returned
            unchanged.

            :param expr_:      The VEX expression to convert
            :vartype expr_:    :class:`IRExpr`
            """
            if type(expr_) is RdTmp:
                return RdTmp.get_instance(convert_tmp(expr_.tmp))
            return expr_

        for stmt_ in extendwith.statements:
            stmttype = type(stmt_)
            if stmttype is WrTmp:
                stmt_.tmp = convert_tmp(stmt_.tmp)
            elif stmttype is LoadG:
                stmt_.dst = convert_tmp(stmt_.dst)
            elif stmttype is LLSC:
                stmt_.result = convert_tmp(stmt_.result)
            elif stmttype is Dirty:
                if stmt_.tmp not in invalid_vals:
                    stmt_.tmp = convert_tmp(stmt_.tmp)
                # NOTE(review): the converted expression is discarded here, so the only effect of this loop is to
                # allocate the tmp mapping early. The actual substitution happens in the generic pass below.
                for e in stmt_.args:
                    convert_expr(e)
            elif stmttype is CAS:
                if stmt_.oldLo not in invalid_vals:
                    stmt_.oldLo = convert_tmp(stmt_.oldLo)
                if stmt_.oldHi not in invalid_vals:
                    stmt_.oldHi = convert_tmp(stmt_.oldHi)
            # Convert all expressions
            to_replace = {}
            for expr_ in stmt_.expressions:
                replacement = convert_expr(expr_)
                if replacement is not expr_:
                    to_replace[expr_] = replacement
            stmt_.replace_expression(to_replace)
            # Add the converted statement to self.statements
            self.statements.append(stmt_)
        extendwith.next = convert_expr(extendwith.next)
        self.next = extendwith.next
        self.jumpkind = extendwith.jumpkind
        self._size = new_size
        self._instructions = new_instructions
        self._direct_next = new_direct_next
        self._instruction_addresses = new_instruction_addresses
        # The default exit now belongs to the appended block, like `next` and `jumpkind`.
        self.default_exit_target = extendwith.default_exit_target
        # The cached exits describe the pre-extension statement list; drop them so that `exit_statements`
        # recomputes them from the merged statements on next access.
        self._exit_statements = None

        # TODO: Change data_references, etc.

    def invalidate_direct_next(self):
        """
        Drop the cached `direct_next` value so that it is recomputed on next access.
        """
        self._direct_next = None

    def pp(self):
        """
        Pretty-print the IRSB to stdout.
        """
        print(self._pp_str())

    def __repr__(self):
        return f"IRSB <0x{self.size:x} bytes, {self.instructions} ins., {str(self.arch)}> at 0x{self.addr:x}"

    def __str__(self):
        return self._pp_str()

    def __eq__(self, other):
        return (
            isinstance(other, IRSB)
            and self.addr == other.addr
            and self.arch.name == other.arch.name
            and self.statements == other.statements
            and self.next == other.next
            and self.jumpkind == other.jumpkind
        )

    def __hash__(self):
        return hash((IRSB, self.addr, self.arch.name, tuple(self.statements), self.next, self.jumpkind))

    def typecheck(self):
        """
        Check that this block is well-formed.

        The first violation found is logged at debug level.

        :return: True if all checks pass, False otherwise.
        :rtype:  bool
        """
        problem = self._typecheck_problem()
        if problem is None:
            return True
        log.debug(problem)
        return False

    def _typecheck_problem(self):
        """
        Return a description of the first typecheck violation, or None if the block is well-formed.

        Explicit checks are used instead of `assert` so that validation still runs under `python -O`.
        """
        # existence checks
        if self.next is None:
            return "Missing next expression"
        if self.jumpkind is None:
            return "Missing jumpkind"

        # Type checks
        if not isinstance(self.next, expr.IRExpr):
            return "Next expression is not an expression"
        if not isinstance(self.jumpkind, str):
            return "Jumpkind is not a string"
        if not self.jumpkind.startswith("Ijk_"):
            return "Jumpkind is not a jumpkind enum"
        if not self.tyenv.typecheck():
            return "Type environment contains invalid types"

        # statement checks
        last_imark = None  # (addr, len) of the most recent IMark
        for i, st in enumerate(self.statements):
            if not isinstance(st, stmt.IRStmt):
                return "Statement %d is not an IRStmt" % i
            try:
                stmt_ok = st.typecheck(self.tyenv)
            except Exception:  # pylint: disable=broad-except
                return "Statement %d errored in typechecking" % i
            if not stmt_ok:
                return "Statement %d failed to typecheck" % i

            if type(st) is stmt.NoOp:
                continue
            if type(st) is stmt.IMark:
                # Consecutive IMarks must tile the block without gaps or overlaps
                if last_imark is not None and last_imark[0] + last_imark[1] != st.addr:
                    return "IMarks sizes overlap or have gaps"
                last_imark = (st.addr, st.len)
            elif last_imark is None:
                return "Operation statement appears before IMark"

        if last_imark is None:
            return "No IMarks present in block"
        return None

    #
    # alternate constructors
    #

    @staticmethod
    def from_c(c_irsb, mem_addr, arch):
        """
        Create an IRSB at `mem_addr` for `arch` and fill it from `c_irsb` via :meth:`_from_c`.
        """
        irsb = IRSB(None, mem_addr, arch)
        irsb._from_c(c_irsb)
        return irsb

    @staticmethod
    def from_py(tyenv, stmts, next_expr, jumpkind, mem_addr, arch):
        """
        Create an IRSB from already-constructed Python objects, without invoking the lifter.
        """
        irsb = IRSB(None, mem_addr, arch)

        irsb.tyenv = tyenv
        irsb.statements = stmts
        irsb.next = next_expr
        irsb.jumpkind = jumpkind
        irsb._direct_next = irsb._is_defaultexit_direct_jump()

        return irsb

    #
    # simple properties useful for analysis
    #

    @property
    def stmts_used(self):
        """
        The number of statements in this block (0 if the statement list is None).
        """
        if self.statements is None:
            return 0
        return len(self.statements)

    @property
    def offsIP(self):
        """
        The instruction pointer offset, taken from `arch.ip_offset`.
        """
        return self.arch.ip_offset

    @property
    def direct_next(self):
        """
        Whether the default exit is a direct jump. Computed on first access and cached.
        """
        if self._direct_next is None:
            self._direct_next = self._is_defaultexit_direct_jump()
        return self._direct_next

    @property
    def expressions(self):
        """
        Return an iterator of all expressions contained in the IRSB.
        """
        for s in self.statements:
            yield from s.expressions
        yield self.next

    @property
    def instructions(self):
        """
        The number of instructions in this block
        """
        if self._instructions is None:
            if self.statements is None:
                self._instructions = 0
            else:
                self._instructions = len([s for s in self.statements if type(s) is stmt.IMark])
        return self._instructions

    @property
    def instruction_addresses(self):
        """
        Addresses of instructions in this block.
        """
        if self._instruction_addresses is None:
            if self.statements is None:
                self._instruction_addresses = []
            else:
                self._instruction_addresses = [(s.addr + s.delta) for s in self.statements if type(s) is stmt.IMark]
        return self._instruction_addresses

    @property
    def size(self):
        """
        The size of this block, in bytes
        """
        if self._size is None:
            self._size = sum(s.len for s in self.statements if type(s) is stmt.IMark)
        return self._size

    @property
    def operations(self):
        """
        A list of all operations done by the IRSB, as libVEX enum names
        """
        return [e.op for e in self.expressions if hasattr(e, "op")]

    @property
    def all_constants(self):
        """
        Returns all constants in the block (including incrementing of the program counter) as
        :class:`pyvex.const.IRConst`.
        """
        return list(itertools.chain.from_iterable(e.constants for e in self.expressions))

    @property
    def constants(self):
        """
        The constants (excluding updates of the program counter) in the IRSB as :class:`pyvex.const.IRConst`.
        """
        return list(
            itertools.chain.from_iterable(
                s.constants for s in self.statements if not (type(s) is stmt.Put and s.offset == self.offsIP)
            )
        )

    @property
    def constant_jump_targets(self):
        """
        A set of the static jump targets of the basic block.
        """
        exits = set()

        if self.exit_statements:
            for _, _, stmt_ in self.exit_statements:
                exits.add(stmt_.dst.value)

        default_target = self.default_exit_target
        if default_target is not None:
            exits.add(default_target)

        return exits

    @property
    def constant_jump_targets_and_jumpkinds(self):
        """
        A dict of the static jump targets of the basic block to their jumpkind.
        """
        exits = {}

        if self.exit_statements:
            for _, _, stmt_ in self.exit_statements:
                exits[stmt_.dst.value] = stmt_.jumpkind

        default_target = self.default_exit_target
        if default_target is not None:
            exits[default_target] = self.jumpkind

        return exits

    #
    # private methods
    #

    def _pp_str(self):
        """
        Return the pretty-printed IRSB.

        :rtype: str
        """
        sa = []
        sa.append("IRSB {")
        if self.statements is not None:
            sa.append(" %s" % self.tyenv)
        sa.append("")
        if self.statements is not None:
            for i, s in enumerate(self.statements):
                # Put, Get-into-tmp and Exit statements are rendered with the architecture's register name
                if isinstance(s, stmt.Put):
                    stmt_str = s.__str__(
                        reg_name=self.arch.translate_register_name(s.offset, s.data.result_size(self.tyenv) // 8)
                    )
                elif isinstance(s, stmt.WrTmp) and isinstance(s.data, expr.Get):
                    stmt_str = s.__str__(
                        reg_name=self.arch.translate_register_name(s.data.offset, s.data.result_size(self.tyenv) // 8)
                    )
                elif isinstance(s, stmt.Exit):
                    stmt_str = s.__str__(reg_name=self.arch.translate_register_name(s.offsIP, self.arch.bits // 8))
                else:
                    stmt_str = s.__str__()
                sa.append(" %02d | %s" % (i, stmt_str))
        else:
            sa.append(" Statements are omitted.")
        sa.append(f" NEXT: PUT({self.arch.translate_register_name(self.offsIP)}) = {self.next}; {self.jumpkind}")
        sa.append("}")
        return "\n".join(sa)

    def _is_defaultexit_direct_jump(self):
        """
        Checks if the default of this IRSB a direct jump or not.

        Only Ijk_InvalICache, Ijk_Boring and Ijk_Call jumps with a known `default_exit_target` count as direct.
        """
        if not (self.jumpkind == "Ijk_InvalICache" or self.jumpkind == "Ijk_Boring" or self.jumpkind == "Ijk_Call"):
            return False

        target = self.default_exit_target
        return target is not None

    #
    # internal "constructors" to fill this block out with data from various sources
    #

    def _from_c(self, lift_r, skip_stmts=False):
        """
        Populate this block from a lift result.

        :param lift_r:      The lift result; its `irsb`, `size`, `insts`, `inst_addrs`, exit and data-reference
                            fields are read.
        :param skip_stmts:  If True, statements and the type environment are not converted (both are set to None)
                            and the exit statements are taken from the exits recorded in `lift_r` instead.
        :raises SkipStatementsError: If `lift_r` reports more exits than MAX_EXITS (only checked when `skip_stmts`
                            is set) or more data references than MAX_DATA_REFS.
        """
        c_irsb = lift_r.irsb
        if not skip_stmts:
            self.statements = [stmt.IRStmt._from_c(c_irsb.stmts[i]) for i in range(c_irsb.stmts_used)]
            self.tyenv = IRTypeEnv._from_c(self.arch, c_irsb.tyenv)
        else:
            self.statements = None
            self.tyenv = None

        self.next = expr.IRExpr._from_c(c_irsb.next)
        self.jumpkind = get_enum_from_int(c_irsb.jumpkind)
        self._size = lift_r.size
        self._instructions = lift_r.insts
        self._instruction_addresses = tuple(itertools.islice(lift_r.inst_addrs, lift_r.insts))

        # Conditional exits
        self._exit_statements = []
        if skip_stmts:
            if lift_r.exit_count > self.MAX_EXITS:
                # There are more exits than the default size of the exits array. We will need all statements
                raise SkipStatementsError("exit_count exceeded MAX_EXITS (%d)" % self.MAX_EXITS)
            for i in range(lift_r.exit_count):
                ex = lift_r.exits[i]
                exit_stmt = stmt.IRStmt._from_c(ex.stmt)
                self._exit_statements.append((ex.ins_addr, ex.stmt_idx, exit_stmt))

            self._exit_statements = tuple(self._exit_statements)
        else:
            self._exit_statements = None  # It will be generated when self.exit_statements is called
        # The default exit
        if lift_r.is_default_exit_constant == 1:
            self.default_exit_target = lift_r.default_exit
        else:
            self.default_exit_target = None

        # Data references
        self.data_refs = None
        if lift_r.data_ref_count > 0:
            if lift_r.data_ref_count > self.MAX_DATA_REFS:
                raise SkipStatementsError("data_ref_count exceeded MAX_DATA_REFS (%d)" % self.MAX_DATA_REFS)
            self.data_refs = [DataRef.from_c(lift_r.data_refs[i]) for i in range(lift_r.data_ref_count)]

    def _set_attributes(
        self,
        statements=None,
        nxt=None,
        tyenv=None,
        jumpkind=None,
        direct_next=None,
        size=None,
        instructions=None,
        instruction_addresses=None,
        exit_statements=None,
        default_exit_target=None,
    ):
        """
        Overwrite the content of this block with the given values.

        A `statements` value of None becomes an empty list, and the type environment is only replaced when `tyenv`
        is not None. Every other attribute is assigned as given, including None.
        """
        self.statements = statements if statements is not None else []
        self.next = nxt
        if tyenv is not None:
            self.tyenv = tyenv
        self.jumpkind = jumpkind
        self._direct_next = direct_next
        self._size = size
        self._instructions = instructions
        self._instruction_addresses = instruction_addresses
        self._exit_statements = exit_statements
        self.default_exit_target = default_exit_target

    def _from_py(self, irsb):
        """
        Copy the content of another :class:`IRSB` into this one. The statement list and the type environment are
        shared with `irsb`, not copied.
        """
        self._set_attributes(
            irsb.statements,
            irsb.next,
            irsb.tyenv,
            irsb.jumpkind,
            irsb.direct_next,
            irsb.size,
            instructions=irsb._instructions,
            instruction_addresses=irsb._instruction_addresses,
            exit_statements=irsb.exit_statements,
            default_exit_target=irsb.default_exit_target,
        )
class IRTypeEnv(VEXObject):
    """
    An IR type environment.

    :ivar types:     A list of the types of all the temporaries in this block as VEX enum strings.
                     `types[3]` is the type of t3.
    :vartype types:  list of str
    """

    __slots__ = ["types", "wordty"]

    def __init__(self, arch, types=None):
        """
        :param arch:    The architecture; its `bits` attribute determines `wordty`.
        :param types:   The initial list of tmp types. The list is stored as-is (not copied); None means empty.
        """
        VEXObject.__init__(self)
        self.types = [] if types is None else types
        self.wordty = "Ity_I%d" % arch.bits

    def __str__(self):
        return " ".join(("t%d:%s" % (i, t)) for i, t in enumerate(self.types))

    def lookup(self, tmp):
        """
        Return the type of temporary variable `tmp` as an enum string

        :raises IndexError: If `tmp` is negative or not smaller than the number of known tmps.
        """
        # Valid tmp numbers are 0 .. types_used - 1, so tmp == types_used is already out of range.
        if tmp < 0 or tmp >= self.types_used:
            log.debug("Invalid temporary number %d", tmp)
            raise IndexError(tmp)
        return self.types[tmp]

    def sizeof(self, tmp):
        """
        Return the size of the type of temporary variable `tmp`, as reported by `get_type_size`.
        """
        return get_type_size(self.lookup(tmp))

    def add(self, ty):
        """
        Add a new tmp of type `ty` to the environment. Returns the number of the new tmp.
        """
        self.types.append(ty)
        return self.types_used - 1

    @property
    def types_used(self):
        """
        The number of tmps in this environment.
        """
        return len(self.types)

    @staticmethod
    def _from_c(arch, c_tyenv):
        """
        Build an IRTypeEnv from a C type environment, converting each type through `get_enum_from_int`.
        """
        return IRTypeEnv(arch, [get_enum_from_int(c_tyenv.types[t]) for t in range(c_tyenv.types_used)])

    @staticmethod
    def _to_c(tyenv):
        """
        Build a C type environment holding one tmp for each type in `tyenv`, in order.
        """
        c_tyenv = pvc.emptyIRTypeEnv()
        for ty in tyenv.types:
            pvc.newIRTemp(c_tyenv, get_int_from_enum(ty))
        return c_tyenv

    def typecheck(self):
        """
        Return True if `get_type_size` accepts every type in this environment, False if it raises ValueError for
        any of them.
        """
        for ty in self.types:
            try:
                get_type_size(ty)
            except ValueError:
                return False
        return True