Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyvex/block.py: 66%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import itertools
3import logging
4from typing import Optional
6from . import expr, stmt
7from .const import U1, get_type_size
8from .const_val import ConstVal
9from .data_ref import DataRef
10from .enums import VEXObject
11from .errors import SkipStatementsError
12from .expr import Const, RdTmp
13from .native import pvc
14from .stmt import (
15 CAS,
16 LLSC,
17 Dirty,
18 Exit,
19 IMark,
20 IRExpr,
21 IRStmt,
22 LoadG,
23 WrTmp,
24 get_enum_from_int,
25 get_int_from_enum,
26)
27from .types import Arch
# Module-level logger shared by everything in this file.
log = logging.getLogger("pyvex.block")
class IRSB(VEXObject):
    """
    The IRSB is the primary interface to pyvex. Constructing one of these will make a call into LibVEX to perform a
    translation.

    IRSB stands for *Intermediate Representation Super-Block*. An IRSB in VEX is a single-entry, multiple-exit code
    block.

    :ivar arch:             The architecture this block is lifted under. Must duck-type as :class:`archinfo.arch.Arch`
    :ivar statements:       The statements in this block
    :vartype statements:    list of :class:`IRStmt`
    :ivar next:             The expression for the default exit target of this block
    :vartype next:          :class:`IRExpr`
    :ivar int offsIP:       The offset of the instruction pointer in the VEX guest state
    :ivar int stmts_used:   The number of statements in this IRSB
    :ivar str jumpkind:     The type of this block's default jump (call, boring, syscall, etc) as a VEX enum string
    :ivar bool direct_next: Whether this block ends with a direct (not indirect) jump or branch
    :ivar int size:         The size of this block in bytes
    :ivar int addr:         The address of this basic block, i.e. the address in the first IMark
    """

    __slots__ = [
        "addr",
        "arch",
        "statements",
        "next",
        "_tyenv",
        "jumpkind",
        "is_noop_block",
        "_direct_next",
        "_size",
        "_instructions",
        "_exit_statements",
        "default_exit_target",
        "_instruction_addresses",
        "data_refs",
        "const_vals",
    ]

    # The following constants shall match the defs in pyvex.h
    MAX_EXITS = 400
    MAX_DATA_REFS = 2000
    MAX_CONST_VALS = 1000

    def __init__(
        self,
        data,
        mem_addr,
        arch: Arch,
        max_inst=None,
        max_bytes=None,
        bytes_offset=0,
        traceflags=0,
        opt_level=1,
        num_inst=None,
        num_bytes=None,
        strict_block_end=False,
        skip_stmts=False,
        collect_data_refs=False,
        cross_insn_opt=True,
    ):
        """
        :param data:                The bytes to lift. Can be either a string of bytes or a cffi buffer object.
                                    You may also pass None to initialize an empty IRSB.
        :type data:                 str or bytes or cffi.FFI.CData or None
        :param int mem_addr:        The address to lift the data at.
        :param arch:                The architecture to lift the data as.
        :param max_inst:            The maximum number of instructions to lift. (See note below)
        :param max_bytes:           The maximum number of bytes to use.
        :param num_inst:            Replaces max_inst if max_inst is None. If set to None as well, no instruction limit
                                    is used.
        :param num_bytes:           Replaces max_bytes if max_bytes is None. If set to None as well, no byte limit is
                                    used.
        :param bytes_offset:        The offset into `data` to start lifting at. Note that for ARM THUMB mode, both
                                    `mem_addr` and `bytes_offset` must be odd (typically `bytes_offset` is set to 1).
        :param traceflags:          The libVEX traceflags, controlling VEX debug prints.
        :param opt_level:           The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                                    unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                                    optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                                    which honestly doesn't make much sense in the context of pyvex. The default is 1.
        :param strict_block_end:    Should the LibVEX arm-thumb split block at some instructions, for example CB{N}Z.
        :param skip_stmts:          Forwarded unchanged to :func:`pyvex.lifting.lift`.
        :param collect_data_refs:   Forwarded unchanged to :func:`pyvex.lifting.lift`.
        :param cross_insn_opt:      Forwarded unchanged to :func:`pyvex.lifting.lift`.

        .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
                  exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
                  instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
                  fewer instructions than requested. Specifically, this means that lifting a branch or jump
                  on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
                  attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

        .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
                  ends properly or until it runs out of data to lift.
        """
        # num_inst / num_bytes are fallbacks for max_inst / max_bytes.
        if max_inst is None:
            max_inst = num_inst
        if max_bytes is None:
            max_bytes = num_bytes
        VEXObject.__init__(self)
        self.addr = mem_addr
        self.arch: Arch = arch

        self.statements: list[IRStmt] = []
        self.next: IRExpr = Const(U1(0))
        self._tyenv: Optional["IRTypeEnv"] = None
        self.jumpkind: str = "UNSET"
        # The fields below are lazily computed caches; None means "not computed yet".
        self._direct_next: bool | None = None
        self._size: int | None = None
        self._instructions: int | None = None
        self._exit_statements: tuple[tuple[int, int, IRStmt], ...] | None = None
        self.is_noop_block: bool = False
        self.default_exit_target = None
        self.data_refs = ()
        self.const_vals = ()
        # Must start as None (not an empty tuple): the `instruction_addresses` property only derives the
        # addresses from the IMarks when this cache is None.
        self._instruction_addresses: tuple[int, ...] | None = None

        if data is not None:
            # This is the slower path (because we need to call _from_py() to copy the content in the returned IRSB to
            # the current IRSB instance. You should always call `lift()` directly. This method is kept for compatibility
            # concerns.
            from pyvex.lifting import lift

            irsb = lift(
                data,
                mem_addr,
                arch,
                max_bytes=max_bytes,
                max_inst=max_inst,
                bytes_offset=bytes_offset,
                opt_level=opt_level,
                traceflags=traceflags,
                strict_block_end=strict_block_end,
                skip_stmts=skip_stmts,
                collect_data_refs=collect_data_refs,
                cross_insn_opt=cross_insn_opt,
            )
            self._from_py(irsb)

    @staticmethod
    def empty_block(arch, addr, statements=None, nxt=None, tyenv=None, jumpkind=None, direct_next=None, size=None):
        """
        Build an IRSB without lifting anything and fill it with the given attributes.

        All remaining cached attributes are reset by :meth:`_set_attributes`.
        """
        block = IRSB(None, addr, arch)
        block._set_attributes(statements, nxt, tyenv, jumpkind, direct_next, size=size)
        return block

    @property
    def tyenv(self) -> "IRTypeEnv":
        """
        The type environment of this block. An empty one is created on first access if none is set.
        """
        if self._tyenv is None:
            self._tyenv = IRTypeEnv(self.arch)
        return self._tyenv

    @tyenv.setter
    def tyenv(self, v):
        self._tyenv = v

    @property
    def has_statements(self) -> bool:
        """
        True if this block holds at least one statement (False when statements are None or empty).
        """
        return bool(self.statements)

    @property
    def exit_statements(self) -> tuple[tuple[int, int, IRStmt], ...]:
        """
        All Exit statements of this block as ``(instruction address, statement index, statement)`` tuples.

        The result is computed from the statement list on first access and cached afterwards. A block without
        statements yields an empty tuple, which is not cached.
        """
        if self._exit_statements is not None:
            return self._exit_statements

        # Delayed process
        if not self.has_statements:
            return ()

        found = []
        ins_addr = None
        for idx, stmt_ in enumerate(self.statements):
            if type(stmt_) is IMark:
                ins_addr = stmt_.addr + stmt_.delta
            elif type(stmt_) is Exit:
                # An Exit must belong to an instruction, i.e. be preceded by an IMark.
                assert ins_addr is not None
                found.append((ins_addr, idx, stmt_))

        self._exit_statements = tuple(found)
        return self._exit_statements

    def copy(self) -> "IRSB":
        """
        Return a deep copy of this block.
        """
        return copy.deepcopy(self)

    def extend(self, extendwith) -> None:
        """
        Appends an irsb to the current irsb. The irsb that is appended is invalidated. The appended irsb's jumpkind and
        default exit are used.

        :param extendwith:     The IRSB to append to this IRSB
        :vartype extendwith:   :class:`IRSB`
        """
        if self.stmts_used == 0:
            self._from_py(extendwith)
            return

        conversion_dict = {}
        invalid_vals = (0xFFFFFFFF, -1)

        # Snapshot everything derived from both blocks before the statement lists are merged.
        new_size = self.size + extendwith.size
        new_instructions = self.instructions + extendwith.instructions
        new_direct_next = extendwith.direct_next
        new_instruction_addresses = tuple(self.instruction_addresses) + tuple(extendwith.instruction_addresses)

        def convert_tmp(tmp):
            """
            Converts a tmp from the appended-block into one in the appended-to-block. Creates a new tmp if it does not
            already exist. Prevents collisions in tmp numbers between the two blocks.

            :param tmp:       The tmp number to convert
            """
            if tmp not in conversion_dict:
                tmp_type = extendwith.tyenv.lookup(tmp)
                conversion_dict[tmp] = self.tyenv.add(tmp_type)
            return conversion_dict[tmp]

        def convert_expr(expr_):
            """
            Converts a VEX expression from the appended-block so that it uses tmps of the appended-to-block. Used to
            prevent collisions in tmp numbers between the two blocks. Only RdTmp expressions are rewritten; any other
            expression is returned unchanged.

            :param expr_:     The VEX expression to convert
            :vartype expr_:   :class:`IRExpr`
            """
            if type(expr_) is RdTmp:
                return RdTmp.get_instance(convert_tmp(expr_.tmp))
            return expr_

        for stmt_ in extendwith.statements:
            stmttype = type(stmt_)
            if stmttype is WrTmp:
                stmt_.tmp = convert_tmp(stmt_.tmp)
            elif stmttype is LoadG:
                stmt_.dst = convert_tmp(stmt_.dst)
            elif stmttype is LLSC:
                stmt_.result = convert_tmp(stmt_.result)
            elif stmttype is Dirty:
                if stmt_.tmp not in invalid_vals:
                    stmt_.tmp = convert_tmp(stmt_.tmp)
                # The return value is intentionally unused: this only makes sure the tmps read by the arguments
                # are registered in conversion_dict (the actual replacement happens below).
                for e in stmt_.args:
                    convert_expr(e)
            elif stmttype is CAS:
                if stmt_.oldLo not in invalid_vals:
                    stmt_.oldLo = convert_tmp(stmt_.oldLo)
                if stmt_.oldHi not in invalid_vals:
                    stmt_.oldHi = convert_tmp(stmt_.oldHi)
            # Convert all expressions
            to_replace = {}
            for expr_ in stmt_.expressions:
                replacement = convert_expr(expr_)
                if replacement is not expr_:
                    to_replace[expr_] = replacement
            stmt_.replace_expression(to_replace)
            # Add the converted statement to self.statements
            self.statements.append(stmt_)
        extendwith.next = convert_expr(extendwith.next)
        self.next = extendwith.next
        self.jumpkind = extendwith.jumpkind
        self._size = new_size
        self._instructions = new_instructions
        self._direct_next = new_direct_next

        # The default exit now is the one of the appended block, so its constant target (if any) applies.
        self.default_exit_target = extendwith.default_exit_target
        self._instruction_addresses = new_instruction_addresses
        # Drop the cached exits; they are rebuilt from the merged statement list on next access.
        self._exit_statements = None

        # TODO: Update data_refs and const_vals as well.

    def invalidate_direct_next(self) -> None:
        """
        Forget the cached `direct_next` value so that it is recomputed on next access.
        """
        self._direct_next = None

    def pp(self) -> None:
        """
        Pretty-print the IRSB to stdout.
        """
        print(self._pp_str())

    def __repr__(self):
        return f"IRSB <0x{self.size:x} bytes, {self.instructions} ins., {str(self.arch)}> at 0x{self.addr:x}"

    def __str__(self):
        return self._pp_str()

    def __eq__(self, other):
        return (
            isinstance(other, IRSB)
            and self.addr == other.addr
            and self.arch.name == other.arch.name
            and self.statements == other.statements
            and self.next == other.next
            and self.jumpkind == other.jumpkind
        )

    def __hash__(self):
        # statements may be None (omitted); __eq__ copes with that, so hashing must not crash either.
        stmts = None if self.statements is None else tuple(self.statements)
        return hash((IRSB, self.addr, self.arch.name, stmts, self.next, self.jumpkind))

    def typecheck(self) -> bool:
        """
        Check that this block is well-formed.

        :return: True if every check passes. Otherwise the first problem found is logged at debug level and False
                 is returned.
        """
        problem = self._typecheck_failure()
        if problem is not None:
            log.debug(problem)
            return False
        return True

    def _typecheck_failure(self) -> Optional[str]:
        """
        Return a description of the first well-formedness problem of this block, or None if there is none.

        Explicit checks are used instead of `assert` so that the validation still runs under `python -O`.
        """
        # existence checks
        if self.next is None:
            return "Missing next expression"
        if self.jumpkind is None:
            return "Missing jumpkind"

        # Type checks
        if not isinstance(self.next, expr.IRExpr):
            return "Next expression is not an expression"
        if not isinstance(self.jumpkind, str):
            return "Jumpkind is not a string"
        if not self.jumpkind.startswith("Ijk_"):
            return "Jumpkind is not a jumpkind enum"
        if not self.tyenv.typecheck():
            return "Type environment contains invalid types"

        # statement checks
        last_imark = None  # (addr, len) of the most recent IMark
        for i, st in enumerate(self.statements):
            if not isinstance(st, stmt.IRStmt):
                return "Statement %d is not an IRStmt" % i
            try:
                stmt_ok = st.typecheck(self.tyenv)
            except Exception:  # pylint: disable=broad-except
                return "Statement %d errored in typechecking" % i
            if not stmt_ok:
                return "Statement %d failed to typecheck" % i

            if type(st) is stmt.NoOp:
                continue
            if type(st) is stmt.IMark:
                # Consecutive IMarks must describe back-to-back instructions.
                if last_imark is not None and last_imark[0] + last_imark[1] != st.addr:
                    return "IMarks sizes overlap or have gaps"
                last_imark = (st.addr, st.len)
            elif last_imark is None:
                return "Operation statement appears before IMark"

        if last_imark is None:
            return "No IMarks present in block"
        return None

    #
    # alternate constructors
    #

    @staticmethod
    def from_c(c_irsb, mem_addr, arch) -> "IRSB":
        """
        Build an IRSB from a native lift result.

        :param c_irsb: Passed straight to :meth:`_from_c`, which reads it as a lift result object
                       (its `.irsb`, `.size`, `.insts`, ... attributes are accessed).
        """
        irsb = IRSB(None, mem_addr, arch)
        irsb._from_c(c_irsb)
        return irsb

    @staticmethod
    def from_py(tyenv, stmts, next_expr, jumpkind, mem_addr, arch) -> "IRSB":
        """
        Build an IRSB from already constructed Python objects.
        """
        irsb = IRSB(None, mem_addr, arch)

        irsb.tyenv = tyenv
        irsb.statements = stmts
        irsb.next = next_expr
        irsb.jumpkind = jumpkind
        irsb._direct_next = irsb._is_defaultexit_direct_jump()

        return irsb

    #
    # simple properties useful for analysis
    #

    @property
    def stmts_used(self) -> int:
        """
        The number of statements in this block (0 if the statements are None).
        """
        if self.statements is None:
            return 0
        return len(self.statements)

    @property
    def offsIP(self) -> int:
        """
        The instruction pointer offset, taken from `arch.ip_offset`.
        """
        return self.arch.ip_offset

    @property
    def direct_next(self):
        """
        Whether the default exit of this block is a direct jump. Computed on first access and cached.
        """
        if self._direct_next is None:
            self._direct_next = self._is_defaultexit_direct_jump()
        return self._direct_next

    @property
    def expressions(self):
        """
        Return an iterator of all expressions contained in the IRSB.
        """
        for s in self.statements:
            yield from s.expressions
        yield self.next

    @property
    def instructions(self):
        """
        The number of instructions in this block
        """
        if self._instructions is None:
            if self.statements is None:
                self._instructions = 0
            else:
                self._instructions = sum(1 for s in self.statements if type(s) is stmt.IMark)
        return self._instructions

    @property
    def instruction_addresses(self) -> tuple[int, ...]:
        """
        Addresses of instructions in this block.
        """
        if self._instruction_addresses is None:
            if self.statements is None:
                self._instruction_addresses = ()
            else:
                self._instruction_addresses = tuple(
                    (s.addr + s.delta) for s in self.statements if type(s) is stmt.IMark
                )
        return self._instruction_addresses

    @property
    def size(self):
        """
        The size of this block, in bytes
        """
        if self._size is None:
            if self.statements is None:
                # Same convention as `instructions`: omitted statements count as an empty block.
                self._size = 0
            else:
                self._size = sum(s.len for s in self.statements if type(s) is stmt.IMark)
        return self._size

    @property
    def operations(self):
        """
        A list of all operations done by the IRSB, as libVEX enum names
        """
        return [e.op for e in self.expressions if hasattr(e, "op")]

    @property
    def all_constants(self):
        """
        Returns all constants in the block (including incrementing of the program counter) as
        :class:`pyvex.const.IRConst`.
        """
        # chain.from_iterable flattens in linear time (sum(..., []) re-copies the list on every step).
        return list(itertools.chain.from_iterable(e.constants for e in self.expressions))

    @property
    def constants(self):
        """
        The constants (excluding updates of the program counter) in the IRSB as :class:`pyvex.const.IRConst`.
        """
        return list(
            itertools.chain.from_iterable(
                s.constants for s in self.statements if not (type(s) is stmt.Put and s.offset == self.offsIP)
            )
        )

    @property
    def constant_jump_targets(self):
        """
        A set of the static jump targets of the basic block.
        """
        exits = set()

        if self.exit_statements:
            for _, _, stmt_ in self.exit_statements:
                exits.add(stmt_.dst.value)

        default_target = self.default_exit_target
        if default_target is not None:
            exits.add(default_target)

        return exits

    @property
    def constant_jump_targets_and_jumpkinds(self):
        """
        A dict of the static jump targets of the basic block to their jumpkind.
        """
        exits = {}

        if self.exit_statements:
            for _, _, stmt_ in self.exit_statements:
                exits[stmt_.dst.value] = stmt_.jumpkind

        default_target = self.default_exit_target
        if default_target is not None:
            exits[default_target] = self.jumpkind

        return exits

    #
    # private methods
    #

    def _pp_str(self) -> str:
        """
        Return the pretty-printed IRSB.
        """
        sa = []
        sa.append("IRSB {")
        if self.statements is not None:
            sa.append(" %s" % self.tyenv)
        sa.append("")
        if self.statements is not None:
            for i, s in enumerate(self.statements):
                # Statements touching a register are printed with the register name resolved via the arch.
                if isinstance(s, stmt.Put):
                    stmt_str = s.pp_str(
                        reg_name=self.arch.translate_register_name(s.offset, s.data.result_size(self.tyenv) // 8)
                    )
                elif isinstance(s, stmt.WrTmp) and isinstance(s.data, expr.Get):
                    stmt_str = s.pp_str(
                        reg_name=self.arch.translate_register_name(s.data.offset, s.data.result_size(self.tyenv) // 8)
                    )
                elif isinstance(s, stmt.Exit):
                    stmt_str = s.pp_str(reg_name=self.arch.translate_register_name(s.offsIP, self.arch.bits // 8))
                else:
                    stmt_str = s.pp_str()
                sa.append(" %02d | %s" % (i, stmt_str))
        else:
            sa.append(" Statements are omitted.")
        sa.append(f" NEXT: PUT({self.arch.translate_register_name(self.offsIP)}) = {self.next}; {self.jumpkind}")
        sa.append("}")
        return "\n".join(sa)

    def _is_defaultexit_direct_jump(self):
        """
        Checks if the default of this IRSB a direct jump or not.

        It is one when the jumpkind is Ijk_InvalICache, Ijk_Boring or Ijk_Call and a default exit target is known.
        """
        if self.jumpkind not in ("Ijk_InvalICache", "Ijk_Boring", "Ijk_Call"):
            return False

        return self.default_exit_target is not None

    #
    # internal "constructors" to fill this block out with data from various sources
    #

    def _from_c(self, lift_r, skip_stmts=False):
        """
        Fill this block from a native lift result.

        :param lift_r:      The lift result; its `irsb`, `size`, `insts`, `inst_addrs`, exit, data-ref and
                            const-val fields are read.
        :param skip_stmts:  If True, statements and type environment are not converted (both are set to None)
                            and the exit statements are taken from `lift_r.exits` instead.
        :raises SkipStatementsError: If the lift result reports more exits, data references or constant values
                            than MAX_EXITS, MAX_DATA_REFS or MAX_CONST_VALS respectively.
        """
        c_irsb = lift_r.irsb
        if not skip_stmts:
            self.statements = [stmt.IRStmt._from_c(c_irsb.stmts[i]) for i in range(c_irsb.stmts_used)]
            self.tyenv = IRTypeEnv._from_c(self.arch, c_irsb.tyenv)
        else:
            self.statements = None
            self.tyenv = None

        self.next = expr.IRExpr._from_c(c_irsb.next)
        self.jumpkind = get_enum_from_int(c_irsb.jumpkind)
        self._size = lift_r.size
        self.is_noop_block = lift_r.is_noop_block == 1
        self._instructions = lift_r.insts
        self._instruction_addresses = tuple(itertools.islice(lift_r.inst_addrs, lift_r.insts))

        # Conditional exits
        if skip_stmts:
            if lift_r.exit_count > self.MAX_EXITS:
                # There are more exits than the default size of the exits array. We will need all statements
                raise SkipStatementsError("exit_count exceeded MAX_EXITS (%d)" % self.MAX_EXITS)
            exit_statements = []
            for i in range(lift_r.exit_count):
                ex = lift_r.exits[i]
                exit_stmt = stmt.IRStmt._from_c(ex.stmt)
                exit_statements.append((ex.ins_addr, ex.stmt_idx, exit_stmt))

            self._exit_statements = tuple(exit_statements)
        else:
            self._exit_statements = None  # It will be generated when self.exit_statements is called
        # The default exit
        if lift_r.is_default_exit_constant == 1:
            self.default_exit_target = lift_r.default_exit
        else:
            self.default_exit_target = None

        # Data references (left as None when the lift result reports none)
        self.data_refs = None
        if lift_r.data_ref_count > 0:
            if lift_r.data_ref_count > self.MAX_DATA_REFS:
                raise SkipStatementsError(f"data_ref_count exceeded MAX_DATA_REFS ({self.MAX_DATA_REFS})")
            self.data_refs = [DataRef.from_c(lift_r.data_refs[i]) for i in range(lift_r.data_ref_count)]

        # Const values (left as None when the lift result reports none)
        self.const_vals = None
        if lift_r.const_val_count > 0:
            if lift_r.const_val_count > self.MAX_CONST_VALS:
                raise SkipStatementsError(f"const_val_count exceeded MAX_CONST_VALS ({self.MAX_CONST_VALS})")
            self.const_vals = [ConstVal.from_c(lift_r.const_vals[i]) for i in range(lift_r.const_val_count)]

    def _set_attributes(
        self,
        statements=None,
        nxt=None,
        tyenv=None,
        jumpkind=None,
        direct_next=None,
        size=None,
        instructions=None,
        instruction_addresses=None,
        exit_statements=None,
        default_exit_target=None,
    ):
        """
        Overwrite the content of this block in one go.

        `statements` falls back to a fresh empty list and `tyenv` is only replaced when one is given; every other
        attribute is set to exactly the value passed in (None resets the lazily computed ones).
        """
        self.statements = statements if statements is not None else []
        self.next = nxt
        if tyenv is not None:
            self.tyenv = tyenv
        self.jumpkind = jumpkind
        self._direct_next = direct_next
        self._size = size
        self._instructions = instructions
        self._instruction_addresses = instruction_addresses
        self._exit_statements = exit_statements
        self.default_exit_target = default_exit_target

    def _from_py(self, irsb):
        """
        Take over the content of another IRSB. The statement list and type environment are shared, not copied.
        """
        self._set_attributes(
            irsb.statements,
            irsb.next,
            irsb.tyenv,
            irsb.jumpkind,
            irsb.direct_next,
            irsb.size,
            instructions=irsb._instructions,
            instruction_addresses=irsb._instruction_addresses,
            exit_statements=irsb.exit_statements,
            default_exit_target=irsb.default_exit_target,
        )
class IRTypeEnv(VEXObject):
    """
    An IR type environment.

    :ivar types:     A list of the types of all the temporaries in this block as VEX enum strings.
                     `types[3]` is the type of t3.
    :vartype types:  list of str
    """

    __slots__ = ["types", "wordty"]

    def __init__(self, arch, types=None):
        """
        :param arch:    The architecture; only `arch.bits` is read, to derive the word type.
        :param types:   The initial list of tmp types. The list is stored as-is (not copied); a new empty list
                        is used when None.
        """
        VEXObject.__init__(self)
        self.types = [] if types is None else types
        self.wordty = "Ity_I%d" % arch.bits

    def __str__(self):
        return " ".join(("t%d:%s" % (i, t)) for i, t in enumerate(self.types))

    def lookup(self, tmp: int) -> str:
        """
        Return the type of temporary variable `tmp` as an enum string

        :raises IndexError: If `tmp` is not a valid temporary number of this environment.
        """
        # Valid tmps are 0 .. types_used - 1, so types_used itself is already out of range.
        if tmp < 0 or tmp >= self.types_used:
            log.debug("Invalid temporary number %d", tmp)
            raise IndexError(tmp)
        return self.types[tmp]

    def sizeof(self, tmp):
        """
        Return the size of temporary variable `tmp`, as reported by `get_type_size` for its type.
        """
        return get_type_size(self.lookup(tmp))

    def add(self, ty):
        """
        Add a new tmp of type `ty` to the environment. Returns the number of the new tmp.
        """
        self.types.append(ty)
        return self.types_used - 1

    @property
    def types_used(self):
        """
        The number of temporaries in this environment.
        """
        return len(self.types)

    @staticmethod
    def _from_c(arch, c_tyenv):
        """
        Build a type environment from a native one, converting every type to its enum string.
        """
        return IRTypeEnv(arch, [get_enum_from_int(c_tyenv.types[t]) for t in range(c_tyenv.types_used)])

    @staticmethod
    def _to_c(tyenv):
        """
        Build a native type environment holding one temporary per entry of `tyenv.types`, in order.
        """
        c_tyenv = pvc.emptyIRTypeEnv()
        for ty in tyenv.types:
            pvc.newIRTemp(c_tyenv, get_int_from_enum(ty))
        return c_tyenv

    def typecheck(self):
        """
        Return True if `get_type_size` accepts every type in this environment, False if it raises ValueError
        for any of them.
        """
        for ty in self.types:
            try:
                get_type_size(ty)
            except ValueError:
                return False
        return True