# Copyright 2021 Google LLC
# Copyright 2021 Fraunhofer FKIE
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15"""This module provides the instrumentation functionality for atheris.
17Mainly the function patch_code(), which can instrument a code object and the
18helper class Instrumentor.
19"""
import collections
import dis
import gc
import sys
import types
from typing import Any, Callable, Iterator, List, Optional, Tuple, TypeVar, Union

from . import utils
from .native import _reserve_counter  # type: ignore[import]
from .version_dependent import add_bytes_to_jump_arg
from .version_dependent import CONDITIONAL_JUMPS
from .version_dependent import ENDS_FUNCTION
from .version_dependent import get_code_object
from .version_dependent import get_lnotab
from .version_dependent import HAVE_ABS_REFERENCE
from .version_dependent import HAVE_REL_REFERENCE
from .version_dependent import jump_arg_bytes
from .version_dependent import REVERSE_CMP_OP
from .version_dependent import UNCONDITIONAL_JUMPS

_TARGET_MODULE = "atheris"
_COVERAGE_FUNCTION = "_trace_branch"
_COMPARE_FUNCTION = "_trace_cmp"

# TODO(b/207008147): Use NewType to differentiate the many int and str types.


class Instruction:
  """A single bytecode instruction after every EXTENDED_ARG has been resolved.

  It is assumed that all instructions are always 2*n bytes long.

  Sometimes the Python interpreter pads instructions with 'EXTENDED_ARG 0',
  so instructions must have a minimum size.

  Attributes:
    lineno:
      Line number in the original source code.
    offset:
      Offset of an instruction in bytes.
    opcode:
      Integer identifier of the bytecode operation.
    mnemonic:
      Human-readable name of the opcode.
    arg:
      Optional (default 0) argument to the instruction. This may index into
      CodeType.co_consts, or it may be the address for jump instructions.
    reference:
      For jump instructions, the absolute address in bytes of the target. For
      other instructions, None.
  """

  @classmethod
  def get_fixed_size(cls) -> int:
    return 2

  def __init__(self,
               lineno: int,
               offset: int,
               opcode: int,
               arg: int = 0,
               min_size: int = 0):
    self.lineno = lineno
    self.offset = offset
    self.opcode = opcode
    self.mnemonic = dis.opname[opcode]
    self.arg = arg
    self._min_size = min_size

    if self.mnemonic in HAVE_REL_REFERENCE:
      self._is_relative: Optional[bool] = True
      self.reference: Optional[int] = self.offset + self.get_size(
      ) + jump_arg_bytes(self.arg)
    elif self.mnemonic in HAVE_ABS_REFERENCE:
      self._is_relative = False
      self.reference = jump_arg_bytes(self.arg)
    else:
      self._is_relative = None
      self.reference = None

    self.check_state()

  def __repr__(self) -> str:
    return (f"{self.mnemonic}(arg={self.arg} offset={self.offset} " +
            f"reference={self.reference} getsize={self.get_size()})")

  def has_argument(self) -> bool:
    return self.opcode >= dis.HAVE_ARGUMENT

  def _get_arg_size(self) -> int:
    if self.arg >= (1 << 24):
      return 8
    elif self.arg >= (1 << 16):
      return 6
    elif self.arg >= (1 << 8):
      return 4
    else:
      return 2
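
  # Example (illustrative note, not from the original module): each extra
  # argument byte costs one 2-byte EXTENDED_ARG prefix, so arg=0x05 encodes
  # in 2 bytes, arg=0x1ff in 4 bytes, and arg=0x10000 in 6 bytes.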

  def get_size(self) -> int:
    return max(self._get_arg_size(), self._min_size)

  def get_stack_effect(self) -> int:
    # dis.stack_effect does not work for EXTENDED_ARG and NOP
    if self.mnemonic in ["EXTENDED_ARG", "NOP"]:
      return 0

    return dis.stack_effect(self.opcode,
                            (self.arg if self.has_argument() else None))

  def to_bytes(self) -> bytes:
    """Returns this instruction as bytes."""
    size = self._get_arg_size()
    arg = self.arg
    ret = [self.opcode, arg & 0xff]

    for _ in range(size // 2 - 1):
      arg >>= 8
      ret = [dis.opmap["EXTENDED_ARG"], arg & 0xff] + ret

    while len(ret) < self._min_size:
      ret = [dis.opmap["EXTENDED_ARG"], 0] + ret

    assert len(ret) == self.get_size()

    return bytes(ret)

  def adjust(self, changed_offset: int, size: int, keep_ref: bool) -> None:
    """Compensates the offsets in this instruction for a resize elsewhere.

    Relative offsets may be invalidated due to two main events:
    (1) Insertion of instructions
    (2) Change of size of a single, already existing instruction

    (1) Some instructions of size `size` (in bytes) have been inserted at
    offset `changed_offset` in the instruction listing.

    (2) An instruction at offset `changed_offset` - 0.5 has increased in size.
    If `changed_offset` is self.offset + 0.5, then self has increased.

    Either way, adjust the current offset, reference and argument accordingly.

    TODO(b/207008147): Replace the pattern of using +0.5 as a sentinel.

    Args:
      changed_offset: The offset where instructions are inserted.
      size: The number of bytes of instructions inserted.
      keep_ref: If True, this instruction's reference is left unchanged.
    """
    old_offset = self.offset
    old_reference = self.reference

    if old_offset < changed_offset < (old_offset + 1):
      if old_reference is not None:
        if self._is_relative:
          self.reference += size  # type: ignore[operator]
        elif old_reference > old_offset:
          self.reference += size  # type: ignore[operator]
          self.arg = add_bytes_to_jump_arg(self.arg, size)

      return

    if changed_offset <= old_offset:
      self.offset += size

    if old_reference is not None and not keep_ref:
      if changed_offset <= old_reference:
        self.reference += size  # type: ignore[operator]

      if self._is_relative:
        if old_offset < changed_offset <= old_reference:
          self.arg = add_bytes_to_jump_arg(self.arg, size)
      else:
        if changed_offset <= old_reference:
          self.arg = add_bytes_to_jump_arg(self.arg, size)

  def check_state(self) -> None:
    """Asserts that internal state is consistent."""
    assert self.mnemonic != "EXTENDED_ARG"
    assert 0 <= self.arg <= 0x7fffffff
    assert 0 <= self.opcode < 256

    if self.reference is not None:
      if self._is_relative:
        assert self.offset + self.get_size() + jump_arg_bytes(
            self.arg) == self.reference
      else:
        assert jump_arg_bytes(self.arg) == self.reference

  def is_jump(self) -> bool:
    return (self.mnemonic in CONDITIONAL_JUMPS or
            self.mnemonic in UNCONDITIONAL_JUMPS)

  def make_nop(self) -> None:
    self.opcode = dis.opmap["NOP"]
    self.mnemonic = "NOP"
    self.arg = 0
    self._is_relative = None
    self.reference = None
    self.check_state()
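
# Example (illustrative sketch, not from the original module; opcode values
# vary across Python versions):
#   instr = Instruction(lineno=1, offset=0, opcode=dis.opmap["LOAD_CONST"],
#                       arg=0x1ff)
#   instr.to_bytes()  # -> EXTENDED_ARG 0x01, LOAD_CONST 0xff (4 bytes)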


class BasicBlock:
  """A block of bytecode instructions and the addresses it may jump to."""

  def __init__(self, instructions: List[Instruction], last_one: bool):
    self.instructions = instructions
    self.id = instructions[0].offset

    last_instr = instructions[-1]

    if last_one or last_instr.mnemonic in ENDS_FUNCTION:
      self.edges = []
    elif last_instr.mnemonic in CONDITIONAL_JUMPS:
      self.edges = list(
          {last_instr.reference, last_instr.offset + last_instr.get_size()})
    else:
      if last_instr.reference is not None:
        self.edges = [last_instr.reference]
      else:
        self.edges = [last_instr.offset + last_instr.get_size()]

  def __iter__(self) -> Iterator[Instruction]:
    return iter(self.instructions)

  def __repr__(self) -> str:
    return (f"BasicBlock(id={self.id}, edges={self.edges}, " +
            f"instructions={self.instructions})")


_SizeAndInstructions = Tuple[int, List[Instruction]]


class Instrumentor:
  """Implements the core instrumentation functionality.

  It gets a single code object, builds a CFG of the bytecode and
  can instrument the code for coverage collection via trace_control_flow()
  and for data-flow tracing via trace_data_flow().

  How to insert code:
    1. Select a target basic block
    2. Build up the new code as a list of `Instruction` objects.
       Make sure to get the offsets right.
    3. Calculate the overall size needed by your new code (in bytes)
    4. Call _adjust() with your target offset and calculated size
    5. Insert your instruction list into the instruction list of the basic
       block
    6. Call _handle_size_changes()
  Take a look at trace_control_flow() and trace_data_flow() for examples.

  Note that Instrumentor only supports insertions, not deletions.
  """

  def __init__(self, code: types.CodeType):
    self._cfg: collections.OrderedDict = collections.OrderedDict()
    self.consts = list(code.co_consts)
    self._names = list(code.co_names)
    self.num_counters = 0
    self._code = code

    self._build_cfg()
    self._check_state()

  def _build_cfg(self) -> None:
    """Builds control flow graph."""
    lineno = self._code.co_firstlineno
    arg = None
    offset = None
    length = Instruction.get_fixed_size()
    instr_list = []
    basic_block_borders = []
    did_jump = False
    jump_targets = set()

    for instruction in dis.get_instructions(self._code):
      if instruction.starts_line is not None:
        lineno = instruction.starts_line

      if instruction.opname == "EXTENDED_ARG":
        if arg is None:
          arg = 0
          offset = instruction.offset

        arg <<= 8
        arg |= instruction.arg  # type: ignore[operator]
        length += Instruction.get_fixed_size()  # type: ignore[operator]

        continue

      elif arg is not None:
        assert offset is not None
        combined_arg = 0
        # https://bugs.python.org/issue45757 can cause .arg to be None
        if instruction.arg is not None:
          combined_arg = (arg << 8) | instruction.arg  # type: ignore[operator]
        instr_list.append(
            Instruction(
                lineno,
                offset,
                instruction.opcode,
                combined_arg,
                min_size=length))
        arg = None
        offset = None
        length = Instruction.get_fixed_size()

      else:
        instr_list.append(
            Instruction(lineno, instruction.offset, instruction.opcode,
                        instruction.arg or 0))

      if instr_list[-1].reference is not None:
        jump_targets.add(instr_list[-1].reference)

    for c, instr in enumerate(instr_list):
      if instr.offset == 0 or instr.offset in jump_targets or did_jump:
        basic_block_borders.append(c)

      if instr.is_jump():
        did_jump = True
      else:
        did_jump = False

    basic_block_borders.append(len(instr_list))

    for i in range(len(basic_block_borders) - 1):
      start_of_bb = basic_block_borders[i]
      end_of_bb = basic_block_borders[i + 1]
      bb = BasicBlock(instr_list[start_of_bb:end_of_bb],
                      i == len(basic_block_borders) - 2)
      self._cfg[bb.id] = bb

  def _check_state(self) -> None:
    """Asserts that the Instrumentor is in a valid state."""
    assert self._cfg, "Control flow graph empty."
    seen_ids = set()

    for basic_block in self._cfg.values():
      assert basic_block.instructions, "BasicBlock has no instructions."

      assert basic_block.id not in seen_ids
      seen_ids.add(basic_block.id)

      for edge in basic_block.edges:
        assert edge in self._cfg, (
            f"{basic_block} has an edge, {edge}, not in CFG {self._cfg}.")

    listing = self._get_linear_instruction_listing()
    i = 0

    assert listing[0].offset == 0

    while i < len(listing) - 1:
      assert (listing[i].offset + listing[i].get_size() ==
              listing[i + 1].offset)
      listing[i].check_state()
      i += 1

  def _get_name(self, name: str) -> int:
    """Returns the index of `name` in co_names, appending if necessary."""
    try:
      return self._names.index(name)
    except ValueError:
      self._names.append(name)
      return len(self._names) - 1

  def _get_const(self, constant: Union[int, types.ModuleType]) -> int:
    """Returns the index of `constant` in self.consts, inserting if needed."""
    for i in range(len(self.consts)):
      if isinstance(self.consts[i],
                    type(constant)) and self.consts[i] == constant:
        return i

    self.consts.append(constant)
    return len(self.consts) - 1
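
  # Illustrative note (not from the original module): the isinstance() check
  # in _get_const() matters because Python compares equal across types
  # (e.g. True == 1), so matching on equality alone could return the index of
  # the wrong constant.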

  def _get_counter(self) -> int:
    counter = _reserve_counter()
    return self._get_const(counter)

  def _adjust(self, offset: float, size: int, *keep_refs: Instruction) -> None:
    """Adjusts for `size` bytes of instructions inserted at `offset`.

    Signals to all instructions that some instructions of size `size` (in
    bytes) will be inserted at offset `offset`. Sometimes it is necessary that
    some instructions do not change their reference when a new insertion
    happens. All those Instruction objects whose reference shall not change
    must be passed in `keep_refs`.

    Args:
      offset: The offset at which new instructions are inserted.
      size: How many bytes of new instructions are being inserted.
      *keep_refs: The instructions whose references shall not change.
    """
    for basic_block in self._cfg.values():
      for instr in basic_block:
        instr.adjust(offset, size, instr in keep_refs)

  def _handle_size_changes(self) -> None:
    """Fixes instructions whose size increased with the last insertion.

    After insertions have been made, it could be that the argument of some
    instructions crossed certain boundaries so that more EXTENDED_ARGs are
    required to build the oparg. This function identifies all of those
    instructions whose size increased with the latest insertion and adjusts
    all other instructions to the new size.
    """
    listing = self._get_linear_instruction_listing()

    while True:
      found_invalid = False
      i = 0

      while i < len(listing) - 1:
        next_offset = listing[i].offset + listing[i].get_size()

        assert next_offset >= listing[i + 1].offset, (
            "Something weird happened with the offsets at offset " +
            f"{listing[i].offset}")

        if next_offset > listing[i + 1].offset:
          delta = next_offset - listing[i + 1].offset
          self._adjust(listing[i].offset + 0.5, delta)
          found_invalid = True

        i += 1

      if not found_invalid:
        break
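
  # Illustrative note (not from the original module): one size change can
  # trigger another. If an insertion pushes a jump argument past 0xff, that
  # jump grows from 2 to 4 bytes; the extra bytes shift later offsets and may
  # push further arguments over a boundary, hence the fixed-point loop above.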

  def _get_linear_instruction_listing(self) -> List[Instruction]:
    listing = []
    for basic_block in self._cfg.values():
      for instr in basic_block:
        listing.append(instr)
    return listing

  def to_code(self) -> types.CodeType:
    """Returns the instrumented code object."""
    self._check_state()
    listing = self._get_linear_instruction_listing()
    code = bytes()
    stacksize = 0

    for instr in listing:
      code += instr.to_bytes()
      stacksize = max(stacksize, stacksize + instr.get_stack_effect())

    return get_code_object(self._code, stacksize, code,
                           tuple(self.consts + ["__ATHERIS_INSTRUMENTED__"]),
                           tuple(self._names), get_lnotab(self._code, listing))

  def _generate_trace_branch_invocation(self, lineno: int,
                                        offset: int) -> _SizeAndInstructions:
    """Builds the bytecode that calls atheris._trace_branch()."""
    to_insert = []
    start_offset = offset
    const_atheris = self._get_const(sys.modules[_TARGET_MODULE])
    name_cov = self._get_name(_COVERAGE_FUNCTION)

    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_atheris))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_ATTR"], name_cov))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"],
                    self._get_counter()))
    offset += to_insert[-1].get_size()
    to_insert.append(Instruction(lineno, offset, dis.opmap["CALL_FUNCTION"], 1))
    offset += to_insert[-1].get_size()
    to_insert.append(Instruction(lineno, offset, dis.opmap["POP_TOP"]))
    offset += to_insert[-1].get_size()

    return offset - start_offset, to_insert

  def _generate_cmp_invocation(self, op: int, lineno: int,
                               offset: int) -> _SizeAndInstructions:
    """Builds the bytecode that calls atheris._trace_cmp().

    Only call this if the two objects being compared are non-constants.

    Args:
      op: The comparison operation.
      lineno: The line number of the operation.
      offset: The offset of the comparison instruction.

    Returns:
      The size in bytes of the instructions to insert, and the instructions
      to insert.
    """
    to_insert = []
    start_offset = offset
    const_atheris = self._get_const(sys.modules[_TARGET_MODULE])
    name_cmp = self._get_name(_COMPARE_FUNCTION)
    const_op = self._get_const(op)
    const_counter = self._get_counter()
    const_false = self._get_const(False)

    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_atheris))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_ATTR"], name_cmp))
    offset += to_insert[-1].get_size()
    to_insert.append(Instruction(lineno, offset, dis.opmap["ROT_THREE"]))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_op))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_counter))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_false))
    offset += to_insert[-1].get_size()
    to_insert.append(Instruction(lineno, offset, dis.opmap["CALL_FUNCTION"], 5))
    offset += to_insert[-1].get_size()

    return offset - start_offset, to_insert

  def _generate_const_cmp_invocation(self, op: int, lineno: int, offset: int,
                                     switch: bool) -> _SizeAndInstructions:
    """Builds the bytecode that calls atheris._trace_cmp().

    Only call this if one of the objects being compared is a constant coming
    from co_consts. If `switch` is true, the constant is the second argument
    and needs to be switched with the first argument.

    Args:
      op: The comparison operation.
      lineno: The line number of the operation.
      offset: The offset of the comparison instruction.
      switch: Whether the second argument is the constant instead of the first.

    Returns:
      The size in bytes of the instructions to insert, and the instructions
      to insert.
    """
    to_insert = []
    start_offset = offset
    const_atheris = self._get_const(sys.modules[_TARGET_MODULE])
    name_cmp = self._get_name(_COMPARE_FUNCTION)
    const_counter = self._get_counter()
    const_true = self._get_const(True)
    const_op = None

    if switch:
      const_op = self._get_const(REVERSE_CMP_OP[op])
    else:
      const_op = self._get_const(op)

    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_atheris))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_ATTR"], name_cmp))
    offset += to_insert[-1].get_size()
    to_insert.append(Instruction(lineno, offset, dis.opmap["ROT_THREE"]))
    offset += to_insert[-1].get_size()

    if switch:
      to_insert.append(Instruction(lineno, offset, dis.opmap["ROT_TWO"]))
      offset += to_insert[-1].get_size()

    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_op))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_counter))
    offset += to_insert[-1].get_size()
    to_insert.append(
        Instruction(lineno, offset, dis.opmap["LOAD_CONST"], const_true))
    offset += to_insert[-1].get_size()
    to_insert.append(Instruction(lineno, offset, dis.opmap["CALL_FUNCTION"], 5))
    offset += to_insert[-1].get_size()

    return offset - start_offset, to_insert

  def trace_control_flow(self) -> None:
    """Inserts a call to atheris._trace_branch() at each branch's target block.

    The argument of _trace_branch() is an id for the branch.

    The following bytecode gets inserted:
      LOAD_CONST atheris
      LOAD_ATTR _trace_branch
      LOAD_CONST <id>
      CALL_FUNCTION 1
      POP_TOP ; _trace_branch() returns None, remove the return value
    """
    already_instrumented = set()

    offset = self._cfg[0].instructions[0].offset
    total_size, to_insert = self._generate_trace_branch_invocation(
        self._cfg[0].instructions[0].lineno, offset)
    self._adjust(offset, total_size)
    self._cfg[0].instructions = to_insert + self._cfg[0].instructions

    for basic_block in self._cfg.values():
      if len(basic_block.edges) == 2:
        for edge in basic_block.edges:
          bb = self._cfg[edge]

          if bb.id not in already_instrumented:
            already_instrumented.add(bb.id)
            source_instr = []
            offset = bb.instructions[0].offset

            for source_bb in self._cfg.values():
              if (bb.id in source_bb.edges and
                  source_bb.instructions[-1].reference == offset):
                source_instr.append(source_bb.instructions[-1])

            total_size, to_insert = self._generate_trace_branch_invocation(
                bb.instructions[0].lineno, offset)

            self._adjust(offset, total_size, *source_instr)

            bb.instructions = to_insert + bb.instructions

    self._handle_size_changes()

  def trace_data_flow(self) -> None:
    """Instruments bytecode for data-flow tracing.

    This works by replacing the instruction COMPARE_OP with a call to
    atheris._trace_cmp(). The arguments for _trace_cmp() are as follows:
      - obj1 and obj2: The two values to compare
      - opid: Argument to COMPARE_OP
      - counter: The counter for this comparison
      - is_const: Whether obj1 is a constant in co_consts

    To detect if any of the values being compared is a constant, all push and
    pop operations have to be analyzed. If a constant appears in a comparison,
    it must always be given as obj1 to _trace_cmp().

    The bytecode that gets inserted looks like this:
      LOAD_CONST atheris
      LOAD_ATTR _trace_cmp
      ROT_THREE ; move atheris._trace_cmp below the two objects
      LOAD_CONST <opid>
      LOAD_CONST <counter index>
      LOAD_CONST <is_const>
      CALL_FUNCTION 5
    """
    stack_size = 0
    seen_consts = []

    for basic_block in self._cfg.values():
      for c, instr in enumerate(basic_block.instructions):
        if instr.mnemonic == "LOAD_CONST":
          seen_consts.append(stack_size)
        elif instr.mnemonic == "COMPARE_OP" and instr.arg <= 5:
          # Determine the two values on the top of the stack before COMPARE_OP
          consts_on_stack = [
              c for c in seen_consts if stack_size - 2 <= c < stack_size
          ]
          tos_is_constant = stack_size - 1 in consts_on_stack
          tos1_is_constant = stack_size - 2 in consts_on_stack

          if not (tos_is_constant and tos1_is_constant):
            offset = instr.offset
            total_size = None
            to_insert = None

            # Both items are non-constants
            if (not tos_is_constant) and (not tos1_is_constant):
              total_size, to_insert = self._generate_cmp_invocation(
                  instr.arg, instr.lineno, offset)

            # One item is constant, one is non-constant
            else:
              total_size, to_insert = self._generate_const_cmp_invocation(
                  instr.arg, instr.lineno, offset, tos_is_constant)

            self._adjust(offset, total_size)

            for i, new_instr in enumerate(to_insert):
              basic_block.instructions.insert(c + i, new_instr)

            instr.make_nop()

        stack_size += instr.get_stack_effect()
        seen_consts = [c for c in seen_consts if c < stack_size]

    self._handle_size_changes()

  def _print_disassembly(self) -> None:
    """Prints disassembly."""
    print(f"Disassembly of {self._code.co_filename}:{self._code.co_name}")
    for basic_block in self._cfg.values():
      print(" -bb-")
      for instr in basic_block:
        print(f" L.{instr.lineno} [{instr.offset}] {instr.mnemonic} ", end="")

        if instr.has_argument():
          print(f"{instr.arg} ", end="")

          if instr._is_relative:
            print(f"(to {instr.reference})", end="")

        print()
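
# Illustrative usage sketch (hypothetical function `f`, not from the original
# module); patch_code() below wraps exactly this sequence and recurses into
# nested code objects:
#   inst = Instrumentor(f.__code__)
#   inst.trace_control_flow()
#   inst.trace_data_flow()
#   f.__code__ = inst.to_code()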


def patch_code(code: types.CodeType,
               trace_dataflow: bool,
               nested: bool = False) -> types.CodeType:
  """Returns code, patched with Atheris instrumentation.

  Args:
    code: The byte code to instrument.
    trace_dataflow: Whether to trace dataflow or not.
    nested: If False, reserve counters, and patch modules. Recursive calls to
      this function are considered nested.
  """
  inst = Instrumentor(code)

  # If this code object has already been instrumented, skip it
  for const in inst.consts:
    # This avoids comparison between str and bytes (BytesWarning).
    if isinstance(const, str) and const == "__ATHERIS_INSTRUMENTED__":
      return code

  inst.trace_control_flow()

  if trace_dataflow:
    inst.trace_data_flow()

  # Repeat this for all nested code objects
  for i in range(len(inst.consts)):
    if isinstance(inst.consts[i], types.CodeType):
      if (inst.consts[i].co_name == "<lambda>" or
          (not nested and inst.consts[i].co_name == "<module>") or
          inst.consts[i].co_name[0] != "<" or
          inst.consts[i].co_name[-1] != ">"):
        inst.consts[i] = patch_code(inst.consts[i], trace_dataflow, nested=True)

  return inst.to_code()


T = TypeVar("T")


def instrument_func(func: Callable[..., T]) -> Callable[..., T]:
  """Add Atheris instrumentation to a specific function."""
  func.__code__ = patch_code(func.__code__, True, True)

  return func
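
# Illustrative usage (hypothetical target function, not from the original
# module):
#
#   @instrument_func
#   def check_input(data):
#     ...
#
# or, equivalently, check_input = instrument_func(check_input).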


def _is_instrumentable(obj: Any) -> bool:
  """Returns True if this object can be instrumented."""
  try:
    # Only callables can be instrumented
    if not hasattr(obj, "__call__"):
      return False
    # Only objects with a __code__ member of type CodeType can be instrumented
    if not hasattr(obj, "__code__"):
      return False
    if not isinstance(obj.__code__, types.CodeType):
      return False
    # Only code in a real module can be instrumented
    if not hasattr(obj, "__module__"):
      return False
    if obj.__module__ not in sys.modules:
      return False
    # Bound methods can't be instrumented - instrument the real func instead
    if hasattr(obj, "__self__"):
      return False
    # Only Python functions and methods can be instrumented, nothing native
    if (not isinstance(obj, types.FunctionType)) and (not isinstance(
        obj, types.MethodType)):
      return False
  except Exception:  # pylint: disable=broad-except
    # If accessing any of those fields produced an exception, the object
    # probably can't be instrumented
    return False

  return True


def instrument_all() -> None:
  """Add Atheris instrumentation to all Python code already imported.

  This function is experimental.

  This function is able to instrument core library functions that can't be
  instrumented by instrument_func or instrument_imports, as those functions are
  used in the implementation of the instrumentation.
  """
  progress_renderer = None

  funcs = [obj for obj in gc.get_objects() if _is_instrumentable(obj)]
  if sys.stderr.isatty():
    sys.stderr.write("INFO: Instrumenting functions: ")
    progress_renderer = utils.ProgressRenderer(sys.stderr, len(funcs))
  else:
    sys.stderr.write(f"INFO: Instrumenting {len(funcs)} functions...\n")

  for i in range(len(funcs)):
    func = funcs[i]
    try:
      instrument_func(func)
    except Exception as e:  # pylint: disable=broad-except
      if progress_renderer:
        progress_renderer.drop()
      sys.stderr.write(f"ERROR: Failed to instrument function {func}: {e}\n")
    if progress_renderer:
      progress_renderer.count = i + 1

  if progress_renderer:
    progress_renderer.drop()
  else:
    print("INFO: Instrumentation complete.")