Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/internal/conjecture/shrinker.py: 19%


# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

from collections import defaultdict
from typing import TYPE_CHECKING, Callable, Dict, Optional, Union

import attr

from hypothesis.internal.compat import int_from_bytes, int_to_bytes
from hypothesis.internal.conjecture.choicetree import (
    ChoiceTree,
    prefix_selection_order,
    random_selection_order,
)
from hypothesis.internal.conjecture.data import (
    ConjectureData,
    ConjectureResult,
    Status,
    bits_to_bytes,
    ir_value_equal,
    ir_value_key,
    ir_value_permitted,
)
from hypothesis.internal.conjecture.junkdrawer import find_integer, replace_all
from hypothesis.internal.conjecture.shrinking import (
    Bytes,
    Float,
    Integer,
    Ordering,
    String,
)

if TYPE_CHECKING:
    from hypothesis.internal.conjecture.engine import ConjectureRunner


def sort_key(buffer):
    """Returns a sort key such that "simpler" buffers are smaller than
    "more complicated" ones.

    We define sort_key so that x is simpler than y if x is shorter than y or if
    they have the same length and x < y lexicographically. This is called the
    shortlex order.

    The reason for using the shortlex order is:

    1. If x is shorter than y then that means we had to make fewer decisions
       in constructing the test case when we ran x than we did when we ran y.
    2. If x is the same length as y then replacing a byte with a lower byte
       corresponds to reducing the value of an integer we drew with draw_bits
       towards zero.
    3. We want a total order, and given (2) the natural choices for things of
       the same size are either the lexicographic or colexicographic orders
       (the latter being the lexicographic order of the reverse of the string).
       Because values drawn early in generation potentially get used in more
       places they potentially have a more significant impact on the final
       result, so it makes sense to prioritise reducing earlier values over
       later ones. This makes the lexicographic order the more natural choice.
    """
    return (len(buffer), buffer)
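# Illustrative example (not part of the original module): under shortlex
# ordering any shorter buffer sorts before any longer one, with ties broken
# lexicographically:
#
#     >>> sort_key(b"\xff") < sort_key(b"\x00\x00")  # shorter wins
#     True
#     >>> sort_key(b"\x00\x01") < sort_key(b"\x00\x02")  # same length: lexicographic
#     True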

SHRINK_PASS_DEFINITIONS: Dict[str, "ShrinkPassDefinition"] = {}


@attr.s()
class ShrinkPassDefinition:
    """A shrink pass bundles together a large number of local changes to
    the current shrink target.

    Each shrink pass is defined by some function and some arguments to that
    function. The ``generate_arguments`` function returns all arguments that
    might be useful to run on the current shrink target.

    The guarantee made by methods defined this way is that after they are
    called then *either* the shrink target has changed *or* each of
    ``fn(*args)`` has been called for every ``args`` in ``generate_arguments(self)``.
    No guarantee is made that all of these will be called if the shrink target
    changes.
    """

    run_with_chooser = attr.ib()

    @property
    def name(self):
        return self.run_with_chooser.__name__

    def __attrs_post_init__(self):
        assert self.name not in SHRINK_PASS_DEFINITIONS, self.name
        SHRINK_PASS_DEFINITIONS[self.name] = self


def defines_shrink_pass():
    """A convenient decorator for defining shrink passes."""

    def accept(run_step):
        ShrinkPassDefinition(run_with_chooser=run_step)

        def run(self):
            raise NotImplementedError("Shrink passes should not be run directly")

        run.__name__ = run_step.__name__
        run.is_shrink_pass = True
        return run

    return accept
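# Illustrative sketch (not part of the original module): passes are defined on
# Shrinker below with this decorator, taking a ``chooser`` that enumerates the
# pass's possible steps, e.g.:
#
#     @defines_shrink_pass()
#     def my_pass(self, chooser):
#         node = chooser.choose(self.nodes, lambda n: not n.trivial)
#         ...  # try one local transformation of the chosen node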

class Shrinker:
    """A shrinker is a child object of a ConjectureRunner which is designed to
    manage the associated state of a particular shrink problem. That is, we
    have some initial ConjectureData object and some property of interest
    that it satisfies, and we want to find a ConjectureData object with a
    shortlex (see sort_key above) smaller buffer that exhibits the same
    property.

    Currently the only property of interest we use is that the status is
    INTERESTING and the interesting_origin takes on some fixed value, but we
    may potentially be interested in other use cases later.
    However we assume that data with a status < VALID never satisfies the predicate.

    The shrinker keeps track of a value shrink_target which represents the
    current best known ConjectureData object satisfying the predicate.
    It refines this value by repeatedly running *shrink passes*, which are
    methods that perform a series of transformations to the current shrink_target
    and evaluate the underlying test function to find new ConjectureData
    objects. If any of these satisfy the predicate, the shrink_target
    is updated automatically. Shrinking runs until no shrink pass can
    improve the shrink_target, at which point it stops. It may also be
    terminated if the underlying engine throws RunIsComplete, but that
    is handled by the calling code rather than the Shrinker.

    =======================
    Designing Shrink Passes
    =======================

    Generally a shrink pass is just any function that calls
    cached_test_function and/or incorporate_new_buffer a number of times,
    but there are a couple of useful things to bear in mind.

    A shrink pass *makes progress* if running it changes self.shrink_target
    (i.e. it finds a shortlex smaller ConjectureData object satisfying
    the predicate). The desired end state of shrinking is to find a
    value such that no shrink pass can make progress, i.e. that we
    are at a local minimum for each shrink pass.

    In aid of this goal, the main invariant that a shrink pass must
    satisfy is that whether it makes progress must be deterministic.
    It is fine (encouraged even) for the specific progress it makes
    to be non-deterministic, but if you run a shrink pass, it makes
    no progress, and then you immediately run it again, it should
    never succeed on the second time. This allows us to stop as soon
    as we have run each shrink pass and seen no progress on any of
    them.

    This means that e.g. it's fine to try each of N deletions
    or replacements in a random order, but it's not OK to try N random
    deletions (unless you have already shrunk at least once, though we
    don't currently take advantage of this loophole).

    Shrink passes need to be written so as to be robust against
    change in the underlying shrink target. It is generally safe
    to assume that the shrink target does not change prior to the
    point of first modification - e.g. if you change no bytes at
    index ``i``, all examples whose start is ``<= i`` still exist,
    as do all blocks, and the data object is still of length
    ``>= i + 1``. This can only be violated by bad user code which
    relies on an external source of non-determinism.

    When the underlying shrink_target changes, shrink
    passes should not run substantially more test_function calls
    on success than they do on failure. Say, no more than a constant
    factor more. In particular shrink passes should not iterate to a
    fixed point.

    This means that shrink passes are often written with loops that
    are carefully designed to do the right thing in the case that no
    shrinks occurred and try to adapt to any changes to do a reasonable
    job. e.g. say we wanted to write a shrink pass that tried deleting
    each individual byte (this isn't an especially good choice,
    but it leads to a simple illustrative example), we might do it
    by iterating over the buffer like so:

    .. code-block:: python

        i = 0
        while i < len(self.shrink_target.buffer):
            if not self.incorporate_new_buffer(
                self.shrink_target.buffer[:i] + self.shrink_target.buffer[i + 1 :]
            ):
                i += 1

    The reason for writing the loop this way is that i is always a
    valid index into the current buffer, even if the current buffer
    changes as a result of our actions. When the buffer changes,
    we leave the index where it is rather than restarting from the
    beginning, and carry on. This means that the number of steps we
    run in this case is always bounded above by the number of steps
    we would run if nothing works.

    Another thing to bear in mind about shrink pass design is that
    they should prioritise *progress*. If you have N operations that
    you need to run, you should try to order them in such a way as
    to avoid stalling, where you have long periods of test function
    invocations where no shrinks happen. This is bad because whenever
    we shrink we reduce the amount of work the shrinker has to do
    in future, and often speed up the test function, so we ideally
    want those shrinks to happen much earlier in the process.

    Sometimes stalls are inevitable of course - e.g. if the pass
    makes no progress, then the entire thing is just one long stall,
    but it's helpful to design it so that stalls are less likely
    in typical behaviour.

    The two easiest ways to do this are:

    * Just run the N steps in random order. As long as a
      reasonably large proportion of the operations succeed, this
      guarantees the expected stall length is quite short. The
      bookkeeping for making sure this does the right thing when
      it succeeds can be quite annoying.
    * When you have any sort of nested loop, loop in such a way
      that both loop variables change each time. This prevents
      stalls which occur when one particular value for the outer
      loop is impossible to make progress on, rendering the entire
      inner loop into a stall.

    However, although progress is good, too much progress can be
    a bad sign! If you're *only* seeing successful reductions,
    that's probably a sign that you are making changes that are
    too timid. Two useful things to offset this (see the sketch
    after this docstring):

    * It's worth writing shrink passes which are *adaptive*, in
      the sense that when operations seem to be working really
      well we try to bundle multiple of them together. This can
      often be used to turn what would be O(m) successful calls
      into O(log(m)).
    * It's often worth trying one or two special minimal values
      before trying anything more fine grained (e.g. replacing
      the whole thing with zero).
    """

    def derived_value(fn):
        """It's useful during shrinking to have access to derived values of
        the current shrink target.

        This decorator allows you to define these as cached properties. They
        are calculated once, then cached until the shrink target changes, then
        recalculated the next time they are used."""

        def accept(self):
            try:
                return self.__derived_values[fn.__name__]
            except KeyError:
                return self.__derived_values.setdefault(fn.__name__, fn(self))

        accept.__name__ = fn.__name__
        return property(accept)
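    # Illustrative note (not part of the original module): usage mirrors an
    # ordinary property, but the cache lives only as long as the current
    # shrink target (update_shrink_target below resets self.__derived_values):
    #
    #     @derived_value
    #     def some_index(self):  # hypothetical name
    #         return ...  # recomputed at most once per shrink target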

    def __init__(
        self,
        engine: "ConjectureRunner",
        initial: Union[ConjectureData, ConjectureResult],
        predicate: Optional[Callable[[ConjectureData], bool]],
        *,
        allow_transition: Optional[
            Callable[[Union[ConjectureData, ConjectureResult], ConjectureData], bool]
        ],
        explain: bool,
        in_target_phase: bool = False,
    ):
        """Create a shrinker for a particular engine, with a given starting
        point and predicate. When shrink() is called it will attempt to find an
        example for which predicate is True and which is strictly smaller than
        initial.

        Note that initial is a ConjectureData object, and predicate
        takes ConjectureData objects.
        """
        assert predicate is not None or allow_transition is not None
        self.engine = engine
        self.__predicate = predicate or (lambda data: True)
        self.__allow_transition = allow_transition or (lambda source, destination: True)
        self.__derived_values: dict = {}
        self.__pending_shrink_explanation = None

        self.initial_size = len(initial.buffer)

        # We keep track of the current best example on the shrink_target
        # attribute.
        self.shrink_target = initial
        self.clear_change_tracking()
        self.shrinks = 0

        # We terminate shrinks that seem to have reached their logical
        # conclusion: If we've called the underlying test function at
        # least self.max_stall times since the last time we shrunk,
        # it's time to stop shrinking.
        self.max_stall = 200
        self.initial_calls = self.engine.call_count
        self.initial_misaligned = self.engine.misaligned_count
        self.calls_at_last_shrink = self.initial_calls

        self.passes_by_name: Dict[str, ShrinkPass] = {}

        # Because the shrinker is also used to `pareto_optimise` in the target phase,
        # we sometimes want to allow extending buffers instead of aborting at the end.
        if in_target_phase:
            from hypothesis.internal.conjecture.engine import BUFFER_SIZE

            self.__extend = BUFFER_SIZE
        else:
            self.__extend = 0
        self.should_explain = explain

    @derived_value  # type: ignore
    def cached_calculations(self):
        return {}

    def cached(self, *keys):
        def accept(f):
            cache_key = (f.__name__, *keys)
            try:
                return self.cached_calculations[cache_key]
            except KeyError:
                return self.cached_calculations.setdefault(cache_key, f())

        return accept
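    # Illustrative note (not part of the original module): ``cached`` is used
    # below as an immediately-applied decorator whose extra keys distinguish
    # call sites, e.g.
    #
    #     @self.cached(label, i)
    #     def descendants():
    #         ...  # computed once per (label, i) per shrink target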

    def add_new_pass(self, run):
        """Creates a shrink pass corresponding to calling ``run(self)``"""

        definition = SHRINK_PASS_DEFINITIONS[run]

        p = ShrinkPass(
            run_with_chooser=definition.run_with_chooser,
            shrinker=self,
            index=len(self.passes_by_name),
        )
        self.passes_by_name[p.name] = p
        return p

    def shrink_pass(self, name):
        """Return the ShrinkPass object for the pass with the given name."""
        if isinstance(name, ShrinkPass):
            return name
        if name not in self.passes_by_name:
            self.add_new_pass(name)
        return self.passes_by_name[name]

    @property
    def calls(self):
        """Return the number of calls that have been made to the underlying
        test function."""
        return self.engine.call_count

    @property
    def misaligned(self):
        return self.engine.misaligned_count

    def check_calls(self):
        if self.calls - self.calls_at_last_shrink >= self.max_stall:
            raise StopShrinking

    def cached_test_function_ir(self, tree):
        # Sometimes our shrinking passes try obviously invalid things. We handle
        # discarding them in one place here.
        for node in tree:
            if not ir_value_permitted(node.value, node.ir_type, node.kwargs):
                return None

        result = self.engine.cached_test_function_ir(tree)
        self.incorporate_test_data(result)
        self.check_calls()
        return result

    def consider_new_tree(self, tree):
        tree = tree[: len(self.nodes)]

        def startswith(t1, t2):
            return t1[: len(t2)] == t2

        if startswith(tree, self.nodes):
            return True

        if startswith(self.nodes, tree):
            return False

        previous = self.shrink_target
        self.cached_test_function_ir(tree)
        return previous is not self.shrink_target

    def consider_new_buffer(self, buffer):
        """Returns True if after running this buffer the result would be
        the current shrink_target."""
        buffer = bytes(buffer)
        return buffer.startswith(self.buffer) or self.incorporate_new_buffer(buffer)

    def incorporate_new_buffer(self, buffer):
        """Either runs the test function on this buffer and returns True if
        that changed the shrink_target, or determines that doing so would
        be useless and returns False without running it."""

        buffer = bytes(buffer[: self.shrink_target.index])
        # Sometimes an attempt at lexicographic minimization will do the wrong
        # thing because the buffer has changed under it (e.g. something has
        # turned into a write, the bit size has changed). The result would be
        # an invalid string, but it's better for us to just ignore it here as
        # it turns out to involve quite a lot of tricky bookkeeping to get
        # this right and it's better to just handle it in one place.
        if sort_key(buffer) >= sort_key(self.shrink_target.buffer):
            return False

        if self.shrink_target.buffer.startswith(buffer):
            return False

        previous = self.shrink_target
        self.cached_test_function(buffer)
        return previous is not self.shrink_target

    def incorporate_test_data(self, data):
        """Takes a ConjectureData or Overrun object and updates the current
        shrink_target if this data represents an improvement over it."""
        if data.status < Status.VALID or data is self.shrink_target:
            return
        if (
            self.__predicate(data)
            and sort_key(data.buffer) < sort_key(self.shrink_target.buffer)
            and self.__allow_transition(self.shrink_target, data)
        ):
            self.update_shrink_target(data)

    def cached_test_function(self, buffer):
        """Returns a cached version of the underlying test function, so
        that the result is either an Overrun object (if the buffer is
        too short to be a valid test case) or a ConjectureData object
        with status >= INVALID that would result from running this buffer."""
        buffer = bytes(buffer)
        result = self.engine.cached_test_function(buffer, extend=self.__extend)
        self.incorporate_test_data(result)
        self.check_calls()
        return result

    def debug(self, msg):
        self.engine.debug(msg)

    @property
    def random(self):
        return self.engine.random

    def shrink(self):
        """Run the full set of shrinks and update shrink_target.

        This method is "mostly idempotent" - calling it twice is unlikely to
        have any effect, though it has a non-zero probability of doing so.
        """
        # We assume that if an all-zero block of bytes is an interesting
        # example then we're not going to do better than that.
        # This might not technically be true: e.g. for integers() | booleans()
        # the simplest example is actually [1, 0]. Missing this case is fairly
        # harmless and this allows us to make various simplifying assumptions
        # about the structure of the data (principally that we're never
        # operating on a block of all zero bytes so can use non-zeroness as a
        # signpost of complexity).
        if not any(self.shrink_target.buffer) or self.incorporate_new_buffer(
            bytes(len(self.shrink_target.buffer))
        ):
            self.explain()
            return

        # There are multiple buffers that represent the same counterexample, e.g.
        # n=2 (from the 16 bit integer bucket) and n=2 (from the 32 bit integer
        # bucket). Before we start shrinking, we need to normalize to the minimal
        # such buffer, else a buffer-smaller but ir-larger value may be chosen
        # as the minimal counterexample.
        data = self.engine.new_conjecture_data_ir(self.nodes)
        self.engine.test_function(data)
        self.incorporate_test_data(data.as_result())

        try:
            self.greedy_shrink()
        except StopShrinking:
            # If we stopped shrinking because we're making slow progress (instead of
            # reaching a local optimum), don't run the explain-phase logic.
            self.should_explain = False
        finally:
            if self.engine.report_debug_info:

                def s(n):
                    return "s" if n != 1 else ""

                total_deleted = self.initial_size - len(self.shrink_target.buffer)
                calls = self.engine.call_count - self.initial_calls
                misaligned = self.engine.misaligned_count - self.initial_misaligned

                self.debug(
                    "---------------------\n"
                    "Shrink pass profiling\n"
                    "---------------------\n\n"
                    f"Shrinking made a total of {calls} call{s(calls)} of which "
                    f"{self.shrinks} shrank and {misaligned} were misaligned. This deleted {total_deleted} bytes out "
                    f"of {self.initial_size}."
                )
                for useful in [True, False]:
                    self.debug("")
                    if useful:
                        self.debug("Useful passes:")
                    else:
                        self.debug("Useless passes:")
                    self.debug("")
                    for p in sorted(
                        self.passes_by_name.values(),
                        key=lambda t: (-t.calls, t.deletions, t.shrinks),
                    ):
                        if p.calls == 0:
                            continue
                        if (p.shrinks != 0) != useful:
                            continue

                        self.debug(
                            f"  * {p.name} made {p.calls} call{s(p.calls)} of which "
                            f"{p.shrinks} shrank and {p.misaligned} were misaligned, "
                            f"deleting {p.deletions} byte{s(p.deletions)}."
                        )
                self.debug("")
        self.explain()

    def explain(self):
        if not self.should_explain or not self.shrink_target.arg_slices:
            return
        from hypothesis.internal.conjecture.engine import BUFFER_SIZE

        self.max_stall = 1e999
        shrink_target = self.shrink_target
        buffer = shrink_target.buffer
        chunks = defaultdict(list)

        # Before we start running experiments, let's check for known inputs which would
        # make them redundant. The shrinking process means that we've already tried many
        # variations on the minimal example, so this can save a lot of time.
        seen_passing_buffers = self.engine.passing_buffers(
            prefix=buffer[: min(self.shrink_target.arg_slices)[0]]
        )

        # Now that we've shrunk to a minimal failing example, it's time to try
        # varying each part that we've noted will go in the final report. Consider
        # slices in largest-first order.
        for start, end in sorted(
            self.shrink_target.arg_slices, key=lambda x: (-(x[1] - x[0]), x)
        ):
            # Check for any previous examples that match the prefix and suffix,
            # so we can skip if we found a passing example while shrinking.
            if any(
                seen.startswith(buffer[:start]) and seen.endswith(buffer[end:])
                for seen in seen_passing_buffers
            ):
                continue

            # Run our experiments
            n_same_failures = 0
            note = "or any other generated value"
            # TODO: is 100 same-failures out of 500 attempts a good heuristic?
            for n_attempt in range(500):  # pragma: no branch
                # no-branch here because we don't coverage-test the abort-at-500 logic.

                if n_attempt - 10 > n_same_failures * 5:
                    # stop early if we're seeing mostly invalid examples
                    break  # pragma: no cover

                buf_attempt_fixed = bytearray(buffer)
                buf_attempt_fixed[start:end] = [
                    self.random.randint(0, 255) for _ in range(end - start)
                ]
                result = self.engine.cached_test_function(
                    buf_attempt_fixed, extend=BUFFER_SIZE - len(buf_attempt_fixed)
                )

                # Turns out this was a variable-length part, so grab the infix...
                if result.status == Status.OVERRUN:
                    continue  # pragma: no cover  # flakily covered
                if not (
                    len(buf_attempt_fixed) == len(result.buffer)
                    and result.buffer.endswith(buffer[end:])
                ):
                    for ex, res in zip(shrink_target.examples, result.examples):
                        assert ex.start == res.start
                        assert ex.start <= start
                        assert ex.label == res.label
                        if start == ex.start and end == ex.end:
                            res_end = res.end
                            break
                    else:
                        raise NotImplementedError("Expected matching prefixes")

                    buf_attempt_fixed = (
                        buffer[:start] + result.buffer[start:res_end] + buffer[end:]
                    )
                    chunks[(start, end)].append(result.buffer[start:res_end])
                    result = self.engine.cached_test_function(buf_attempt_fixed)

                    if result.status == Status.OVERRUN:
                        continue  # pragma: no cover  # flakily covered
                else:
                    chunks[(start, end)].append(result.buffer[start:end])

                if shrink_target is not self.shrink_target:  # pragma: no cover
                    # If we've shrunk further without meaning to, bail out.
                    self.shrink_target.slice_comments.clear()
                    return
                if result.status == Status.VALID:
                    # The test passed, indicating that this param can't vary freely.
                    # However, it's really hard to write a simple and reliable covering
                    # test, because of our `seen_passing_buffers` check above.
                    break  # pragma: no cover
                elif self.__predicate(result):  # pragma: no branch
                    n_same_failures += 1
                    if n_same_failures >= 100:
                        self.shrink_target.slice_comments[(start, end)] = note
                        break

        # Finally, if we've found multiple independently-variable parts, check whether
        # they can all be varied together.
        if len(self.shrink_target.slice_comments) <= 1:
            return
        n_same_failures_together = 0
        chunks_by_start_index = sorted(chunks.items())
        for _ in range(500):  # pragma: no branch
            # no-branch here because we don't coverage-test the abort-at-500 logic.
            new_buf = bytearray()
            prev_end = 0
            for (start, end), ls in chunks_by_start_index:
                assert prev_end <= start < end, "these chunks must be nonoverlapping"
                new_buf.extend(buffer[prev_end:start])
                new_buf.extend(self.random.choice(ls))
                prev_end = end

            result = self.engine.cached_test_function(new_buf)

            # This *can't* be a shrink because none of the components were.
            assert shrink_target is self.shrink_target
            if result.status == Status.VALID:
                self.shrink_target.slice_comments[(0, 0)] = (
                    "The test sometimes passed when commented parts were varied together."
                )
                break  # Test passed, this param can't vary freely.
            elif self.__predicate(result):  # pragma: no branch
                n_same_failures_together += 1
                if n_same_failures_together >= 100:
                    self.shrink_target.slice_comments[(0, 0)] = (
                        "The test always failed when commented parts were varied together."
                    )
                    break

    def greedy_shrink(self):
        """Run a full set of greedy shrinks (that is, ones that will only ever
        move to a better target) and update shrink_target appropriately.

        This method iterates to a fixed point and so is idempotent - calling
        it twice will have exactly the same effect as calling it once.
        """
        self.fixate_shrink_passes(
            [
                node_program("X" * 5),
                node_program("X" * 4),
                node_program("X" * 3),
                node_program("X" * 2),
                node_program("X" * 1),
                "pass_to_descendant",
                "reorder_examples",
                "minimize_duplicated_nodes",
                "minimize_individual_nodes",
                "redistribute_block_pairs",
                "lower_blocks_together",
            ]
        )

    @derived_value  # type: ignore
    def shrink_pass_choice_trees(self):
        return defaultdict(ChoiceTree)

    def fixate_shrink_passes(self, passes):
        """Run steps from each pass in ``passes`` until the current shrink target
        is a fixed point of all of them."""
        passes = list(map(self.shrink_pass, passes))

        any_ran = True
        while any_ran:
            any_ran = False

            reordering = {}

            # We run remove_discarded after every pass to do cleanup,
            # keeping track of whether that actually works. Either there is
            # no discarded data and it is basically free, or it reliably works
            # and deletes data, or it doesn't work. In that latter case we turn
            # it off for the rest of this loop through the passes, but will
            # try again once all of the passes have been run.
            can_discard = self.remove_discarded()

            calls_at_loop_start = self.calls

            # We keep track of how many calls can be made by a single step
            # without making progress and use this to test how much to pad
            # out self.max_stall by as we go along.
            max_calls_per_failing_step = 1

            for sp in passes:
                if can_discard:
                    can_discard = self.remove_discarded()

                before_sp = self.shrink_target

                # Run the shrink pass until it fails to make any progress
                # max_failures times in a row. This implicitly boosts shrink
                # passes that are more likely to work.
                failures = 0
                max_failures = 20
                while failures < max_failures:
                    # We don't allow more than max_stall consecutive failures
                    # to shrink, but this means that if we're unlucky and the
                    # shrink passes are in a bad order where only the ones at
                    # the end are useful, if we're not careful this heuristic
                    # might stop us before we've tried everything. In order to
                    # avoid that happening, we make sure that there's always
                    # plenty of breathing room to make it through a single
                    # iteration of the fixate_shrink_passes loop.
                    self.max_stall = max(
                        self.max_stall,
                        2 * max_calls_per_failing_step
                        + (self.calls - calls_at_loop_start),
                    )

                    prev = self.shrink_target
                    initial_calls = self.calls
                    # It's better for us to run shrink passes in a deterministic
                    # order, to avoid repeat work, but this can cause us to create
                    # long stalls when there are a lot of steps which fail to do
                    # anything useful. In order to avoid this, once we've noticed
                    # we're in a stall (i.e. half of max_failures calls have failed
                    # to do anything) we switch to randomly jumping around. If we
                    # find a success then we'll resume deterministic order from
                    # there which, with any luck, is in a new good region.
                    if not sp.step(random_order=failures >= max_failures // 2):
                        # step returns False when there is nothing to do because
                        # the entire choice tree is exhausted. If this happens
                        # we break because we literally can't run this pass any
                        # more than we already have until something else makes
                        # progress.
                        break
                    any_ran = True

                    # Don't count steps that didn't actually try to do
                    # anything as failures. Otherwise, this call is a failure
                    # if it failed to make any changes to the shrink target.
                    if initial_calls != self.calls:
                        if prev is not self.shrink_target:
                            failures = 0
                        else:
                            max_calls_per_failing_step = max(
                                max_calls_per_failing_step, self.calls - initial_calls
                            )
                            failures += 1

                # We reorder the shrink passes so that on our next run through
                # we try good ones first. The rule is that shrink passes that
                # did nothing useful are the worst, shrink passes that reduced
                # the length are the best.
                if self.shrink_target is before_sp:
                    reordering[sp] = 1
                elif len(self.buffer) < len(before_sp.buffer):
                    reordering[sp] = -1
                else:
                    reordering[sp] = 0

            passes.sort(key=reordering.__getitem__)

    @property
    def buffer(self):
        return self.shrink_target.buffer

    @property
    def blocks(self):
        return self.shrink_target.blocks

    @property
    def nodes(self):
        return self.shrink_target.examples.ir_tree_nodes

    @property
    def examples(self):
        return self.shrink_target.examples

    @derived_value  # type: ignore
    def examples_by_label(self):
        """An index of all examples grouped by their label, with
        the examples stored in their normal index order."""

        examples_by_label = defaultdict(list)
        for ex in self.examples:
            examples_by_label[ex.label].append(ex)
        return dict(examples_by_label)

    @derived_value  # type: ignore
    def distinct_labels(self):
        return sorted(self.examples_by_label, key=str)

    @defines_shrink_pass()
    def pass_to_descendant(self, chooser):
        """Attempt to replace each example with a descendant example.

        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:

        binary_tree = st.deferred(
            lambda: st.one_of(
                st.integers(), st.tuples(binary_tree, binary_tree)))

        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.

        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """

        label = chooser.choose(
            self.distinct_labels, lambda l: len(self.examples_by_label[l]) >= 2
        )

        ls = self.examples_by_label[label]
        i = chooser.choose(range(len(ls) - 1))
        ancestor = ls[i]

        if i + 1 == len(ls) or ls[i + 1].ir_start >= ancestor.ir_end:
            return

        @self.cached(label, i)
        def descendants():
            lo = i + 1
            hi = len(ls)
            while lo + 1 < hi:
                mid = (lo + hi) // 2
                if ls[mid].ir_start >= ancestor.ir_end:
                    hi = mid
                else:
                    lo = mid
            return [t for t in ls[i + 1 : hi] if t.ir_length < ancestor.ir_length]

        descendant = chooser.choose(descendants, lambda ex: ex.ir_length > 0)

        assert ancestor.ir_start <= descendant.ir_start
        assert ancestor.ir_end >= descendant.ir_end
        assert descendant.ir_length < ancestor.ir_length

        self.consider_new_tree(
            self.nodes[: ancestor.ir_start]
            + self.nodes[descendant.ir_start : descendant.ir_end]
            + self.nodes[ancestor.ir_end :]
        )

    def lower_common_node_offset(self):
        """Sometimes we find ourselves in a situation where changes to one part
        of the choice sequence unlock changes to other parts. Sometimes this is
        good, but sometimes this can cause us to exhibit exponential slow
        downs!

        e.g. suppose we had the following:

        m = draw(integers(min_value=0))
        n = draw(integers(min_value=0))
        assert abs(m - n) > 1

        If this fails then we'll end up with a loop where on each iteration we
        reduce each of m and n by 2 - m can't go lower because of n, then n
        can't go lower because of m.

        This will take us O(m) iterations to complete, which is exponential in
        the data size, as we gradually zig zag our way towards zero.

        This can only happen if we're failing to reduce the size of the choice
        sequence: The number of iterations that reduce the length of the choice
        sequence is bounded by that length.

        So what we do is this: We keep track of which blocks are changing, and
        then if there's some non-zero common offset to them we try and minimize
        them all at once by lowering that offset.

        This may not work, and it definitely won't get us out of all possible
        exponential slow downs (an example of where it doesn't is where the
        shape of the blocks changes as a result of this bouncing behaviour),
        but it fails fast when it doesn't work and gets us out of a really
        nastily slow case when it does.
        """

        if len(self.__changed_nodes) <= 1:
            return

        changed = []
        for i in sorted(self.__changed_nodes):
            node = self.nodes[i]
            if node.trivial or node.ir_type != "integer":
                continue
            changed.append(node)

        if not changed:
            return

        ints = [abs(node.value - node.kwargs["shrink_towards"]) for node in changed]
        offset = min(ints)
        assert offset > 0

        for i in range(len(ints)):
            ints[i] -= offset

        st = self.shrink_target

        def offset_node(node, n):
            return (
                node.index,
                node.index + 1,
                [node.copy(with_value=node.kwargs["shrink_towards"] + n)],
            )

        def consider(n, sign):
            return self.consider_new_tree(
                replace_all(
                    st.examples.ir_tree_nodes,
                    [
                        offset_node(node, sign * (n + v))
                        for node, v in zip(changed, ints)
                    ],
                )
            )

        # shrink from both sides
        Integer.shrink(offset, lambda n: consider(n, 1))
        Integer.shrink(offset, lambda n: consider(n, -1))
        self.clear_change_tracking()

    def clear_change_tracking(self):
        self.__last_checked_changed_at = self.shrink_target
        self.__all_changed_nodes = set()

    def mark_changed(self, i):
        self.__changed_nodes.add(i)

    @property
    def __changed_nodes(self):
        if self.__last_checked_changed_at is self.shrink_target:
            return self.__all_changed_nodes

        prev_target = self.__last_checked_changed_at
        new_target = self.shrink_target
        assert prev_target is not new_target
        prev_nodes = prev_target.examples.ir_tree_nodes
        new_nodes = new_target.examples.ir_tree_nodes
        assert sort_key(new_target.buffer) < sort_key(prev_target.buffer)

        if len(prev_nodes) != len(new_nodes) or any(
            n1.ir_type != n2.ir_type for n1, n2 in zip(prev_nodes, new_nodes)
        ):
            # should we check kwargs are equal as well?
            self.__all_changed_nodes = set()
        else:
            assert len(prev_nodes) == len(new_nodes)
            for i, (n1, n2) in enumerate(zip(prev_nodes, new_nodes)):
                assert n1.ir_type == n2.ir_type
                if not ir_value_equal(n1.ir_type, n1.value, n2.value):
                    self.__all_changed_nodes.add(i)

        return self.__all_changed_nodes

    def update_shrink_target(self, new_target):
        assert isinstance(new_target, ConjectureResult)
        self.shrinks += 1
        # If we are just taking a long time to shrink we don't want to
        # trigger this heuristic, so whenever we shrink successfully
        # we give ourselves a bit of breathing room to make sure we
        # would find a shrink that took that long to find the next time.
        # The case where we're taking a long time but making steady
        # progress is handled by `finish_shrinking_deadline` in engine.py
        self.max_stall = max(
            self.max_stall, (self.calls - self.calls_at_last_shrink) * 2
        )
        self.calls_at_last_shrink = self.calls
        self.shrink_target = new_target
        self.__derived_values = {}

    def try_shrinking_nodes(self, nodes, n):
        """Attempts to replace each node in the nodes list with n. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target).

        In current usage it is expected that each of the nodes currently have
        the same value and ir type, although this is not essential. Note that
        n must be < the node at min(nodes) or this is not a valid shrink.

        This method will attempt to do some small amount of work to delete data
        that occurs after the end of the nodes. This is useful for cases where
        there is some size dependency on the value of a node.
        """
        # If the length of the shrink target has changed from under us such that
        # the indices are out of bounds, give up on the replacement.
        # TODO_BETTER_SHRINK: we probably want to narrow down the root cause here at some point.
        if any(node.index >= len(self.nodes) for node in nodes):
            return  # pragma: no cover

        initial_attempt = replace_all(
            self.nodes,
            [(node.index, node.index + 1, [node.copy(with_value=n)]) for node in nodes],
        )

        attempt = self.cached_test_function_ir(initial_attempt)

        if attempt is None:
            return False

        if attempt is self.shrink_target:
            # if the initial shrink was a success, try lowering offsets.
            self.lower_common_node_offset()
            return True

        # If this produced something completely invalid we ditch it
        # here rather than trying to persevere.
        if attempt.status is Status.OVERRUN:
            return False

        if attempt.status is Status.INVALID and attempt.invalid_at is None:
            return False

        if attempt.status is Status.INVALID and attempt.invalid_at is not None:
            # we're invalid due to a misalignment in the tree. We'll try to fix
            # a very specific type of misalignment here: where we have a node of
            # {"size": n} and tried to draw the same node, but with {"size": m < n}.
            # This can occur with e.g.
            #
            #   n = data.draw_integer()
            #   s = data.draw_string(min_size=n)
            #
            # where we try lowering n, resulting in the test_function drawing a lower
            # min_size than our attempt had for the draw_string node.
            #
            # We'll now try realigning this tree by:
            # * replacing the kwargs in our attempt with what test_function tried
            #   to draw in practice
            # * truncating the value of that node to match min_size
            #
            # This helps in the specific case of drawing a value and then drawing
            # a collection of that size...and not much else. In practice this
            # helps because this antipattern is fairly common.

            # TODO we'll probably want to apply the same trick as in the valid
            # case of this function of preserving from the right instead of
            # preserving from the left. see test_can_shrink_variable_string_draws.

            node = self.nodes[len(attempt.examples.ir_tree_nodes)]
            (attempt_ir_type, attempt_kwargs, _attempt_forced) = attempt.invalid_at
            if node.ir_type != attempt_ir_type:
                return False
            if node.was_forced:
                return False  # pragma: no cover

            if node.ir_type == "string":
                # if the size *increased*, we would have to guess what to pad with
                # in order to try fixing up this attempt. Just give up.
                if node.kwargs["min_size"] <= attempt_kwargs["min_size"]:
                    return False
                # the size decreased in our attempt. Try again, but replace with
                # the min_size that we would have gotten, and truncate the value
                # to that size by removing any elements past min_size.
                return self.consider_new_tree(
                    initial_attempt[: node.index]
                    + [
                        initial_attempt[node.index].copy(
                            with_kwargs=attempt_kwargs,
                            with_value=initial_attempt[node.index].value[
                                : attempt_kwargs["min_size"]
                            ],
                        )
                    ]
                    + initial_attempt[node.index :]
                )
            if node.ir_type == "bytes":
                if node.kwargs["size"] <= attempt_kwargs["size"]:
                    return False
                return self.consider_new_tree(
                    initial_attempt[: node.index]
                    + [
                        initial_attempt[node.index].copy(
                            with_kwargs=attempt_kwargs,
                            with_value=initial_attempt[node.index].value[
                                : attempt_kwargs["size"]
                            ],
                        )
                    ]
                    + initial_attempt[node.index :]
                )

        lost_nodes = len(self.nodes) - len(attempt.examples.ir_tree_nodes)
        if lost_nodes <= 0:
            return False

        start = nodes[0].index
        end = nodes[-1].index + 1
        # We now look for contiguous regions to delete that might help fix up
        # this failed shrink. We only look for contiguous regions of the right
        # lengths because doing anything more than that starts to get very
        # expensive. See minimize_individual_nodes for where we
        # try to be more aggressive.
        regions_to_delete = {(end, end + lost_nodes)}

        for ex in self.examples:
            if ex.ir_start > start:
                continue
            if ex.ir_end <= end:
                continue

            if ex.index >= len(attempt.examples):
                continue  # pragma: no cover

            replacement = attempt.examples[ex.index]
            in_original = [c for c in ex.children if c.ir_start >= end]
            in_replaced = [c for c in replacement.children if c.ir_start >= end]

            if len(in_replaced) >= len(in_original) or not in_replaced:
                continue

            # We've found an example where some of the children went missing
            # as a result of this change, and just replacing it with the data
            # it would have had and removing the spillover didn't work. This
            # means that some of its children towards the right must be
            # important, so we try to arrange it so that it retains its
            # rightmost children instead of its leftmost.
            regions_to_delete.add(
                (in_original[0].ir_start, in_original[-len(in_replaced)].ir_start)
            )

        for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):
            try_with_deleted = initial_attempt[:u] + initial_attempt[v:]
            if self.consider_new_tree(try_with_deleted):
                return True

        return False

    def remove_discarded(self):
        """Try removing all bytes marked as discarded.

        This is primarily to deal with data that has been ignored while
        doing rejection sampling - e.g. as a result of an integer range, or a
        filtered strategy.

        Such data will also be handled by the adaptive_example_deletion pass,
        but that pass is necessarily more conservative and will try deleting
        each interval individually. The common case is that all data drawn and
        rejected can just be thrown away immediately in one block, so this pass
        will be much faster than trying each one individually when it works.

        Returns False if there is discarded data and removing it does not work,
        otherwise returns True.
        """
        while self.shrink_target.has_discards:
            discarded = []

            for ex in self.shrink_target.examples:
                if (
                    ex.length > 0
                    and ex.discarded
                    and (not discarded or ex.start >= discarded[-1][-1])
                ):
                    discarded.append((ex.start, ex.end))

            # This can happen if we have discards but they are all of
            # zero length. This shouldn't happen very often so it's
            # faster to check for it here than at the point of example
            # generation.
            if not discarded:
                break

            attempt = bytearray(self.shrink_target.buffer)
            for u, v in reversed(discarded):
                del attempt[u:v]

            if not self.incorporate_new_buffer(attempt):
                return False
        return True

    @derived_value  # type: ignore
    def duplicated_nodes(self):
        """Returns a list of nodes grouped by (ir_type, value)."""
        duplicates = defaultdict(list)
        for node in self.nodes:
            duplicates[(node.ir_type, ir_value_key(node.ir_type, node.value))].append(
                node
            )
        return list(duplicates.values())

    @defines_shrink_pass()
    def minimize_duplicated_nodes(self, chooser):
        """Find blocks that have been duplicated in multiple places and attempt
        to minimize all of the duplicates simultaneously.

        This lets us handle cases where two values can't be shrunk
        independently of each other but can easily be shrunk together.
        For example if we had something like:

        ls = data.draw(lists(integers()))
        y = data.draw(integers())
        assert y not in ls

        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
        to replace both 3s with 0, this would be a valid shrink, but if we were
        to replace either 3 with 0 on its own the test would start passing.

        It is also useful for when that duplication is accidental and the value
        of the blocks doesn't matter very much because it allows us to replace
        more values at once.
        """
        nodes = chooser.choose(self.duplicated_nodes)
        if len(nodes) <= 1:
            return

        # no point in lowering nodes together if one is already trivial.
        # TODO_BETTER_SHRINK: we could potentially just drop the trivial nodes
        # here and carry on with nontrivial ones?
        if any(node.trivial for node in nodes):
            return

        self.minimize_nodes(nodes)

    @defines_shrink_pass()
    def redistribute_block_pairs(self, chooser):
        """If two generated integers are constrained so that their sum must
        exceed some bound, lowering one of them requires raising the other.
        This pass enables that."""

        node = chooser.choose(
            self.nodes, lambda node: node.ir_type == "integer" and not node.trivial
        )

        # The preconditions for this pass are that the two integer draws are only
        # separated by non-integer nodes, and have values of the same size in bytes.
        #
        # This isn't particularly principled. For instance, this wouldn't reduce
        # e.g. @given(integers(), integers(), integers()) where the sum property
        # involves the first and last integers.
        #
        # A better approach may be choosing *two* such integer nodes arbitrarily
        # from the list, instead of conditionally scanning forward.

        for j in range(node.index + 1, len(self.nodes)):
            next_node = self.nodes[j]
            if next_node.ir_type == "integer" and bits_to_bytes(
                node.value.bit_length()
            ) == bits_to_bytes(next_node.value.bit_length()):
                break
        else:
            return

        if next_node.was_forced:
            # avoid modifying a forced node. Note that it's fine for next_node
            # to be trivial, because we're going to explicitly make it *not*
            # trivial by adding to its value.
            return

        m = node.value
        n = next_node.value

        def boost(k):
            if k > m:
                return False

            node_value = m - k
            next_node_value = n + k

            return self.consider_new_tree(
                self.nodes[: node.index]
                + [node.copy(with_value=node_value)]
                + self.nodes[node.index + 1 : next_node.index]
                + [next_node.copy(with_value=next_node_value)]
                + self.nodes[next_node.index + 1 :]
            )

        find_integer(boost)
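        # Illustrative note (not part of the original module): find_integer
        # probes k = 1, 2, 4, 8, ... while boost(k) keeps succeeding, then
        # binary-searches the remaining gap, so shifting an amount A from node
        # to next_node costs O(log(A)) test calls rather than A of them.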

    @defines_shrink_pass()
    def lower_blocks_together(self, chooser):
        block = chooser.choose(self.blocks, lambda b: not b.trivial)

        # Choose the next block to be up to eight blocks onwards. We don't
        # want to go too far (to avoid quadratic time) but it's worth a
        # reasonable amount of lookahead, especially as we expect most
        # blocks are zero by this point anyway.
        next_block = self.blocks[
            chooser.choose(
                range(block.index + 1, min(len(self.blocks), block.index + 9)),
                lambda j: not self.blocks[j].trivial,
            )
        ]

        buffer = self.buffer

        m = int_from_bytes(buffer[block.start : block.end])
        n = int_from_bytes(buffer[next_block.start : next_block.end])

        def lower(k):
            if k > min(m, n):
                return False
            attempt = bytearray(buffer)
            attempt[block.start : block.end] = int_to_bytes(m - k, block.length)
            attempt[next_block.start : next_block.end] = int_to_bytes(
                n - k, next_block.length
            )
            assert len(attempt) == len(buffer)
            return self.consider_new_buffer(attempt)

        find_integer(lower)

    def minimize_nodes(self, nodes):
        ir_type = nodes[0].ir_type
        value = nodes[0].value
        # unlike ir_type and value, kwargs are *not* guaranteed to be equal among all
        # passed nodes. We arbitrarily use the kwargs of the first node. I think
        # this is unsound (= leads to us trying shrinks that could not have been
        # generated), but those get discarded at test-time, and this enables useful
        # slips where kwargs are not equal but are close enough that doing the
        # same operation on both basically just works.
        kwargs = nodes[0].kwargs
        assert all(
            node.ir_type == ir_type and ir_value_equal(ir_type, node.value, value)
            for node in nodes
        )

        if ir_type == "integer":
            shrink_towards = kwargs["shrink_towards"]
            # try shrinking from both sides towards shrink_towards.
            # we're starting from n = abs(shrink_towards - value). Because the
            # shrinker will not check its starting value, we need to try
            # shrinking to n first.
            self.try_shrinking_nodes(nodes, abs(shrink_towards - value))
            Integer.shrink(
                abs(shrink_towards - value),
                lambda n: self.try_shrinking_nodes(nodes, shrink_towards + n),
            )
            Integer.shrink(
                abs(shrink_towards - value),
                lambda n: self.try_shrinking_nodes(nodes, shrink_towards - n),
            )
        elif ir_type == "float":
            self.try_shrinking_nodes(nodes, abs(value))
            Float.shrink(
                abs(value),
                lambda val: self.try_shrinking_nodes(nodes, val),
            )
            Float.shrink(
                abs(value),
                lambda val: self.try_shrinking_nodes(nodes, -val),
            )
        elif ir_type == "boolean":
            # must be True, otherwise would be trivial and not selected.
            assert value is True
            # only one thing to try: false!
            self.try_shrinking_nodes(nodes, False)
        elif ir_type == "bytes":
            Bytes.shrink(
                value,
                lambda val: self.try_shrinking_nodes(nodes, val),
            )
        elif ir_type == "string":
            String.shrink(
                value,
                lambda val: self.try_shrinking_nodes(nodes, val),
                intervals=kwargs["intervals"],
            )
        else:
            raise NotImplementedError
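        # Illustrative note (not part of the original module): for an integer
        # node with shrink_towards=0 and value=-7, the attempts above are the
        # magnitude 7 itself, then shrink_towards + n and shrink_towards - n
        # with Integer.shrink lowering n from 7 towards 0, so both signs are
        # explored while converging on shrink_towards.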

1384 

1385 @defines_shrink_pass() 

1386 def minimize_individual_nodes(self, chooser): 

1387 """Attempt to minimize each node in sequence. 

1388 

1389 This is the pass that ensures that e.g. each integer we draw is a 

1390 minimum value. So it's the part that guarantees that if we e.g. do 

1391 

1392 x = data.draw(integers()) 

1393 assert x < 10 

1394 

1395 then in our shrunk example, x = 10 rather than say 97. 

1396 

1397 If we are unsuccessful at minimizing a node of interest we then 

1398 check if that's because it's changing the size of the test case and, 

1399 if so, we also make an attempt to delete parts of the test case to 

1400 see if that fixes it. 

1401 

1402 We handle most of the common cases in try_shrinking_nodes which is 

1403 pretty good at clearing out large contiguous blocks of dead space, 

1404 but it fails when there is data that has to stay in particular places 

1405 in the list. 

1406 """ 

1407 node = chooser.choose(self.nodes, lambda node: not node.trivial) 

1408 initial_target = self.shrink_target 

1409 

1410 self.minimize_nodes([node]) 

1411 if self.shrink_target is not initial_target: 

1412 # the shrink target changed, so our shrink worked. Defer doing 

1413 # anything more intelligent until this shrink fails. 

1414 return 

1415 

1416 # the shrink failed. One particularly common case where minimizing a 

1417 # node can fail is the antipattern of drawing a size and then drawing a 

1418 # collection of that size, or more generally when there is a size 

1419 # dependency on some single node. We'll explicitly try and fix up this 

1420 # common case here: if decreasing an integer node by one would reduce 

1421 # the size of the generated input, we'll try deleting things after that 

1422 # node and see if the resulting attempt works. 

1423 

1424 if node.ir_type != "integer": 

1425 # Only try this fixup logic on integer draws. Almost all size 

1426 # dependencies are on integer draws, and if it's not, it's doing 

1427 # something convoluted enough that it is unlikely to shrink well anyway. 

1428 # TODO: extent to floats? we probably currently fail on the following, 

1429 # albeit convoluted example: 

1430 # n = int(data.draw(st.floats())) 

1431 # s = data.draw(st.lists(st.integers(), min_size=n, max_size=n)) 

1432 return 

1433 

1434 lowered = ( 

1435 self.nodes[: node.index] 

1436 + [node.copy(with_value=node.value - 1)] 

1437 + self.nodes[node.index + 1 :] 

1438 ) 

1439 attempt = self.cached_test_function_ir(lowered) 

1440 if ( 

1441 attempt is None 

1442 or attempt.status < Status.VALID 

1443 or len(attempt.examples.ir_tree_nodes) == len(self.nodes) 

1444 or len(attempt.examples.ir_tree_nodes) == node.index + 1 

1445 ): 

1446 # no point in trying our size-dependency logic if our attempt at 

1447 # lowering the node resulted in: 

1448 # * invalid conjecture data 

1449 # * the same number of nodes as before 

1450 # * no nodes beyond the lowered node (nothing to try to delete afterwards) 

1451 return 

1452 

1453 # If the attempt were the same as the shrink target then the original 

1454 # shrink would have worked and we could never have got here. 

1455 assert attempt is not self.shrink_target 

1456 

1457 @self.cached(node.index) 

1458 def first_example_after_node(): 
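
# Binary search: self.examples is ordered by ir_start, so this finds

# the index of the first example starting at or after the lowered node.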

1459 lo = 0 

1460 hi = len(self.examples) 

1461 while lo + 1 < hi: 

1462 mid = (lo + hi) // 2 

1463 ex = self.examples[mid] 

1464 if ex.ir_start >= node.index: 

1465 hi = mid 

1466 else: 

1467 lo = mid 

1468 return hi 

1469 

1470 # we try deleting both entire examples and single nodes. 

1471 # If we wanted to get more aggressive, we could try deleting n 

1472 # consecutive nodes (that don't cross an example boundary) for, say, 

1473 # n <= 2 or n <= 3. 

1474 if chooser.choose([True, False]): 

1475 ex = self.examples[ 

1476 chooser.choose( 

1477 range(first_example_after_node, len(self.examples)), 

1478 lambda i: self.examples[i].ir_length > 0, 

1479 ) 

1480 ] 

1481 self.consider_new_tree(lowered[: ex.ir_start] + lowered[ex.ir_end :]) 

1482 else: 

1483 node = self.nodes[chooser.choose(range(node.index + 1, len(self.nodes)))] 

1484 self.consider_new_tree(lowered[: node.index] + lowered[node.index + 1 :]) 

1485 

1486 @defines_shrink_pass() 

1487 def reorder_examples(self, chooser): 

1488 """This pass allows us to reorder the children of each example. 

1489 

1490 For example, consider the following: 

1491 

1492 .. code-block:: python 

1493 

1494 import hypothesis.strategies as st 

1495 from hypothesis import given 

1496 

1497 

1498 @given(st.text(), st.text()) 

1499 def test_not_equal(x, y): 

1500 assert x != y 

1501 

1502 Without the ability to reorder x and y this could fail with either 

1503 ``x=""``, ``y="0"`` or the other way around. With reordering it will 

1504 reliably fail with ``x=""``, ``y="0"``. 

1505 """ 

1506 ex = chooser.choose(self.examples) 

1507 label = chooser.choose(ex.children).label 
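
# Children with the same label came from the same kind of draw, so they

# are the plausible candidates for swapping with one another.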

1508 

1509 examples = [c for c in ex.children if c.label == label] 

1510 if len(examples) <= 1: 

1511 return 

1512 st = self.shrink_target 

1513 endpoints = [(ex.ir_start, ex.ir_end) for ex in examples] 

1514 
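
# Ordering.shrink tries to permute the chosen children into sorted order

# under ``key``, i.e. ordered by their underlying buffers, so that

# lexicographically simpler children end up earlier.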

1515 Ordering.shrink( 

1516 range(len(examples)), 

1517 lambda indices: self.consider_new_tree( 

1518 replace_all( 

1519 st.examples.ir_nodes, 

1520 [ 

1521 ( 

1522 u, 

1523 v, 

1524 st.examples.ir_nodes[ 

1525 examples[i].ir_start : examples[i].ir_end 

1526 ], 

1527 ) 

1528 for (u, v), i in zip(endpoints, indices) 

1529 ], 

1530 ) 

1531 ), 

1532 key=lambda i: st.buffer[examples[i].start : examples[i].end], 

1533 ) 

1534 

1535 def run_node_program(self, i, description, original, repeats=1): 

1536 """Node programs are a mini-DSL for node rewriting, defined as a sequence 

1537 of commands that can be run at some index into the nodes 

1538 

1539 Commands are: 

1540 

1541 * "X", delete this node 

1542 

1543 This method runs the node program in ``description`` at node index 

1544 ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it 

1545 will attempt to approximate the results of running it that many times. 

1546 

1547 Returns True if this successfully changes the underlying shrink target, 

1548 else False. 

1549 """ 

1550 if i + len(description) > len(original.examples.ir_tree_nodes) or i < 0: 

1551 return False 

1552 attempt = list(original.examples.ir_tree_nodes) 

1553 for _ in range(repeats): 

1554 for k, command in reversed(list(enumerate(description))): 

1555 j = i + k 

1556 if j >= len(attempt): 

1557 return False 

1558 

1559 if command == "X": 

1560 del attempt[j] 

1561 else: 

1562 raise NotImplementedError(f"Unrecognised command {command!r}") 

1563 

1564 return self.consider_new_tree(attempt) 

1565 

1566 

1567def shrink_pass_family(f): 

1568 def accept(*args): 

1569 name = "{}({})".format(f.__name__, ", ".join(map(repr, args))) 

1570 if name not in SHRINK_PASS_DEFINITIONS: 

1571 

1572 def run(self, chooser): 

1573 return f(self, chooser, *args) 

1574 

1575 run.__name__ = name 

1576 defines_shrink_pass()(run) 

1577 assert name in SHRINK_PASS_DEFINITIONS 

1578 return name 

1579 

1580 return accept 
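
# For example: since passes are keyed by name, node_program("X") below

# registers a pass named "node_program('X')" on first use and simply

# returns that name on subsequent calls.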

1581 

1582 

1583@shrink_pass_family 

1584def node_program(self, chooser, description): 

1585 n = len(description) 

1586 # Adaptively attempt to run the node program at the current 

1587 # index. If this successfully applies the node program ``k`` times 

1588 # then this runs in ``O(log(k))`` test function calls. 
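
# (find_integer probes exponentially and then binary searches, which is

# what gives the O(log(k)) bound when the program applies k times.)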

1589 i = chooser.choose(range(len(self.nodes) - n + 1)) 

1590 

1591 # First, run the node program at the chosen index. If this fails, 

1592 # don't do any extra work, so that failure is as cheap as possible. 

1593 if not self.run_node_program(i, description, original=self.shrink_target): 

1594 return 

1595 

1596 # Because we run in a random order we will often find ourselves in the middle 

1597 # of a region where we could run the node program. We thus start by moving 

1598 # left to the beginning of that region, if possible, so that we run the 

1599 # program over the whole region. 

1600 def offset_left(k): 

1601 return i - k * n 

1602 

1603 i = offset_left( 

1604 find_integer( 

1605 lambda k: self.run_node_program( 

1606 offset_left(k), description, original=self.shrink_target 

1607 ) 

1608 ) 

1609 ) 

1610 

1611 original = self.shrink_target 

1612 # Now try to run the node program multiple times here. 

1613 find_integer( 

1614 lambda k: self.run_node_program(i, description, original=original, repeats=k) 

1615 ) 

1616 

1617 

1618@attr.s(slots=True, eq=False) 

1619class ShrinkPass: 

1620 run_with_chooser = attr.ib() 

1621 index = attr.ib() 

1622 shrinker = attr.ib() 

1623 

1624 last_prefix = attr.ib(default=()) 

1625 successes = attr.ib(default=0) 

1626 calls = attr.ib(default=0) 

1627 misaligned = attr.ib(default=0) 

1628 shrinks = attr.ib(default=0) 

1629 deletions = attr.ib(default=0) 

1630 

1631 def step(self, *, random_order=False): 

1632 tree = self.shrinker.shrink_pass_choice_trees[self] 

1633 if tree.exhausted: 

1634 return False 

1635 

1636 initial_shrinks = self.shrinker.shrinks 

1637 initial_calls = self.shrinker.calls 

1638 initial_misaligned = self.shrinker.misaligned 

1639 size = len(self.shrinker.shrink_target.buffer) 

1640 self.shrinker.engine.explain_next_call_as(self.name) 

1641 

1642 if random_order: 

1643 selection_order = random_selection_order(self.shrinker.random) 

1644 else: 

1645 selection_order = prefix_selection_order(self.last_prefix) 

1646 
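
# Run one step of the pass's choice tree, attributing any calls,

# shrinks, misalignments, and deleted bytes to this pass even if the

# step raises.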

1647 try: 

1648 self.last_prefix = tree.step( 

1649 selection_order, 

1650 lambda chooser: self.run_with_chooser(self.shrinker, chooser), 

1651 ) 

1652 finally: 

1653 self.calls += self.shrinker.calls - initial_calls 

1654 self.misaligned += self.shrinker.misaligned - initial_misaligned 

1655 self.shrinks += self.shrinker.shrinks - initial_shrinks 

1656 self.deletions += size - len(self.shrinker.shrink_target.buffer) 

1657 self.shrinker.engine.clear_call_explanation() 

1658 return True 

1659 

1660 @property 

1661 def name(self) -> str: 

1662 return self.run_with_chooser.__name__ 

1663 

1664 

1665class StopShrinking(Exception): 

1666 pass