Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/conjecture/utils.py: 66% (203 statements)

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import enum
import hashlib
import heapq
import math
import sys
from collections import OrderedDict, abc
from collections.abc import Sequence
from functools import lru_cache
from typing import TYPE_CHECKING, Optional, TypeVar, Union

from hypothesis.errors import InvalidArgument
from hypothesis.internal.compat import int_from_bytes
from hypothesis.internal.floats import next_up

if TYPE_CHECKING:
    from hypothesis.internal.conjecture.data import ConjectureData


LABEL_MASK = 2**64 - 1


def calc_label_from_name(name: str) -> int:
    hashed = hashlib.sha384(name.encode()).digest()
    return int_from_bytes(hashed[:8])


def calc_label_from_cls(cls: type) -> int:
    return calc_label_from_name(cls.__qualname__)


def calc_label_from_hash(obj: object) -> int:
    return calc_label_from_name(str(hash(obj)))


def combine_labels(*labels: int) -> int:
    label = 0
    for l in labels:
        label = (label << 1) & LABEL_MASK
        label ^= l
    return label
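

# Illustrative sketch (not part of the original module): combine_labels is a
# shift-then-XOR fold over 64-bit labels, so it is order-sensitive and the
# empty combination is 0. The assertions below just restate the loop above.
def _combine_labels_sketch() -> None:
    a = calc_label_from_name("first")
    b = calc_label_from_name("second")
    assert combine_labels() == 0
    assert combine_labels(a) == a
    assert combine_labels(a, b) == (((a << 1) & LABEL_MASK) ^ b)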


SAMPLE_IN_SAMPLER_LABEL = calc_label_from_name("a sample() in Sampler")
ONE_FROM_MANY_LABEL = calc_label_from_name("one more from many()")


T = TypeVar("T")


def identity(v: T) -> T:
    return v


def check_sample(
    values: Union[type[enum.Enum], Sequence[T]], strategy_name: str
) -> Sequence[T]:
    if "numpy" in sys.modules and isinstance(values, sys.modules["numpy"].ndarray):
        if values.ndim != 1:
            raise InvalidArgument(
                "Only one-dimensional arrays are supported for sampling, "
                f"and the given value has {values.ndim} dimensions (shape "
                f"{values.shape}). This array would give samples of array slices "
                "instead of elements! Use np.ravel(values) to convert "
                "to a one-dimensional array, or tuple(values) if you "
                "want to sample slices."
            )
    elif not isinstance(values, (OrderedDict, abc.Sequence, enum.EnumMeta)):
        raise InvalidArgument(
            f"Cannot sample from {values!r} because it is not an ordered collection. "
            f"Hypothesis goes to some length to ensure that the {strategy_name} "
            "strategy has stable results between runs. To replay a saved "
            "example, the sampled values must have the same iteration order "
            "on every run - ruling out sets, dicts, etc due to hash "
            "randomization. Most cases can simply use `sorted(values)`, but "
            "mixed types or special values such as math.nan require careful "
            "handling - and note that when simplifying an example, "
            "Hypothesis treats earlier values as simpler."
        )
    if isinstance(values, range):
        # Pyright is unhappy with every way I've tried to type-annotate this
        # function, so fine, we'll just ignore the analysis error.
        return values  # type: ignore
    return tuple(values)
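

# Illustrative sketch (not part of the original module): check_sample accepts
# ordered collections, returns ranges unchanged, and rejects unordered ones.
# The "sampled_from" strategy name here is just an example argument.
def _check_sample_sketch() -> None:
    assert check_sample(("a", "b"), "sampled_from") == ("a", "b")
    assert check_sample(range(3), "sampled_from") == range(3)
    try:
        check_sample({"a", "b"}, "sampled_from")
    except InvalidArgument:
        pass  # expected: sets have no stable iteration order
    else:
        raise AssertionError("sets should be rejected")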


@lru_cache(64)
def compute_sampler_table(weights: tuple[float, ...]) -> list[tuple[int, int, float]]:
    n = len(weights)
    table: list[list[int | float | None]] = [[i, None, None] for i in range(n)]
    total = sum(weights)
    num_type = type(total)

    zero = num_type(0)  # type: ignore
    one = num_type(1)  # type: ignore

    small: list[int] = []
    large: list[int] = []

    probabilities = [w / total for w in weights]
    scaled_probabilities: list[float] = []

    for i, alternate_chance in enumerate(probabilities):
        scaled = alternate_chance * n
        scaled_probabilities.append(scaled)
        if scaled == 1:
            table[i][2] = zero
        elif scaled < 1:
            small.append(i)
        else:
            large.append(i)
    heapq.heapify(small)
    heapq.heapify(large)

    while small and large:
        lo = heapq.heappop(small)
        hi = heapq.heappop(large)

        assert lo != hi
        assert scaled_probabilities[hi] > one
        assert table[lo][1] is None
        table[lo][1] = hi
        table[lo][2] = one - scaled_probabilities[lo]
        scaled_probabilities[hi] = (
            scaled_probabilities[hi] + scaled_probabilities[lo]
        ) - one

        if scaled_probabilities[hi] < 1:
            heapq.heappush(small, hi)
        elif scaled_probabilities[hi] == 1:
            table[hi][2] = zero
        else:
            heapq.heappush(large, hi)
    while large:
        table[large.pop()][2] = zero
    while small:
        table[small.pop()][2] = zero

    new_table: list[tuple[int, int, float]] = []
    for base, alternate, alternate_chance in table:
        assert isinstance(base, int)
        assert isinstance(alternate, int) or alternate is None
        assert alternate_chance is not None
        if alternate is None:
            new_table.append((base, base, alternate_chance))
        elif alternate < base:
            new_table.append((alternate, base, one - alternate_chance))
        else:
            new_table.append((base, alternate, alternate_chance))
    new_table.sort()
    return new_table
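

# Illustrative sketch (not part of the original module): the defining property
# of an alias table is that each row is picked uniformly (probability 1/n) and
# contributes (1 - alternate_chance)/n to its base and alternate_chance/n to
# its alternate, which must add back up to the normalized input weights. The
# weights used here are arbitrary.
def _alias_table_property_sketch() -> None:
    weights = (4.0, 8.0, 1.0, 1.0, 0.5)
    table = compute_sampler_table(weights)
    n = len(weights)
    recovered = [0.0] * n
    for base, alternate, alternate_chance in table:
        recovered[base] += (1 - alternate_chance) / n
        recovered[alternate] += alternate_chance / n
    total = sum(weights)
    assert all(math.isclose(w / total, r) for w, r in zip(weights, recovered))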


class Sampler:
    """Sampler based on Vose's algorithm for the alias method. See
    http://www.keithschwarz.com/darts-dice-coins/ for a good explanation.

    The general idea is that we store a table of triples (base, alternate, p).
    We then pick a triple uniformly at random, and choose its alternate
    value with probability p and else choose its base value. The triples are
    chosen so that the resulting mixture has the right distribution.

    We maintain the following invariants to try to produce good shrinks:

    1. The table is in lexicographic (base, alternate) order, so that choosing
       an earlier value in the list always lowers (or at least leaves
       unchanged) the value.
    2. base[i] < alternate[i], so that shrinking the draw always results in
       shrinking the chosen element.
    """

    table: list[tuple[int, int, float]]  # (base_idx, alt_idx, alt_chance)

    def __init__(self, weights: Sequence[float], *, observe: bool = True):
        self.observe = observe
        self.table = compute_sampler_table(tuple(weights))

    def sample(
        self,
        data: "ConjectureData",
        *,
        forced: Optional[int] = None,
    ) -> int:
        if self.observe:
            data.start_span(SAMPLE_IN_SAMPLER_LABEL)
        forced_choice = (  # pragma: no branch # https://github.com/nedbat/coveragepy/issues/1617
            None
            if forced is None
            else next(
                (base, alternate, alternate_chance)
                for (base, alternate, alternate_chance) in self.table
                if forced == base or (forced == alternate and alternate_chance > 0)
            )
        )
        base, alternate, alternate_chance = data.choice(
            self.table,
            forced=forced_choice,
            observe=self.observe,
        )
        forced_use_alternate = None
        if forced is not None:
            # we maintain this invariant when picking forced_choice above.
            # This song and dance about alternate_chance > 0 is to avoid forcing
            # e.g. draw_boolean(p=0, forced=True), which is an error.
            forced_use_alternate = forced == alternate and alternate_chance > 0
            assert forced == base or forced_use_alternate

        use_alternate = data.draw_boolean(
            alternate_chance,
            forced=forced_use_alternate,
            observe=self.observe,
        )
        if self.observe:
            data.stop_span()
        if use_alternate:
            assert forced is None or alternate == forced, (forced, alternate)
            return alternate
        else:
            assert forced is None or base == forced, (forced, base)
            return base
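

# Illustrative sketch (not part of the original module): outside of Hypothesis,
# a draw from the alias table is "pick a row uniformly, then take alternate
# with probability alternate_chance, else base". Here rng stands in for
# ConjectureData, which performs these draws in the real sample() above.
def _sample_without_conjecture_data_sketch(weights, rng) -> int:
    # rng is assumed to be a random.Random instance (or the random module).
    base, alternate, alternate_chance = rng.choice(
        compute_sampler_table(tuple(weights))
    )
    return alternate if rng.random() < alternate_chance else base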


INT_SIZES = (8, 16, 32, 64, 128)
INT_SIZES_SAMPLER = Sampler((4.0, 8.0, 1.0, 1.0, 0.5), observe=False)
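

# Illustrative note (not part of the original module): INT_SIZES_SAMPLER picks
# an index into INT_SIZES, so the weights above make 16-bit sizes the most
# likely draw and 128-bit the least. Reusing the stand-in sampler sketch above:
def _int_size_sketch(rng) -> int:
    # rng is assumed to be a random.Random instance (or the random module).
    weights = (4.0, 8.0, 1.0, 1.0, 0.5)
    return INT_SIZES[_sample_without_conjecture_data_sketch(weights, rng)]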


class many:
    """Utility class for collections. Bundles up the logic we use for "should I
    keep drawing more values?" and handles starting and stopping examples in
    the right place.

    Intended usage is something like:

        elements = many(data, ...)
        while elements.more():
            add_stuff_to_result()
    """

    def __init__(
        self,
        data: "ConjectureData",
        min_size: int,
        max_size: Union[int, float],
        average_size: Union[int, float],
        *,
        forced: Optional[int] = None,
        observe: bool = True,
    ) -> None:
        assert 0 <= min_size <= average_size <= max_size
        assert forced is None or min_size <= forced <= max_size
        self.min_size = min_size
        self.max_size = max_size
        self.data = data
        self.forced_size = forced
        self.p_continue = _calc_p_continue(average_size - min_size, max_size - min_size)
        self.count = 0
        self.rejections = 0
        self.drawn = False
        self.force_stop = False
        self.rejected = False
        self.observe = observe

    def stop_span(self):
        if self.observe:
            self.data.stop_span()

    def start_span(self, label):
        if self.observe:
            self.data.start_span(label)

    def more(self) -> bool:
        """Should I draw another element to add to the collection?"""
        if self.drawn:
            self.stop_span()

        self.drawn = True
        self.rejected = False

        self.start_span(ONE_FROM_MANY_LABEL)
        if self.min_size == self.max_size:
            # if we have to hit an exact size, draw unconditionally until that
            # point, and no further.
            should_continue = self.count < self.min_size
        else:
            forced_result = None
            if self.force_stop:
                # if our size is forced, we can't reject in a way that would
                # cause us to differ from the forced size.
                assert self.forced_size is None or self.count == self.forced_size
                forced_result = False
            elif self.count < self.min_size:
                forced_result = True
            elif self.count >= self.max_size:
                forced_result = False
            elif self.forced_size is not None:
                forced_result = self.count < self.forced_size
            should_continue = self.data.draw_boolean(
                self.p_continue,
                forced=forced_result,
                observe=self.observe,
            )

        if should_continue:
            self.count += 1
            return True
        else:
            self.stop_span()
            return False

    def reject(self, why: Optional[str] = None) -> None:
        """Reject the last example (i.e. don't count it towards our budget of
        elements because it's not going to go in the final collection)."""
        assert self.count > 0
        self.count -= 1
        self.rejections += 1
        self.rejected = True
        # We set a minimum number of rejections before we give up to avoid
        # failing too fast when we reject the first draw.
        if self.rejections > max(3, 2 * self.count):
            if self.count < self.min_size:
                self.data.mark_invalid(why)
            else:
                self.force_stop = True
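

# Illustrative sketch (not part of the original module): outside the forced
# paths, more() is a capped geometric process - after the min_size mandatory
# draws, each extra element is kept with probability p_continue, up to
# max_size. Simulating that loop with random.random() standing in for
# data.draw_boolean recovers average_size (to within the ~0.01 tolerance of
# _calc_p_continue below). The default sizes here are arbitrary.
def _simulate_many_sizes_sketch(
    min_size: int = 0, max_size: int = 10, average_size: float = 5.0
) -> float:
    import random

    p_continue = _calc_p_continue(average_size - min_size, max_size - min_size)
    trials = 10_000
    total = 0
    for _ in range(trials):
        count = min_size
        while count < max_size and random.random() < p_continue:
            count += 1
        total += count
    return total / trials  # expected to be close to average_size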


SMALLEST_POSITIVE_FLOAT: float = next_up(0.0) or sys.float_info.min


@lru_cache
def _calc_p_continue(desired_avg: float, max_size: Union[int, float]) -> float:
    """Return the p_continue which will generate the desired average size."""
    assert desired_avg <= max_size, (desired_avg, max_size)
    if desired_avg == max_size:
        return 1.0
    p_continue = 1 - 1.0 / (1 + desired_avg)
    if p_continue == 0 or max_size == math.inf:
        assert 0 <= p_continue < 1, p_continue
        return p_continue
    assert 0 < p_continue < 1, p_continue
    # For small max_size, the infinite-series p_continue is a poor approximation,
    # and while we can't solve the polynomial, a few rounds of iteration quickly
    # get us a good approximate solution in almost all cases (sometimes exact!).
    while _p_continue_to_avg(p_continue, max_size) > desired_avg:
        # This is impossible over the reals, but *can* happen with floats.
        p_continue -= 0.0001
        # If we've reached zero or gone negative, we want to break out of this loop,
        # and do so even if we're on a system with the unsafe denormals-are-zero flag.
        # We make that an explicit error in st.floats(), but here we'd prefer to
        # just get somewhat worse precision on collection lengths.
        if p_continue < SMALLEST_POSITIVE_FLOAT:
            p_continue = SMALLEST_POSITIVE_FLOAT
            break
    # Let's binary-search our way to a better estimate! We tried fancier options
    # like gradient descent, but this is numerically stable and works better.
    hi = 1.0
    while desired_avg - _p_continue_to_avg(p_continue, max_size) > 0.01:
        assert 0 < p_continue < hi, (p_continue, hi)
        mid = (p_continue + hi) / 2
        if _p_continue_to_avg(mid, max_size) <= desired_avg:
            p_continue = mid
        else:
            hi = mid
    assert 0 < p_continue < 1, p_continue
    assert _p_continue_to_avg(p_continue, max_size) <= desired_avg
    return p_continue


def _p_continue_to_avg(p_continue: float, max_size: Union[int, float]) -> float:
    """Return the average_size generated by this p_continue and max_size."""
    if p_continue >= 1:
        return max_size
    return (1.0 / (1 - p_continue) - 1) * (1 - p_continue**max_size)
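

# Illustrative sketch (not part of the original module): a round-trip check
# that _calc_p_continue and _p_continue_to_avg agree. Per the binary search
# above, the recovered average is at most desired_avg and within 0.01 of it
# for these (arbitrary) finite cases; for max_size=inf the closed form
# p_continue = avg / (1 + avg) is exact, e.g. avg=5.0 gives p=5/6.
def _p_continue_round_trip_sketch() -> None:
    for desired_avg, max_size in [(0.5, 3), (2.0, 10), (5.0, 20)]:
        p = _calc_p_continue(desired_avg, max_size)
        got = _p_continue_to_avg(p, max_size)
        assert 0 <= desired_avg - got <= 0.01 + 1e-9, (desired_avg, got)
    assert math.isclose(_p_continue_to_avg(5 / 6, math.inf), 5.0)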