Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pyvex/block.py: 66%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import copy
2import itertools
3import logging
4from typing import List, Optional, Tuple
6from . import expr, stmt
7from .const import get_type_size
8from .data_ref import DataRef
9from .enums import VEXObject
10from .errors import SkipStatementsError
11from .expr import RdTmp
12from .native import pvc
13from .stmt import (
14 CAS,
15 LLSC,
16 Dirty,
17 Exit,
18 IMark,
19 IRExpr,
20 IRStmt,
21 LoadG,
22 WrTmp,
23 get_enum_from_int,
24 get_int_from_enum,
25)
26from .types import Arch
28log = logging.getLogger("pyvex.block")
class IRSB(VEXObject):
    """
    The IRSB is the primary interface to pyvex. Constructing one of these will make a call into LibVEX to perform a
    translation.

    IRSB stands for *Intermediate Representation Super-Block*. An IRSB in VEX is a single-entry, multiple-exit code
    block.

    :ivar arch:             The architecture this block is lifted under. Must duck-type as :class:`archinfo.arch.Arch`
    :ivar statements:       The statements in this block
    :vartype statements:    list of :class:`IRStmt`
    :ivar next:             The expression for the default exit target of this block
    :vartype next:          :class:`IRExpr`
    :ivar int offsIP:       The offset of the instruction pointer in the VEX guest state
    :ivar int stmts_used:   The number of statements in this IRSB
    :ivar str jumpkind:     The type of this block's default jump (call, boring, syscall, etc) as a VEX enum string
    :ivar bool direct_next: Whether this block ends with a direct (not indirect) jump or branch
    :ivar int size:         The size of this block in bytes
    :ivar int addr:         The address of this basic block, i.e. the address in the first IMark
    """

    # Underscore-prefixed slots back the lazily computed / cached properties of the same name
    # (tyenv, direct_next, size, instructions, exit_statements, instruction_addresses).
    __slots__ = (
        "addr",
        "arch",
        "statements",
        "next",
        "_tyenv",
        "jumpkind",
        "_direct_next",
        "_size",
        "_instructions",
        "_exit_statements",
        "default_exit_target",
        "_instruction_addresses",
        "data_refs",
    )

    # The following constants shall match the defs in pyvex.h
    # _from_c() raises SkipStatementsError when the lift result reports more exits / data references than these.
    MAX_EXITS = 400
    MAX_DATA_REFS = 2000
def __init__(
    self,
    data,
    mem_addr,
    arch: Arch,
    max_inst=None,
    max_bytes=None,
    bytes_offset=0,
    traceflags=0,
    opt_level=1,
    num_inst=None,
    num_bytes=None,
    strict_block_end=False,
    skip_stmts=False,
    collect_data_refs=False,
    cross_insn_opt=True,
):
    """
    :param data:                The bytes to lift. Can be either a string of bytes or a cffi buffer object.
                                You may also pass None to initialize an empty IRSB.
    :type data:                 str or bytes or cffi.FFI.CData or None
    :param int mem_addr:        The address to lift the data at.
    :param arch:                The architecture to lift the data as.
    :param max_inst:            The maximum number of instructions to lift. (See note below)
    :param max_bytes:           The maximum number of bytes to use.
    :param num_inst:            Replaces max_inst if max_inst is None. If set to None as well, no instruction limit
                                is used.
    :param num_bytes:           Replaces max_bytes if max_bytes is None. If set to None as well, no byte limit is
                                used.
    :param bytes_offset:        The offset into `data` to start lifting at. Note that for ARM THUMB mode, both
                                `mem_addr` and `bytes_offset` must be odd (typically `bytes_offset` is set to 1).
    :param traceflags:          The libVEX traceflags, controlling VEX debug prints.
    :param opt_level:           The level of optimization to apply to the IR, -1 through 2. -1 is the strictest
                                unoptimized level, 0 is unoptimized but will perform some lookahead/lookbehind
                                optimizations, 1 performs constant propagation, and 2 performs loop unrolling,
                                which honestly doesn't make much sense in the context of pyvex. The default is 1.
    :param strict_block_end:    Should the LibVEX arm-thumb split block at some instructions, for example CB{N}Z.
    :param skip_stmts:          Forwarded unchanged to :func:`pyvex.lifting.lift`.
    :param collect_data_refs:   Forwarded unchanged to :func:`pyvex.lifting.lift`.
    :param cross_insn_opt:      Forwarded unchanged to :func:`pyvex.lifting.lift`.

    .. note:: Explicitly specifying the number of instructions to lift (`max_inst`) may not always work
              exactly as expected. For example, on MIPS, it is meaningless to lift a branch or jump
              instruction without its delay slot. VEX attempts to Do The Right Thing by possibly decoding
              fewer instructions than requested. Specifically, this means that lifting a branch or jump
              on MIPS as a single instruction (`max_inst=1`) will result in an empty IRSB, and subsequent
              attempts to run this block will raise `SimIRSBError('Empty IRSB passed to SimIRSB.')`.

    .. note:: If no instruction and byte limit is used, pyvex will continue lifting the block until the block
              ends properly or until it runs out of data to lift.
    """
    # num_inst / num_bytes are fallbacks that only apply when the max_* counterpart was not given.
    if max_inst is None:
        max_inst = num_inst
    if max_bytes is None:
        max_bytes = num_bytes
    VEXObject.__init__(self)
    self.addr = mem_addr
    self.arch: Arch = arch

    self.statements: List[IRStmt] = []
    self.next: Optional[IRExpr] = None
    self._tyenv: Optional["IRTypeEnv"] = None
    self.jumpkind: Optional[str] = None
    self._direct_next: Optional[bool] = None
    self._size: Optional[int] = None
    self._instructions: Optional[int] = None
    self._exit_statements: Optional[Tuple[Tuple[int, int, IRStmt], ...]] = None
    self.default_exit_target = None
    self.data_refs = ()
    # None means "not computed yet". The instruction_addresses property only derives the addresses from the
    # IMark statements when it sees None; starting from an empty tuple would make it report () forever for
    # blocks that get their statements assigned afterwards (e.g. through from_py()).
    self._instruction_addresses: Optional[Tuple[int, ...]] = None

    if data is not None:
        # This is the slower path (because we need to call _from_py() to copy the content in the returned IRSB to
        # the current IRSB instance. You should always call `lift()` directly. This method is kept for compatibility
        # concerns.
        from pyvex.lifting import lift

        irsb = lift(
            data,
            mem_addr,
            arch,
            max_bytes=max_bytes,
            max_inst=max_inst,
            bytes_offset=bytes_offset,
            opt_level=opt_level,
            traceflags=traceflags,
            strict_block_end=strict_block_end,
            skip_stmts=skip_stmts,
            collect_data_refs=collect_data_refs,
            cross_insn_opt=cross_insn_opt,
        )
        self._from_py(irsb)
@staticmethod
def empty_block(arch, addr, statements=None, nxt=None, tyenv=None, jumpkind=None, direct_next=None, size=None):
    """
    Create an IRSB without lifting any bytes and fill it in from the given attributes.

    :param arch:        The architecture of the new block.
    :param addr:        The address of the new block.
    :param statements:  The statements of the block; an empty list is used when None.
    :param nxt:         The expression for the default exit target.
    :param tyenv:       The type environment; left untouched when None.
    :param jumpkind:    The jumpkind of the default exit.
    :param direct_next: Cached value for :attr:`direct_next`, or None to compute it lazily.
    :param size:        Cached value for :attr:`size`, or None to compute it lazily.
    :return:            The new block.
    """
    new_irsb = IRSB(None, addr, arch)
    new_irsb._set_attributes(
        statements=statements,
        nxt=nxt,
        tyenv=tyenv,
        jumpkind=jumpkind,
        direct_next=direct_next,
        size=size,
    )
    return new_irsb
168 @property
169 def tyenv(self) -> "IRTypeEnv":
170 if self._tyenv is None:
171 self._tyenv = IRTypeEnv(self.arch)
172 return self._tyenv
174 @tyenv.setter
175 def tyenv(self, v):
176 self._tyenv = v
178 @property
179 def has_statements(self) -> bool:
180 return self.statements is not None and bool(self.statements)
182 @property
183 def exit_statements(self) -> Tuple[Tuple[int, int, IRStmt], ...]:
184 if self._exit_statements is not None:
185 return self._exit_statements
187 # Delayed process
188 if not self.has_statements:
189 return ()
191 exit_statements = []
193 ins_addr = None
194 for idx, stmt_ in enumerate(self.statements):
195 if type(stmt_) is IMark:
196 ins_addr = stmt_.addr + stmt_.delta
197 elif type(stmt_) is Exit:
198 assert ins_addr is not None
199 exit_statements.append((ins_addr, idx, stmt_))
201 self._exit_statements = tuple(exit_statements)
202 return self._exit_statements
204 def copy(self) -> "IRSB":
205 return copy.deepcopy(self)
def extend(self, extendwith) -> None:
    """
    Appends an irsb to the current irsb. The irsb that is appended is invalidated. The appended irsb's jumpkind and
    default exit are used.

    If this block has no statements, the attributes of `extendwith` are simply copied over.

    :param extendwith:     The IRSB to append to this IRSB
    :vartype extendwith:   :class:`IRSB`
    """
    if self.stmts_used == 0:
        self._from_py(extendwith)
        return

    conversion_dict = {}
    # Tmp numbers that mean "no tmp" for the optional tmp fields of Dirty and CAS statements.
    invalid_vals = (0xFFFFFFFF, -1)

    # Read these before any statement of `extendwith` is modified.
    new_size = self.size + extendwith.size
    new_instructions = self.instructions + extendwith.instructions
    new_direct_next = extendwith.direct_next

    def convert_tmp(tmp):
        """
        Converts a tmp from the appended-block into one in the appended-to-block. Creates a new tmp if it does not
        already exist. Prevents collisions in tmp numbers between the two blocks.

        :param tmp:       The tmp number to convert
        """
        if tmp not in conversion_dict:
            tmp_type = extendwith.tyenv.lookup(tmp)
            conversion_dict[tmp] = self.tyenv.add(tmp_type)
        return conversion_dict[tmp]

    def convert_expr(expr_):
        """
        Converts a VEX expression to use tmps of the appended-to-block instead of the appended-block. Used to
        prevent collisions in tmp numbers between the two blocks. Only RdTmp expressions are rewritten; any other
        expression is returned unchanged.

        :param expr_:     The VEX expression to convert
        :vartype expr_:   :class:`IRExpr`
        """
        if type(expr_) is RdTmp:
            return RdTmp.get_instance(convert_tmp(expr_.tmp))
        return expr_

    for stmt_ in extendwith.statements:
        stmttype = type(stmt_)
        if stmttype is WrTmp:
            stmt_.tmp = convert_tmp(stmt_.tmp)
        elif stmttype is LoadG:
            stmt_.dst = convert_tmp(stmt_.dst)
        elif stmttype is LLSC:
            stmt_.result = convert_tmp(stmt_.result)
        elif stmttype is Dirty:
            if stmt_.tmp not in invalid_vals:
                stmt_.tmp = convert_tmp(stmt_.tmp)
            for e in stmt_.args:
                # The return value is deliberately unused: this only registers the tmp mappings (keeping the
                # tmp numbering order). The substitution itself is done by the generic pass below.
                convert_expr(e)
        elif stmttype is CAS:
            if stmt_.oldLo not in invalid_vals:
                stmt_.oldLo = convert_tmp(stmt_.oldLo)
            if stmt_.oldHi not in invalid_vals:
                stmt_.oldHi = convert_tmp(stmt_.oldHi)
        # Convert all expressions
        to_replace = {}
        for expr_ in stmt_.expressions:
            replacement = convert_expr(expr_)
            if replacement is not expr_:
                to_replace[expr_] = replacement
        stmt_.replace_expression(to_replace)
        # Add the converted statement to self.statements
        self.statements.append(stmt_)

    extendwith.next = convert_expr(extendwith.next)
    self.next = extendwith.next
    self.jumpkind = extendwith.jumpkind
    self._size = new_size
    self._instructions = new_instructions
    self._direct_next = new_direct_next
    # The default exit now is the appended block's default exit, so its constant target must follow along;
    # otherwise constant_jump_targets and a recomputed direct_next would still describe the old exit.
    self.default_exit_target = extendwith.default_exit_target
    # Drop the caches that were derived from the old statement list; the corresponding properties rebuild them
    # from self.statements on next access.
    self._exit_statements = None
    self._instruction_addresses = None

    # TODO: Merge data_refs as well.
283 def invalidate_direct_next(self) -> None:
284 self._direct_next = None
def pp(self) -> None:
    """
    Pretty-print the IRSB to stdout.
    """
    rendered = self._pp_str()
    print(rendered)
292 def __repr__(self):
293 return f"IRSB <0x{self.size:x} bytes, {self.instructions} ins., {str(self.arch)}> at 0x{self.addr:x}"
295 def __str__(self):
296 return self._pp_str()
298 def __eq__(self, other):
299 return (
300 isinstance(other, IRSB)
301 and self.addr == other.addr
302 and self.arch.name == other.arch.name
303 and self.statements == other.statements
304 and self.next == other.next
305 and self.jumpkind == other.jumpkind
306 )
308 def __hash__(self):
309 return hash((IRSB, self.addr, self.arch.name, tuple(self.statements), self.next, self.jumpkind))
def typecheck(self) -> bool:
    """
    Check this block for structural consistency.

    Verifies that `next` and `jumpkind` exist and have the right types, that the type environment is valid, that
    every statement is an :class:`IRStmt` that typechecks against the type environment, that consecutive IMarks
    are contiguous, and that no operation statement precedes the first IMark.

    The checks are explicit conditionals rather than ``assert`` statements so that they are still performed when
    Python runs with optimizations enabled.

    :return: True if all checks pass. Otherwise the reason is logged at debug level and False is returned.
    """

    def fail(message: str) -> bool:
        log.debug(message)
        return False

    # existence checks
    if self.next is None:
        return fail("Missing next expression")
    if self.jumpkind is None:
        return fail("Missing jumpkind")

    # Type checks
    if not isinstance(self.next, expr.IRExpr):
        return fail("Next expression is not an expression")
    # Must be checked before calling str methods on it below.
    if not isinstance(self.jumpkind, str):
        return fail("Jumpkind is not a string")
    if not self.jumpkind.startswith("Ijk_"):
        return fail("Jumpkind is not a jumpkind enum")
    if not self.tyenv.typecheck():
        return fail("Type environment contains invalid types")

    # statement checks
    if self.statements is None:
        return fail("Statements are omitted, cannot typecheck them")

    last_imark = None  # (addr, len) of the most recent IMark
    for i, st in enumerate(self.statements):
        if not isinstance(st, stmt.IRStmt):
            return fail("Statement %d is not an IRStmt" % i)
        try:
            stmt_ok = st.typecheck(self.tyenv)
        except Exception:  # pylint: disable=broad-except
            return fail("Statement %d errored in typechecking" % i)
        if not stmt_ok:
            return fail("Statement %d failed to typecheck" % i)

        if type(st) is stmt.NoOp:
            continue
        if type(st) is stmt.IMark:
            # Each IMark has to start exactly where the previous one ended.
            if last_imark is not None and last_imark[0] + last_imark[1] != st.addr:
                return fail("IMarks sizes overlap or have gaps")
            last_imark = (st.addr, st.len)
        elif last_imark is None:
            return fail("Operation statement appears before IMark")

    if last_imark is None:
        return fail("No IMarks present in block")
    return True
348 #
349 # alternate constructors
350 #
@staticmethod
def from_c(c_irsb, mem_addr, arch) -> "IRSB":
    """
    Build an IRSB at `mem_addr` for `arch` and populate it through :meth:`_from_c`.

    :param c_irsb:   Passed as-is to :meth:`_from_c`, which reads ``irsb``, ``size``, ``insts`` and further
                     fields from it.
    :param mem_addr: The address of the block.
    :param arch:     The architecture of the block.
    :return:         The populated block.
    """
    block = IRSB(None, mem_addr, arch)
    block._from_c(c_irsb)
    return block
@staticmethod
def from_py(tyenv, stmts, next_expr, jumpkind, mem_addr, arch) -> "IRSB":
    """
    Build an IRSB from already-constructed Python objects, without lifting.

    :param tyenv:     The type environment of the block.
    :param stmts:     The statements of the block.
    :param next_expr: The expression for the default exit target.
    :param jumpkind:  The jumpkind of the default exit.
    :param mem_addr:  The address of the block.
    :param arch:      The architecture of the block.
    :return:          The new block, with ``direct_next`` already computed.
    """
    block = IRSB(None, mem_addr, arch)

    block.tyenv = tyenv
    block.statements = stmts
    block.next = next_expr
    block.jumpkind = jumpkind
    # Computed last: it depends on the jumpkind assigned above.
    block._direct_next = block._is_defaultexit_direct_jump()

    return block
370 #
371 # simple properties useful for analysis
372 #
374 @property
375 def stmts_used(self) -> int:
376 if self.statements is None:
377 return 0
378 return len(self.statements)
380 @property
381 def offsIP(self) -> int:
382 return self.arch.ip_offset
384 @property
385 def direct_next(self):
386 if self._direct_next is None:
387 self._direct_next = self._is_defaultexit_direct_jump()
388 return self._direct_next
390 @property
391 def expressions(self):
392 """
393 Return an iterator of all expressions contained in the IRSB.
394 """
395 for s in self.statements:
396 yield from s.expressions
397 yield self.next
399 @property
400 def instructions(self):
401 """
402 The number of instructions in this block
403 """
404 if self._instructions is None:
405 if self.statements is None:
406 self._instructions = 0
407 else:
408 self._instructions = len([s for s in self.statements if type(s) is stmt.IMark])
409 return self._instructions
411 @property
412 def instruction_addresses(self) -> Tuple[int, ...]:
413 """
414 Addresses of instructions in this block.
415 """
416 if self._instruction_addresses is None:
417 if self.statements is None:
418 self._instruction_addresses = ()
419 else:
420 self._instruction_addresses = tuple(
421 (s.addr + s.delta) for s in self.statements if type(s) is stmt.IMark
422 )
423 return self._instruction_addresses
425 @property
426 def size(self):
427 """
428 The size of this block, in bytes
429 """
430 if self._size is None:
431 self._size = sum(s.len for s in self.statements if type(s) is stmt.IMark)
432 return self._size
434 @property
435 def operations(self):
436 """
437 A list of all operations done by the IRSB, as libVEX enum names
438 """
439 ops = []
440 for e in self.expressions:
441 if hasattr(e, "op"):
442 ops.append(e.op)
443 return ops
445 @property
446 def all_constants(self):
447 """
448 Returns all constants in the block (including incrementing of the program counter) as
449 :class:`pyvex.const.IRConst`.
450 """
451 return sum((e.constants for e in self.expressions), [])
453 @property
454 def constants(self):
455 """
456 The constants (excluding updates of the program counter) in the IRSB as :class:`pyvex.const.IRConst`.
457 """
458 return sum((s.constants for s in self.statements if not (type(s) is stmt.Put and s.offset == self.offsIP)), [])
460 @property
461 def constant_jump_targets(self):
462 """
463 A set of the static jump targets of the basic block.
464 """
465 exits = set()
467 if self.exit_statements:
468 for _, _, stmt_ in self.exit_statements:
469 exits.add(stmt_.dst.value)
471 default_target = self.default_exit_target
472 if default_target is not None:
473 exits.add(default_target)
475 return exits
477 @property
478 def constant_jump_targets_and_jumpkinds(self):
479 """
480 A dict of the static jump targets of the basic block to their jumpkind.
481 """
482 exits = {}
484 if self.exit_statements:
485 for _, _, stmt_ in self.exit_statements:
486 exits[stmt_.dst.value] = stmt_.jumpkind
488 default_target = self.default_exit_target
489 if default_target is not None:
490 exits[default_target] = self.jumpkind
492 return exits
494 #
495 # private methods
496 #
498 def _pp_str(self) -> str:
499 """
500 Return the pretty-printed IRSB.
501 """
502 sa = []
503 sa.append("IRSB {")
504 if self.statements is not None:
505 sa.append(" %s" % self.tyenv)
506 sa.append("")
507 if self.statements is not None:
508 for i, s in enumerate(self.statements):
509 if isinstance(s, stmt.Put):
510 stmt_str = s.pp_str(
511 reg_name=self.arch.translate_register_name(s.offset, s.data.result_size(self.tyenv) // 8)
512 )
513 elif isinstance(s, stmt.WrTmp) and isinstance(s.data, expr.Get):
514 stmt_str = s.pp_str(
515 reg_name=self.arch.translate_register_name(s.data.offset, s.data.result_size(self.tyenv) // 8)
516 )
517 elif isinstance(s, stmt.Exit):
518 stmt_str = s.pp_str(reg_name=self.arch.translate_register_name(s.offsIP, self.arch.bits // 8))
519 else:
520 stmt_str = s.pp_str()
521 sa.append(" %02d | %s" % (i, stmt_str))
522 else:
523 sa.append(" Statements are omitted.")
524 sa.append(f" NEXT: PUT({self.arch.translate_register_name(self.offsIP)}) = {self.next}; {self.jumpkind}")
525 sa.append("}")
526 return "\n".join(sa)
528 def _is_defaultexit_direct_jump(self):
529 """
530 Checks if the default of this IRSB a direct jump or not.
531 """
532 if not (self.jumpkind == "Ijk_InvalICache" or self.jumpkind == "Ijk_Boring" or self.jumpkind == "Ijk_Call"):
533 return False
535 target = self.default_exit_target
536 return target is not None
538 #
539 # internal "constructors" to fill this block out with data from various sources
540 #
def _from_c(self, lift_r, skip_stmts=False):
    """
    Fill this block from a lift result.

    :param lift_r:      The lift result. Its ``irsb``, ``size``, ``insts``, ``inst_addrs``, ``exit_count``,
                        ``exits``, ``is_default_exit_constant``, ``default_exit``, ``data_ref_count`` and
                        ``data_refs`` fields are read.
    :param skip_stmts:  When True, statements and type environment are set to None and the exit statements are
                        taken from ``lift_r.exits`` instead.
    :raises SkipStatementsError: If statements are skipped and there are more than MAX_EXITS exits, or if there
                        are more than MAX_DATA_REFS data references.
    """
    c_irsb = lift_r.irsb
    if skip_stmts:
        self.statements = None
        self.tyenv = None
    else:
        self.statements = [stmt.IRStmt._from_c(c_irsb.stmts[i]) for i in range(c_irsb.stmts_used)]
        self.tyenv = IRTypeEnv._from_c(self.arch, c_irsb.tyenv)

    self.next = expr.IRExpr._from_c(c_irsb.next)
    self.jumpkind = get_enum_from_int(c_irsb.jumpkind)
    self._size = lift_r.size
    self._instructions = lift_r.insts
    # Only the first `insts` entries of inst_addrs are used.
    self._instruction_addresses = tuple(itertools.islice(lift_r.inst_addrs, lift_r.insts))

    # Conditional exits
    if not skip_stmts:
        self._exit_statements = None  # It will be generated when self.exit_statements is called
    else:
        if lift_r.exit_count > self.MAX_EXITS:
            # There are more exits than the default size of the exits array. We will need all statements
            raise SkipStatementsError("exit_count exceeded MAX_EXITS (%d)" % self.MAX_EXITS)
        collected_exits = []
        for i in range(lift_r.exit_count):
            ex = lift_r.exits[i]
            exit_stmt = stmt.IRStmt._from_c(ex.stmt)
            collected_exits.append((ex.ins_addr, ex.stmt_idx, exit_stmt))
        self._exit_statements = tuple(collected_exits)

    # The default exit
    self.default_exit_target = lift_r.default_exit if lift_r.is_default_exit_constant == 1 else None

    # Data references
    self.data_refs = None
    if lift_r.data_ref_count > 0:
        if lift_r.data_ref_count > self.MAX_DATA_REFS:
            raise SkipStatementsError("data_ref_count exceeded MAX_DATA_REFS (%d)" % self.MAX_DATA_REFS)
        self.data_refs = [DataRef.from_c(lift_r.data_refs[i]) for i in range(lift_r.data_ref_count)]
584 def _set_attributes(
585 self,
586 statements=None,
587 nxt=None,
588 tyenv=None,
589 jumpkind=None,
590 direct_next=None,
591 size=None,
592 instructions=None,
593 instruction_addresses=None,
594 exit_statements=None,
595 default_exit_target=None,
596 ):
597 self.statements = statements if statements is not None else []
598 self.next = nxt
599 if tyenv is not None:
600 self.tyenv = tyenv
601 self.jumpkind = jumpkind
602 self._direct_next = direct_next
603 self._size = size
604 self._instructions = instructions
605 self._instruction_addresses = instruction_addresses
606 self._exit_statements = exit_statements
607 self.default_exit_target = default_exit_target
609 def _from_py(self, irsb):
610 self._set_attributes(
611 irsb.statements,
612 irsb.next,
613 irsb.tyenv,
614 irsb.jumpkind,
615 irsb.direct_next,
616 irsb.size,
617 instructions=irsb._instructions,
618 instruction_addresses=irsb._instruction_addresses,
619 exit_statements=irsb.exit_statements,
620 default_exit_target=irsb.default_exit_target,
621 )
class IRTypeEnv(VEXObject):
    """
    An IR type environment.

    :ivar types:     A list of the types of all the temporaries in this block as VEX enum strings.
                     `types[3]` is the type of t3.
    :vartype types:  list of str
    :ivar str wordty: The enum string ``Ity_I<bits>`` built from the ``bits`` of the architecture.
    """

    __slots__ = ["types", "wordty"]

    def __init__(self, arch, types=None):
        """
        :param arch:  The architecture; only its ``bits`` attribute is read.
        :param types: The initial list of types. The list is stored as-is (not copied); an empty list is used
                      when None.
        """
        VEXObject.__init__(self)
        self.types = [] if types is None else types
        self.wordty = "Ity_I%d" % arch.bits

    def __str__(self):
        return " ".join(("t%d:%s" % (i, t)) for i, t in enumerate(self.types))

    def lookup(self, tmp):
        """
        Return the type of temporary variable `tmp` as an enum string

        :param int tmp:    The number of the temporary.
        :raises IndexError: If `tmp` is negative or not smaller than the number of types.
        """
        # Valid tmp numbers are 0 .. types_used - 1, so types_used itself is already out of range.
        if tmp < 0 or tmp >= self.types_used:
            log.debug("Invalid temporary number %d", tmp)
            raise IndexError(tmp)
        return self.types[tmp]

    def sizeof(self, tmp):
        """
        Return ``get_type_size`` of the type of temporary variable `tmp`.
        """
        return get_type_size(self.lookup(tmp))

    def add(self, ty):
        """
        Add a new tmp of type `ty` to the environment. Returns the number of the new tmp.
        """
        self.types.append(ty)
        return self.types_used - 1

    @property
    def types_used(self):
        """
        The number of temporaries in this environment.
        """
        return len(self.types)

    @staticmethod
    def _from_c(arch, c_tyenv):
        """
        Build an IRTypeEnv from a C type environment by converting its first ``types_used`` type integers to
        enum strings.
        """
        return IRTypeEnv(arch, [get_enum_from_int(c_tyenv.types[t]) for t in range(c_tyenv.types_used)])

    @staticmethod
    def _to_c(tyenv):
        """
        Build a C type environment holding one temporary per type of `tyenv`, in order.
        """
        c_tyenv = pvc.emptyIRTypeEnv()
        for ty in tyenv.types:
            pvc.newIRTemp(c_tyenv, get_int_from_enum(ty))
        return c_tyenv

    def typecheck(self):
        """
        Return True if ``get_type_size`` accepts every type in the environment, False as soon as it raises
        ValueError for one of them.
        """
        for ty in self.types:
            try:
                get_type_size(ty)
            except ValueError:
                return False
        return True