Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/atheris/instrument_bytecode.py: 87%

424 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Copyright 2021 Google LLC 

2# Copyright 2021 Fraunhofer FKIE 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""This module provides the instrumentation functionality for atheris. 

16 

17Mainly the function patch_code(), which can instrument a code object and the 

18helper class Instrumentor. 

19""" 

20import collections 

21import dis 

22import gc 

23import sys 

24import types 

25from typing import Any, Callable, Iterator, List, Optional, Tuple, TypeVar, Union 

26 

27from . import utils 

28from .native import _reserve_counter # type: ignore[import] 

29from .version_dependent import add_bytes_to_jump_arg 

30from .version_dependent import CONDITIONAL_JUMPS 

31from .version_dependent import ENDS_FUNCTION 

32from .version_dependent import get_code_object 

33from .version_dependent import get_lnotab 

34from .version_dependent import HAVE_ABS_REFERENCE 

35from .version_dependent import HAVE_REL_REFERENCE 

36from .version_dependent import jump_arg_bytes 

37from .version_dependent import REVERSE_CMP_OP 

38from .version_dependent import UNCONDITIONAL_JUMPS 

39 

# Key into sys.modules of the module whose hooks the injected bytecode calls.
_TARGET_MODULE = "atheris"
# Attribute of the target module called by the coverage instrumentation.
_COVERAGE_FUNCTION = "_trace_branch"
# Attribute of the target module called in place of COMPARE_OP.
_COMPARE_FUNCTION = "_trace_cmp"

# TODO(b/207008147): Use NewType to differentiate the many int and str types.

45 

46 

class Instruction:
  """A single bytecode instruction after every EXTENDED_ARG has been resolved.

  It is assumed that all instructions are always 2*n bytes long.

  Sometimes the Python interpreter pads instructions with 'EXTENDED_ARG 0',
  so instructions must have a minimum size.

  Attributes:
    lineno: Line number in the original source code.
    offset: Offset of an instruction in bytes.
    opcode: Integer identifier of the bytecode operation.
    mnemonic: Human readable name of the opcode.
    arg: Optional (default 0) argument to the instruction. This may index into
      CodeType.co_consts or it may be the address for jump instructions.
    reference: For jump instructions, the absolute address in bytes of the
      target. For other instructions, None.
  """

  @classmethod
  def get_fixed_size(cls) -> int:
    """Returns the size in bytes of one opcode byte plus one argument byte."""
    return 2

  def __init__(self,
               lineno: int,
               offset: int,
               opcode: int,
               arg: int = 0,
               min_size: int = 0):
    """Initializes the instruction and derives its jump reference.

    Args:
      lineno: Line number in the original source code.
      offset: Offset of the instruction in bytes.
      opcode: Integer identifier of the bytecode operation.
      arg: Argument of the instruction, with all EXTENDED_ARGs folded in.
      min_size: Lower bound in bytes for the encoded size; to_bytes() pads
        with 'EXTENDED_ARG 0' prefixes up to this size.
    """
    self.lineno = lineno
    self.offset = offset
    self.opcode = opcode
    self.mnemonic = dis.opname[opcode]
    self.arg = arg
    self._min_size = min_size

    # _is_relative is True/False for relative/absolute jumps and None for
    # instructions that carry no jump reference at all.
    if self.mnemonic in HAVE_REL_REFERENCE:
      self._is_relative: Optional[bool] = True
      # Relative jumps are measured from the end of this instruction.
      self.reference: Optional[int] = (
          self.offset + self.get_size() + jump_arg_bytes(self.arg))
    elif self.mnemonic in HAVE_ABS_REFERENCE:
      self._is_relative = False
      self.reference = jump_arg_bytes(self.arg)
    else:
      self._is_relative = None
      self.reference = None

    self.check_state()

  def __repr__(self) -> str:
    return (f"{self.mnemonic}(arg={self.arg} offset={self.offset} "
            f"reference={self.reference} getsize={self.get_size()})")

  def has_argument(self) -> bool:
    """Returns True if the opcode is at or above dis.HAVE_ARGUMENT."""
    return self.opcode >= dis.HAVE_ARGUMENT

  def _get_arg_size(self) -> int:
    """Returns the bytes needed to encode `arg`, EXTENDED_ARGs included."""
    if self.arg >= (1 << 24):
      return 8
    if self.arg >= (1 << 16):
      return 6
    if self.arg >= (1 << 8):
      return 4
    return 2

  def get_size(self) -> int:
    """Returns the encoded size in bytes, honoring the minimum size."""
    return max(self._get_arg_size(), self._min_size)

  def get_stack_effect(self) -> int:
    """Returns the stack effect reported by dis.stack_effect()."""
    # dis.stack_effect does not work for EXTENDED_ARG and NOP
    if self.mnemonic in ("EXTENDED_ARG", "NOP"):
      return 0

    return dis.stack_effect(self.opcode,
                            (self.arg if self.has_argument() else None))

  def to_bytes(self) -> bytes:
    """Returns this instruction as bytes."""
    extended_arg = dis.opmap["EXTENDED_ARG"]
    arg = self.arg
    encoded = [self.opcode, arg & 0xff]

    # Each further argument byte is carried by an EXTENDED_ARG prefix; the
    # most significant byte ends up first.
    for _ in range(self._get_arg_size() // 2 - 1):
      arg >>= 8
      encoded = [extended_arg, arg & 0xff] + encoded

    # Pad with 'EXTENDED_ARG 0' until the minimum size is reached.
    while len(encoded) < self._min_size:
      encoded = [extended_arg, 0] + encoded

    assert len(encoded) == self.get_size()

    return bytes(encoded)

  def adjust(self, changed_offset: float, size: int, keep_ref: bool) -> None:
    """Compensates the offsets in this instruction for a resize elsewhere.

    Offsets may be invalidated due to two main events:
      (1) Insertion of instructions
      (2) Change of size of a single, already existing instruction

    (1) Some instructions of size `size` (in bytes) have been inserted at
        offset `changed_offset` in the instruction listing.

    (2) An instruction at offset `changed_offset` - 0.5 has increased in size.
        If `changed_offset` is self.offset + 0.5, then self has increased.

    Either way, adjust the current offset, reference and argument
    accordingly.

    TODO(b/207008147): Replace the pattern of using +0.5 as a sentinel.

    Args:
      changed_offset: The offset where instructions are inserted, or the
        offset of a grown instruction plus 0.5.
      size: The number of bytes of instructions inserted.
      keep_ref: If True, leave this instruction's reference and argument
        untouched; only its own offset may move.
    """
    old_offset = self.offset
    old_reference = self.reference

    # Case (2) for this very instruction: its own offset stays put, but the
    # bytes behind it moved.
    if old_offset < changed_offset < (old_offset + 1):
      if old_reference is not None:
        if self._is_relative:
          # The argument is unchanged; the target moves because the end of
          # this instruction moved.
          self.reference += size  # type: ignore[operator]
        elif old_reference > old_offset:
          # Absolute forward jump: the target itself was pushed back.
          self.reference += size  # type: ignore[operator]
          self.arg = add_bytes_to_jump_arg(self.arg, size)

      return

    if changed_offset <= old_offset:
      self.offset += size

    if old_reference is not None and not keep_ref:
      if changed_offset <= old_reference:
        self.reference += size  # type: ignore[operator]

      if self._is_relative:
        # A relative argument only changes when the change lies between this
        # instruction and its target.
        if old_offset < changed_offset <= old_reference:
          self.arg = add_bytes_to_jump_arg(self.arg, size)
      else:
        if changed_offset <= old_reference:
          self.arg = add_bytes_to_jump_arg(self.arg, size)

  def check_state(self) -> None:
    """Asserts that internal state is consistent."""
    assert self.mnemonic != "EXTENDED_ARG"
    assert 0 <= self.arg <= 0x7fffffff
    assert 0 <= self.opcode < 256

    if self.reference is not None:
      if self._is_relative:
        assert self.offset + self.get_size() + jump_arg_bytes(
            self.arg) == self.reference
      else:
        assert jump_arg_bytes(self.arg) == self.reference

  def is_jump(self) -> bool:
    """Returns True for conditional and unconditional jump mnemonics."""
    return (self.mnemonic in CONDITIONAL_JUMPS or
            self.mnemonic in UNCONDITIONAL_JUMPS)

  def make_nop(self) -> None:
    """Turns this instruction into a NOP without changing its minimum size."""
    self.opcode = dis.opmap["NOP"]
    self.mnemonic = "NOP"
    self.arg = 0
    self._is_relative = None
    self.reference = None
    self.check_state()

220 

221 

class BasicBlock:
  """A block of bytecode instructions and the addresses it may jump to.

  Attributes:
    instructions: The instructions of the block, in order.
    id: Offset of the first instruction at construction time.
    edges: Offsets at which execution may continue after the last instruction.
  """

  def __init__(self, instructions: List[Instruction], last_one: bool):
    self.instructions = instructions
    self.id = instructions[0].offset

    tail = instructions[-1]

    if last_one or tail.mnemonic in ENDS_FUNCTION:
      # Nothing follows the final block or a function-ending instruction.
      successors = []
    elif tail.mnemonic in CONDITIONAL_JUMPS:
      # Jump target and fall-through; the set collapses them if they coincide.
      successors = list({tail.reference, tail.offset + tail.get_size()})
    elif tail.reference is not None:
      successors = [tail.reference]
    else:
      successors = [tail.offset + tail.get_size()]

    self.edges = successors

  def __iter__(self) -> Iterator[Instruction]:
    return iter(self.instructions)

  def __repr__(self) -> str:
    return (f"BasicBlock(id={self.id}, edges={self.edges}, "
            f"instructions={self.instructions})")

248 

249 

# Result of the bytecode generators: (total size in bytes, instructions).
_SizeAndInstructions = Tuple[int, List[Instruction]]

251 

252 

class Instrumentor:
  """Implements the core instrumentation functionality.

  It gets a single code object, builds a CFG of the bytecode and
  can instrument the code for coverage collection via trace_control_flow()
  and for data-flow tracing via trace_data_flow().

  How to insert code:
    1. Select a target basic block
    2. Build up the new code as a list of `Instruction` objects.
       Make sure to get the offsets right.
    3. Calculate the overall size needed by your new code (in bytes)
    4. Call _adjust() with your target offset and calculated size
    5. Insert your instruction list into the instruction list of the basic
       block
    6. Call _handle_size_changes()
  Take a look at trace_control_flow() and trace_data_flow() for examples.

  Note that Instrumentor only supports insertions, not deletions.
  """

  def __init__(self, code: types.CodeType):
    """Builds and validates the control flow graph of `code`."""
    # Maps the original offset of a block's first instruction to the block.
    self._cfg: collections.OrderedDict = collections.OrderedDict()
    self.consts = list(code.co_consts)
    self._names = list(code.co_names)
    self.num_counters = 0
    self._code = code

    self._build_cfg()
    self._check_state()

  def _build_cfg(self) -> None:
    """Builds control flow graph."""
    lineno = self._code.co_firstlineno
    # State of the EXTENDED_ARG run currently being folded into the
    # instruction that follows it; `arg is None` means no run is open.
    arg = None
    offset = None
    length = Instruction.get_fixed_size()
    instr_list = []
    jump_targets = set()

    for instruction in dis.get_instructions(self._code):
      if instruction.starts_line is not None:
        lineno = instruction.starts_line

      if instruction.opname == "EXTENDED_ARG":
        if arg is None:
          arg = 0
          # The folded instruction starts where its first prefix starts.
          offset = instruction.offset

        arg <<= 8
        arg |= instruction.arg  # type: ignore[operator]
        length += Instruction.get_fixed_size()  # type: ignore[operator]

        continue

      if arg is not None:
        assert offset is not None
        combined_arg = 0
        # https://bugs.python.org/issue45757 can cause .arg to be None
        if instruction.arg is not None:
          combined_arg = (arg << 8) | instruction.arg  # type: ignore[operator]
        instr_list.append(
            Instruction(
                lineno,
                offset,
                instruction.opcode,
                combined_arg,
                min_size=length))
        arg = None
        offset = None
        length = Instruction.get_fixed_size()
      else:
        instr_list.append(
            Instruction(lineno, instruction.offset, instruction.opcode,
                        instruction.arg or 0))

      if instr_list[-1].reference is not None:
        jump_targets.add(instr_list[-1].reference)

    # A block starts at offset 0, at every jump target and right after every
    # jump instruction.
    basic_block_borders = []
    did_jump = False

    for position, instr in enumerate(instr_list):
      if instr.offset == 0 or instr.offset in jump_targets or did_jump:
        basic_block_borders.append(position)

      did_jump = instr.is_jump()

    basic_block_borders.append(len(instr_list))

    last_block = len(basic_block_borders) - 2

    for i in range(len(basic_block_borders) - 1):
      start_of_bb = basic_block_borders[i]
      end_of_bb = basic_block_borders[i + 1]
      bb = BasicBlock(instr_list[start_of_bb:end_of_bb], i == last_block)
      self._cfg[bb.id] = bb

  def _check_state(self) -> None:
    """Asserts that the Instrumentor is in a valid state."""
    assert self._cfg, "Control flow graph empty."
    seen_ids = set()

    for basic_block in self._cfg.values():
      assert basic_block.instructions, "BasicBlock has no instructions."

      assert basic_block.id not in seen_ids
      seen_ids.add(basic_block.id)

      for edge in basic_block.edges:
        assert edge in self._cfg, (
            f"{basic_block} has an edge, {edge}, not in CFG {self._cfg}.")

    listing = self._get_linear_instruction_listing()

    assert listing[0].offset == 0

    # Every instruction must end exactly where its successor begins.
    for current, following in zip(listing, listing[1:]):
      assert current.offset + current.get_size() == following.offset
      current.check_state()

  def _get_name(self, name: str) -> int:
    """Returns an offset to `name` in co_names, appending if necessary."""
    try:
      return self._names.index(name)
    except ValueError:
      self._names.append(name)
      return len(self._names) - 1

  def _get_const(self, constant: Union[int, types.ModuleType]) -> int:
    """Returns the index of `constant` in self.consts, inserting if needed."""
    wanted_type = type(constant)

    for index, existing in enumerate(self.consts):
      # Exact type match: bool is a subclass of int and True == 1, so an
      # isinstance() check would let an int lookup return a bool constant.
      # Checking the type first also avoids comparing unrelated types.
      if type(existing) is wanted_type and existing == constant:
        return index

    self.consts.append(constant)
    return len(self.consts) - 1

  def _get_counter(self) -> int:
    """Reserves a new counter and returns its index in self.consts."""
    counter = _reserve_counter()
    return self._get_const(counter)

  def _adjust(self, offset: float, size: int, *keep_refs: Instruction) -> None:
    """Adjust for `size` bytes of instructions inserted at `offset`.

    Signal all instructions that some instructions of size `size` (in bytes)
    will be inserted at offset `offset`. Sometimes it is necessary that some
    instructions do not change their reference when a new insertion happens.

    All those Instruction-objects whose reference shall not change must be
    in `keep_refs`.

    Args:
      offset: Location that new instructions are inserted at
      size: How many bytes of new instructions are being inserted.
      *keep_refs: The Instructions whose reference shall not change.
    """
    for basic_block in self._cfg.values():
      for instr in basic_block:
        instr.adjust(offset, size, instr in keep_refs)

  def _handle_size_changes(self) -> None:
    """Fixes instructions whose size increased with the last insertion.

    After insertions have been made it could be that the argument of some
    instructions crossed certain boundaries so that more EXTENDED_ARGs are
    required to build the oparg. This function identifies all of those
    instructions whose size increased with the latest insertion and adjusts all
    other instructions to the new size.
    """
    listing = self._get_linear_instruction_listing()

    # Growing one instruction can push another argument over a boundary, so
    # repeat until a full pass finds nothing to fix.
    found_invalid = True

    while found_invalid:
      found_invalid = False

      for current, following in zip(listing, listing[1:]):
        next_offset = current.offset + current.get_size()

        assert next_offset >= following.offset, (
            "Something weird happened with the offsets at offset " +
            f"{current.offset}")

        if next_offset > following.offset:
          delta = next_offset - following.offset
          # offset + 0.5 tells Instruction.adjust() that the instruction at
          # `offset` itself grew.
          self._adjust(current.offset + 0.5, delta)
          found_invalid = True

  def _get_linear_instruction_listing(self) -> List[Instruction]:
    """Returns all instructions of all blocks in CFG order."""
    return [
        instr for basic_block in self._cfg.values() for instr in basic_block
    ]

  def to_code(self) -> types.CodeType:
    """Returns the instrumented code object."""
    self._check_state()
    listing = self._get_linear_instruction_listing()
    code = b"".join(instr.to_bytes() for instr in listing)

    # Sums up every positive stack effect and never subtracts, so the result
    # is an upper bound rather than the exact depth.
    stacksize = 0

    for instr in listing:
      stacksize = max(stacksize, stacksize + instr.get_stack_effect())

    return get_code_object(self._code, stacksize, code,
                           tuple(self.consts + ["__ATHERIS_INSTRUMENTED__"]),
                           tuple(self._names), get_lnotab(self._code, listing))

  @staticmethod
  def _assemble(lineno: int, offset: int,
                operations: List[Tuple[str, int]]) -> _SizeAndInstructions:
    """Lays out (mnemonic, arg) pairs as consecutive Instructions."""
    start_offset = offset
    assembled = []

    for mnemonic, arg in operations:
      assembled.append(Instruction(lineno, offset, dis.opmap[mnemonic], arg))
      offset += assembled[-1].get_size()

    return offset - start_offset, assembled

  def _generate_trace_branch_invocation(self, lineno: int,
                                        offset: int) -> _SizeAndInstructions:
    """Builds the bytecode that calls atheris._trace_branch().

    Args:
      lineno: The line number to attribute the new instructions to.
      offset: The offset in bytes of the first new instruction.

    Returns:
      The size in bytes of the instructions to insert, and the instructions.
    """
    # The lookups may append to consts/names; their order fixes the indices.
    const_atheris = self._get_const(sys.modules[_TARGET_MODULE])
    name_cov = self._get_name(_COVERAGE_FUNCTION)
    const_counter = self._get_counter()

    return self._assemble(lineno, offset, [
        ("LOAD_CONST", const_atheris),
        ("LOAD_ATTR", name_cov),
        ("LOAD_CONST", const_counter),
        ("CALL_FUNCTION", 1),
        ("POP_TOP", 0),
    ])

  def _generate_cmp_invocation(self, op: int, lineno: int,
                               offset: int) -> _SizeAndInstructions:
    """Builds the bytecode that calls atheris._trace_cmp().

    Only call this if the two objects being compared are non-constants.

    Args:
      op: The comparison operation
      lineno: The line number of the operation
      offset: The offset to the operation instruction

    Returns:
      The size of the instructions to insert,
      The instructions to insert
    """
    # The lookups may append to consts/names; their order fixes the indices.
    const_atheris = self._get_const(sys.modules[_TARGET_MODULE])
    name_cmp = self._get_name(_COMPARE_FUNCTION)
    const_op = self._get_const(op)
    const_counter = self._get_counter()
    const_false = self._get_const(False)

    return self._assemble(lineno, offset, [
        ("LOAD_CONST", const_atheris),
        ("LOAD_ATTR", name_cmp),
        ("ROT_THREE", 0),
        ("LOAD_CONST", const_op),
        ("LOAD_CONST", const_counter),
        ("LOAD_CONST", const_false),
        ("CALL_FUNCTION", 5),
    ])

  def _generate_const_cmp_invocation(self, op: int, lineno: int, offset: int,
                                     switch: bool) -> _SizeAndInstructions:
    """Builds the bytecode that calls atheris._trace_cmp().

    Only call this if one of the objects being compared is a constant coming
    from co_consts. If `switch` is true the constant is the second argument and
    needs to be switched with the first argument.

    Args:
      op: The comparison operation.
      lineno: The line number of the operation
      offset: The offset in bytes of the first new instruction.
      switch: bool whether the second arg is constant instead of the first.

    Returns:
      The number of bytes to insert, and the instructions.
    """
    # The lookups may append to consts/names; their order fixes the indices.
    const_atheris = self._get_const(sys.modules[_TARGET_MODULE])
    name_cmp = self._get_name(_COMPARE_FUNCTION)
    const_counter = self._get_counter()
    const_true = self._get_const(True)
    # Swapping the operands requires the mirrored comparison.
    const_op = self._get_const(REVERSE_CMP_OP[op] if switch else op)

    operations = [
        ("LOAD_CONST", const_atheris),
        ("LOAD_ATTR", name_cmp),
        ("ROT_THREE", 0),
    ]

    if switch:
      operations.append(("ROT_TWO", 0))

    operations += [
        ("LOAD_CONST", const_op),
        ("LOAD_CONST", const_counter),
        ("LOAD_CONST", const_true),
        ("CALL_FUNCTION", 5),
    ]

    return self._assemble(lineno, offset, operations)

  def trace_control_flow(self) -> None:
    """Inserts calls to atheris._trace_branch() for coverage collection.

    A call is inserted at the start of the first basic block and at the start
    of every block that is the target of a block with two outgoing edges.

    The argument of _trace_branch() is an id for the branch.

    The following bytecode gets inserted:
      LOAD_CONST     atheris
      LOAD_ATTR      _trace_branch
      LOAD_CONST     <id>
      CALL_FUNCTION  1
      POP_TOP                  ; _trace_branch() returns None, remove the
                                 return value
    """
    already_instrumented = set()

    entry_block = self._cfg[0]
    offset = entry_block.instructions[0].offset
    total_size, to_insert = self._generate_trace_branch_invocation(
        entry_block.instructions[0].lineno, offset)
    self._adjust(offset, total_size)
    entry_block.instructions = to_insert + entry_block.instructions

    for basic_block in self._cfg.values():
      if len(basic_block.edges) != 2:
        continue

      for edge in basic_block.edges:
        target = self._cfg[edge]

        if target.id in already_instrumented:
          continue

        already_instrumented.add(target.id)
        offset = target.instructions[0].offset

        # Jumps that land on this block keep their reference, so they reach
        # the inserted call instead of skipping over it.
        source_instr = [
            source_bb.instructions[-1]
            for source_bb in self._cfg.values()
            if target.id in source_bb.edges and
            source_bb.instructions[-1].reference == offset
        ]

        total_size, to_insert = self._generate_trace_branch_invocation(
            target.instructions[0].lineno, offset)

        self._adjust(offset, total_size, *source_instr)

        target.instructions = to_insert + target.instructions

    self._handle_size_changes()

  def trace_data_flow(self) -> None:
    """Instruments bytecode for data-flow tracing.

    This works by replacing the instruction COMPARE_OP with a call to
    atheris._trace_cmp(). The arguments for _trace_cmp() are as follows:
      - obj1 and obj2: The two values to compare
      - opid: argument to COMPARE_OP
      - counter: The counter for this comparison.
      - is_const: whether obj1 is a constant in co_consts.

    To detect if any of the values being compared is a constant, all push and
    pop operations have to be analyzed. If a constant appears in a comparison it
    must always be given as obj1 to _trace_cmp().

    The bytecode that gets inserted looks like this:
      LOAD_CONST     atheris
      LOAD_ATTR      _trace_cmp
      ROT_THREE                ; move atheris._trace_cmp below the two
                                 objects
      LOAD_CONST     <opid>
      LOAD_CONST     <counter index>
      LOAD_CONST     <is_const>
      CALL_FUNCTION  5
    """
    stack_size = 0
    # Stack positions that currently hold a value pushed by LOAD_CONST.
    seen_consts = []

    for basic_block in self._cfg.values():
      instructions = basic_block.instructions
      index = 0

      # An explicit index is required because instructions are inserted into
      # the list during the walk. Iterating the list directly would skip the
      # first inserted instruction and re-visit the others, which corrupts
      # the tracked stack depth.
      while index < len(instructions):
        instr = instructions[index]
        index += 1

        if instr.mnemonic == "LOAD_CONST":
          seen_consts.append(stack_size)
        elif instr.mnemonic == "COMPARE_OP" and instr.arg <= 5:
          # Determine the two values on the top of the stack before COMPARE_OP
          tos_is_constant = (stack_size - 1) in seen_consts
          tos1_is_constant = (stack_size - 2) in seen_consts

          if not (tos_is_constant and tos1_is_constant):
            offset = instr.offset
            # Must be read before the instruction is turned into a NOP.
            compare_effect = instr.get_stack_effect()

            if tos_is_constant or tos1_is_constant:
              # One item is constant, one is non-constant
              total_size, to_insert = self._generate_const_cmp_invocation(
                  instr.arg, instr.lineno, offset, tos_is_constant)
            else:
              # Both items are non-constants
              total_size, to_insert = self._generate_cmp_invocation(
                  instr.arg, instr.lineno, offset)

            self._adjust(offset, total_size)

            # Put the call directly in front of the former COMPARE_OP.
            instructions[index - 1:index - 1] = to_insert
            instr.make_nop()
            # Continue after the inserted run and the NOP.
            index += len(to_insert)

            # The inserted call has the same net stack effect as the compare
            # it replaces. Its result is on top and is not a constant.
            stack_size += compare_effect
            seen_consts = [c for c in seen_consts if c < stack_size - 1]
            continue

        stack_size += instr.get_stack_effect()
        seen_consts = [c for c in seen_consts if c < stack_size]

    self._handle_size_changes()

  def _print_disassembly(self) -> None:
    """Prints disassembly."""
    print(f"Disassembly of {self._code.co_filename}:{self._code.co_name}")
    for basic_block in self._cfg.values():
      print(" -bb-")
      for instr in basic_block:
        print(f" L.{instr.lineno} [{instr.offset}] {instr.mnemonic} ", end="")

        if instr.has_argument():
          print(f"{instr.arg} ", end="")

        if instr._is_relative:
          print(f"(to {instr.reference})", end="")

        print()

727 

728 

def patch_code(code: types.CodeType,
               trace_dataflow: bool,
               nested: bool = False) -> types.CodeType:
  """Returns code, patched with Atheris instrumentation.

  Code that already carries the "__ATHERIS_INSTRUMENTED__" constant is
  returned unchanged.

  Args:
    code: The byte code to instrument.
    trace_dataflow: Whether to trace dataflow or not.
    nested: If False, reserve counters, and patch modules. Recursive calls to
      this function are considered nested.
  """
  inst = Instrumentor(code)

  # If this code object has already been instrumented, skip it. The str check
  # comes first to avoid comparison between str and bytes (BytesWarning).
  if any(
      isinstance(const, str) and const == "__ATHERIS_INSTRUMENTED__"
      for const in inst.consts):
    return code

  inst.trace_control_flow()

  if trace_dataflow:
    inst.trace_data_flow()

  # Repeat this for all nested code objects
  for index, const in enumerate(inst.consts):
    if not isinstance(const, types.CodeType):
      continue

    name = const.co_name
    # Names of the form "<...>" are skipped, except for lambdas and, on a
    # non-nested call, modules.
    if (name == "<lambda>" or (not nested and name == "<module>") or
        name[0] != "<" or name[-1] != ">"):
      inst.consts[index] = patch_code(const, trace_dataflow, nested=True)

  return inst.to_code()

763 

764 

# Return type of a function passed through instrument_func().
T = TypeVar("T")

766 

767 

def instrument_func(func: Callable[..., T]) -> Callable[..., T]:
  """Add Atheris instrumentation to a specific function.

  The function's __code__ is replaced in place with a patched version that
  also traces data flow.

  Args:
    func: The function to instrument.

  Returns:
    The same function object, so this can be used as a decorator.
  """
  patched = patch_code(func.__code__, True, True)
  func.__code__ = patched

  return func

773 

774 

def _is_instrumentable(obj: Any) -> bool:
  """Returns True if this object can be instrumented."""
  try:
    return (
        # Only callables can be instrumented
        hasattr(obj, "__call__") and
        # Only objects with a __code__ member of type CodeType qualify
        hasattr(obj, "__code__") and
        isinstance(obj.__code__, types.CodeType) and
        # Only code in a real module can be instrumented
        hasattr(obj, "__module__") and obj.__module__ in sys.modules and
        # Bound methods can't be instrumented - instrument the real func
        not hasattr(obj, "__self__") and
        # Only Python functions and methods qualify, nothing native
        isinstance(obj, (types.FunctionType, types.MethodType)))
  except Exception:  # pylint: disable=broad-except
    # If accessing any of those fields produced an exception, the object
    # probably can't be instrumented
    return False

804 

805 

def instrument_all() -> None:
  """Add Atheris instrumentation to all Python code already imported.

  This function is experimental.

  This function is able to instrument core library functions that can't be
  instrumented by instrument_func or instrument_imports, as those functions are
  used in the implementation of the instrumentation.
  """
  funcs = [obj for obj in gc.get_objects() if _is_instrumentable(obj)]

  # A progress bar is only drawn when stderr is a terminal.
  progress_renderer = None
  if sys.stderr.isatty():
    sys.stderr.write("INFO: Instrumenting functions: ")
    progress_renderer = utils.ProgressRenderer(sys.stderr, len(funcs))
  else:
    sys.stderr.write(f"INFO: Instrumenting {len(funcs)} functions...\n")

  for done, func in enumerate(funcs, start=1):
    try:
      instrument_func(func)
    except Exception as e:  # pylint: disable=broad-except
      # Best effort: report the failure and carry on with the next function.
      if progress_renderer:
        progress_renderer.drop()
      sys.stderr.write(f"ERROR: Failed to instrument function {func}: {e}\n")
    if progress_renderer:
      progress_renderer.count = done

  if progress_renderer:
    progress_renderer.drop()
  else:
    print("INFO: Instrumentation complete.")