1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import abc
12import contextlib
13import math
14import sys
15import warnings
16from collections.abc import Iterable
17from functools import cached_property
18from random import Random
19from sys import float_info
20from types import ModuleType
21from typing import (
22 TYPE_CHECKING,
23 Any,
24 Literal,
25 Optional,
26 TypedDict,
27 TypeVar,
28 Union,
29)
30
31from sortedcontainers import SortedSet
32
33from hypothesis.errors import HypothesisWarning
34from hypothesis.internal.cache import LRUCache
35from hypothesis.internal.compat import WINDOWS, int_from_bytes
36from hypothesis.internal.conjecture.choice import (
37 ChoiceConstraintsT,
38 ChoiceTypeT,
39 FloatConstraints,
40 choice_constraints_key,
41 choice_permitted,
42)
43from hypothesis.internal.conjecture.floats import lex_to_float
44from hypothesis.internal.conjecture.junkdrawer import bits_to_bytes
45from hypothesis.internal.conjecture.utils import (
46 INT_SIZES,
47 INT_SIZES_SAMPLER,
48 Sampler,
49 many,
50)
51from hypothesis.internal.constants_ast import (
52 Constants,
53 constants_from_module,
54 is_local_module_file,
55)
56from hypothesis.internal.floats import (
57 SIGNALING_NAN,
58 float_to_int,
59 make_float_clamper,
60 next_down,
61 next_up,
62)
63from hypothesis.internal.intervalsets import IntervalSet
64
65if TYPE_CHECKING:
66 from typing import TypeAlias
67
68 from hypothesis.internal.conjecture.data import ConjectureData
69 from hypothesis.internal.constants_ast import ConstantT
70
71T = TypeVar("T")
72_Lifetime: "TypeAlias" = Literal["test_case", "test_function"]
73COLLECTION_DEFAULT_MAX_SIZE = 10**10 # "arbitrarily large"
74
75
76# The available `PrimitiveProvider`s, and therefore also the available backends
77# for use by @settings(backend=...). The key is the name to be used in the backend=
78# value, and the value is the importable path to a subclass of PrimitiveProvider.
79#
80# See also
81# https://hypothesis.readthedocs.io/en/latest/strategies.html#alternative-backends-for-hypothesis.
82#
83# NOTE: the PrimitiveProvider interface is not yet stable. We may continue to
84# make breaking changes to it. (but if you want to experiment and don't mind
85# breakage, here you go!)
86AVAILABLE_PROVIDERS = {
87 "hypothesis": "hypothesis.internal.conjecture.providers.HypothesisProvider",
88 "hypothesis-urandom": "hypothesis.internal.conjecture.providers.URandomProvider",
89}
90# cache the choice_permitted constants for a particular set of constraints.
91CacheKeyT: "TypeAlias" = tuple[ChoiceTypeT, tuple[Any, ...]]
92CacheValueT: "TypeAlias" = tuple[tuple["ConstantT", ...], tuple["ConstantT", ...]]
93CONSTANTS_CACHE: LRUCache[CacheKeyT, CacheValueT] = LRUCache(1024)
94
95_constant_floats = (
96 [
97 0.5,
98 1.1,
99 1.5,
100 1.9,
101 1.0 / 3,
102 10e6,
103 10e-6,
104 1.175494351e-38,
105 next_up(0.0),
106 float_info.min,
107 float_info.max,
108 3.402823466e38,
109 9007199254740992.0,
110 1 - 10e-6,
111 2 + 10e-6,
112 1.192092896e-07,
113 2.2204460492503131e-016,
114 ]
115 + [2.0**-n for n in (24, 14, 149, 126)] # minimum (sub)normals for float16,32
116 + [float_info.min / n for n in (2, 10, 1000, 100_000)] # subnormal in float64
117)
118_constant_floats.extend([-x for x in _constant_floats])
119assert all(isinstance(f, float) for f in _constant_floats)
120
121_constant_strings = {
122 # strings which can be interpreted as code / logic
123 "undefined",
124 "null",
125 "NULL",
126 "nil",
127 "NIL",
128 "true",
129 "false",
130 "True",
131 "False",
132 "TRUE",
133 "FALSE",
134 "None",
135 "none",
136 "if",
137 "then",
138 "else",
139 # strings which can be interpreted as a number
140 "0",
141 "1e100",
142 "0..0",
143 "0/0",
144 "1/0",
145 "+0.0",
146 "Infinity",
147 "-Infinity",
148 "Inf",
149 "INF",
150 "NaN",
151 "9" * 30,
152 # common ascii characters
153 ",./;'[]\\-=<>?:\"{}|_+!@#$%^&*()`~",
154 # common unicode characters
155 "Ω≈ç√∫˜µ≤≥÷åß∂ƒ©˙∆˚¬…æœ∑´®†¥¨ˆøπ“‘¡™£¢∞§¶•ªº–≠¸˛Ç◊ı˜Â¯˘¿ÅÍÎÏ˝ÓÔÒÚÆ☃Œ„´‰ˇÁ¨ˆØ∏”’`⁄€‹›fifl‡°·‚—±",
156 # characters which increase in length when lowercased
157 "Ⱥ",
158 "Ⱦ",
159 # ligatures
160 "æœÆŒffʤʨß"
161 # emoticons
162 "(╯°□°)╯︵ ┻━┻)",
163 # emojis
164 "😍",
165 "🇺🇸",
166 # emoji modifiers
167 "🏻" # U+1F3FB Light Skin Tone,
168 "👍🏻", # 👍 followed by U+1F3FB
169 # RTL text
170 "الكل في المجمو عة",
171 # Ogham text, which contains the only character in the Space Separators
172 # unicode category (Zs) that isn't visually blank: . # noqa: RUF003
173 "᚛ᚄᚓᚐᚋᚒᚄ ᚑᚄᚂᚑᚏᚅ᚜",
174 # readable variations on text (bolt/italic/script)
175 "𝐓𝐡𝐞 𝐪𝐮𝐢𝐜𝐤 𝐛𝐫𝐨𝐰𝐧 𝐟𝐨𝐱 𝐣𝐮𝐦𝐩𝐬 𝐨𝐯𝐞𝐫 𝐭𝐡𝐞 𝐥𝐚𝐳𝐲 𝐝𝐨𝐠",
176 "𝕿𝖍𝖊 𝖖𝖚𝖎𝖈𝖐 𝖇𝖗𝖔𝖜𝖓 𝖋𝖔𝖝 𝖏𝖚𝖒𝖕𝖘 𝖔𝖛𝖊𝖗 𝖙𝖍𝖊 𝖑𝖆𝖟𝖞 𝖉𝖔𝖌",
177 "𝑻𝒉𝒆 𝒒𝒖𝒊𝒄𝒌 𝒃𝒓𝒐𝒘𝒏 𝒇𝒐𝒙 𝒋𝒖𝒎𝒑𝒔 𝒐𝒗𝒆𝒓 𝒕𝒉𝒆 𝒍𝒂𝒛𝒚 𝒅𝒐𝒈",
178 "𝓣𝓱𝓮 𝓺𝓾𝓲𝓬𝓴 𝓫𝓻𝓸𝔀𝓷 𝓯𝓸𝔁 𝓳𝓾𝓶𝓹𝓼 𝓸𝓿𝓮𝓻 𝓽𝓱𝓮 𝓵𝓪𝔃𝔂 𝓭𝓸𝓰",
179 "𝕋𝕙𝕖 𝕢𝕦𝕚𝕔𝕜 𝕓𝕣𝕠𝕨𝕟 𝕗𝕠𝕩 𝕛𝕦𝕞𝕡𝕤 𝕠𝕧𝕖𝕣 𝕥𝕙𝕖 𝕝𝕒𝕫𝕪 𝕕𝕠𝕘",
180 # upsidown text
181 "ʇǝɯɐ ʇᴉs ɹolop ɯnsdᴉ ɯǝɹo˥",
182 # reserved strings in windows
183 "NUL",
184 "COM1",
185 "LPT1",
186 # scunthorpe problem
187 "Scunthorpe",
188 # zalgo text
189 "Ṱ̺̺̕o͞ ̷i̲̬͇̪͙n̝̗͕v̟̜̘̦͟o̶̙̰̠kè͚̮̺̪̹̱̤ ̖t̝͕̳̣̻̪͞h̼͓̲̦̳̘̲e͇̣̰̦̬͎ ̢̼̻̱̘h͚͎͙̜̣̲ͅi̦̲̣̰̤v̻͍e̺̭̳̪̰-m̢iͅn̖̺̞̲̯̰d̵̼̟͙̩̼̘̳ ̞̥̱̳̭r̛̗̘e͙p͠r̼̞̻̭̗e̺̠̣͟s̘͇̳͍̝͉e͉̥̯̞̲͚̬͜ǹ̬͎͎̟̖͇̤t͍̬̤͓̼̭͘ͅi̪̱n͠g̴͉ ͏͉ͅc̬̟h͡a̫̻̯͘o̫̟̖͍̙̝͉s̗̦̲.̨̹͈̣",
190 #
191 # examples from https://faultlore.com/blah/text-hates-you/
192 "मनीष منش",
193 "पन्ह पन्ह त्र र्च कृकृ ड्ड न्हृे إلا بسم الله",
194 "lorem لا بسم الله ipsum 你好1234你好",
195}
196
197
198# we don't actually care what order the constants are sorted in, just that the
199# ordering is deterministic.
200GLOBAL_CONSTANTS = Constants(
201 integers=SortedSet(),
202 floats=SortedSet(_constant_floats, key=float_to_int),
203 bytes=SortedSet(),
204 strings=SortedSet(_constant_strings),
205)
206
207_local_constants = Constants(
208 integers=SortedSet(),
209 floats=SortedSet(key=float_to_int),
210 bytes=SortedSet(),
211 strings=SortedSet(),
212)
213# modules that we've already seen and processed for local constants. These are
214# are all modules, not necessarily local ones. This lets us quickly see which
215# modules are new without an expensive path.resolve() or is_local_module_file
216# cache lookup.
217_seen_modules: set[ModuleType] = set()
218_sys_modules_len: Optional[int] = None
219
220
221def _get_local_constants() -> Constants:
222 global _sys_modules_len, _local_constants
223
224 if sys.platform == "emscripten": # pragma: no cover
225 # pyodide builds bundle the stdlib in a nonstandard location, like
226 # `/lib/python312.zip/heapq.py`. To avoid identifying the entirety of
227 # the stdlib as local code and slowing down on emscripten, instead return
228 # that nothing is local.
229 #
230 # pyodide may provide some way to distinguish stdlib/third-party/local
231 # code. I haven't looked into it. If they do, we should correctly implement
232 # ModuleLocation for pyodide instead of this.
233 return _local_constants
234
235 count_constants = len(_local_constants)
236 # We call this function once per HypothesisProvider instance, i.e. once per
237 # input, so it needs to be performant. The logic here is more complicated
238 # than necessary because of this.
239 #
240 # First, we check whether there are any new modules with a very cheap length
241 # check. This check can be fooled if a module is added while another module is
242 # removed, but the more correct check against tuple(sys.modules.keys()) is
243 # substantially more expensive. Such a new module would eventually be discovered
244 # if / when the length changes again in the future.
245 #
246 # If the length has changed, we find just modules we haven't seen before. Of
247 # those, we find the ones which correspond to local modules, and extract their
248 # constants.
249
250 # careful: store sys.modules length when we first check to avoid race conditions
251 # with other threads loading a module before we set _sys_modules_len.
252 if (sys_modules_len := len(sys.modules)) != _sys_modules_len:
253 new_modules = set(sys.modules.values()) - _seen_modules
254 # Repeated SortedSet unions are expensive. Do the initial unions on a
255 # set(), then do a one-time union with _local_constants after.
256 new_constants = Constants()
257 for module in new_modules:
258 if (
259 module_file := getattr(module, "__file__", None)
260 ) is not None and is_local_module_file(module_file):
261 new_constants |= constants_from_module(module)
262 _local_constants |= new_constants
263 _seen_modules.update(new_modules)
264 _sys_modules_len = sys_modules_len
265
266 # if we add any new constant, invalidate the constant cache for permitted values.
267 # A more efficient approach would be invalidating just the keys with this
268 # choice_type.
269 if len(_local_constants) > count_constants:
270 CONSTANTS_CACHE.cache.clear()
271
272 return _local_constants
273
274
275class _BackendInfoMsg(TypedDict):
276 type: str
277 title: str
278 content: Union[str, dict[str, Any]]
279
280
281class PrimitiveProvider(abc.ABC):
282 # This is the low-level interface which would also be implemented
283 # by e.g. CrossHair, by an Atheris-hypothesis integration, etc.
284 # We'd then build the structured tree handling, database and replay
285 # support, etc. on top of this - so all backends get those for free.
286 #
287 # See https://github.com/HypothesisWorks/hypothesis/issues/3086
288
289 # How long a provider instance is used for. One of test_function or
290 # test_case. Defaults to test_function.
291 #
292 # If test_function, a single provider instance will be instantiated and used
293 # for the entirety of each test function. I.e., roughly one provider per
294 # @given annotation. This can be useful if you need to track state over many
295 # executions to a test function.
296 #
297 # This lifetime will cause None to be passed for the ConjectureData object
298 # in PrimitiveProvider.__init__, because that object is instantiated per
299 # test case.
300 #
301 # If test_case, a new provider instance will be instantiated and used each
302 # time hypothesis tries to generate a new input to the test function. This
303 # lifetime can access the passed ConjectureData object.
304 #
305 # Non-hypothesis providers probably want to set a lifetime of test_function.
306 lifetime: _Lifetime = "test_function"
307
308 # Solver-based backends such as hypothesis-crosshair use symbolic values
309 # which record operations performed on them in order to discover new paths.
310 # If avoid_realization is set to True, hypothesis will avoid interacting with
311 # symbolic choices returned by the provider in any way that would force the
312 # solver to narrow the range of possible values for that symbolic.
313 #
314 # Setting this to True disables some hypothesis features, such as
315 # DataTree-based deduplication, and some internal optimizations, such as
316 # caching constraints. Only enable this if it is necessary for your backend.
317 avoid_realization = False
318
319 def __init__(self, conjecturedata: Optional["ConjectureData"], /) -> None:
320 self._cd = conjecturedata
321
322 def per_test_case_context_manager(self):
323 return contextlib.nullcontext()
324
325 def realize(self, value: T, *, for_failure: bool = False) -> T:
326 """
327 Called whenever hypothesis requires a concrete (non-symbolic) value from
328 a potentially symbolic value. Hypothesis will not check that `value` is
329 symbolic before calling `realize`, so you should handle the case where
330 `value` is non-symbolic.
331
332 The returned value should be non-symbolic. If you cannot provide a value,
333 raise hypothesis.errors.BackendCannotProceed("discard_test_case").
334
335 If for_failure is True, the value is associated with a failing example.
336 In this case, the backend should spend substantially more effort when
337 attempting to realize the value, since it is important to avoid discarding
338 failing examples. Backends may still raise BackendCannotProceed when
339 for_failure is True, if realization is truly impossible or if realization
340 takes significantly longer than expected (say, 5 minutes).
341 """
342 return value
343
344 def observe_test_case(self) -> dict[str, Any]:
345 """Called at the end of the test case when observability mode is active.
346
347 The return value should be a non-symbolic json-encodable dictionary,
348 and will be included as `observation["metadata"]["backend"]`.
349 """
350 return {}
351
352 def observe_information_messages(
353 self, *, lifetime: _Lifetime
354 ) -> Iterable[_BackendInfoMsg]:
355 """Called at the end of each test case and again at end of the test function.
356
357 Return an iterable of `{type: info/alert/error, title: str, content: str|dict}`
358 dictionaries to be delivered as individual information messages.
359 (Hypothesis adds the `run_start` timestamp and `property` name for you.)
360 """
361 assert lifetime in ("test_case", "test_function")
362 yield from []
363
364 @abc.abstractmethod
365 def draw_boolean(
366 self,
367 p: float = 0.5,
368 ) -> bool:
369 raise NotImplementedError
370
371 @abc.abstractmethod
372 def draw_integer(
373 self,
374 min_value: Optional[int] = None,
375 max_value: Optional[int] = None,
376 *,
377 # weights are for choosing an element index from a bounded range
378 weights: Optional[dict[int, float]] = None,
379 shrink_towards: int = 0,
380 ) -> int:
381 raise NotImplementedError
382
383 @abc.abstractmethod
384 def draw_float(
385 self,
386 *,
387 min_value: float = -math.inf,
388 max_value: float = math.inf,
389 allow_nan: bool = True,
390 smallest_nonzero_magnitude: float,
391 ) -> float:
392 raise NotImplementedError
393
394 @abc.abstractmethod
395 def draw_string(
396 self,
397 intervals: IntervalSet,
398 *,
399 min_size: int = 0,
400 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
401 ) -> str:
402 raise NotImplementedError
403
404 @abc.abstractmethod
405 def draw_bytes(
406 self,
407 min_size: int = 0,
408 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
409 ) -> bytes:
410 raise NotImplementedError
411
412 def span_start(self, label: int, /) -> None: # noqa: B027 # non-abstract noop
413 """Marks the beginning of a semantically meaningful span.
414
415 Providers can optionally track this data to learn which sub-sequences
416 of draws correspond to a higher-level object, recovering the parse tree.
417 `label` is an opaque integer, which will be shared by all spans drawn
418 from a particular strategy.
419
420 This method is called from ConjectureData.start_span().
421 """
422
423 def span_end(self, discard: bool, /) -> None: # noqa: B027, FBT001
424 """Marks the end of a semantically meaningful span.
425
426 `discard` is True when the draw was filtered out or otherwise marked as
427 unlikely to contribute to the input data as seen by the user's test.
428 Note however that side effects can make this determination unsound.
429
430 This method is called from ConjectureData.stop_span().
431 """
432
433
434class HypothesisProvider(PrimitiveProvider):
435 lifetime = "test_case"
436
437 def __init__(self, conjecturedata: Optional["ConjectureData"], /):
438 super().__init__(conjecturedata)
439 self._random = None if self._cd is None else self._cd._random
440
441 @cached_property
442 def _local_constants(self):
443 # defer computation of local constants until/if we need it
444 return _get_local_constants()
445
446 def _maybe_draw_constant(
447 self,
448 choice_type: ChoiceTypeT,
449 constraints: ChoiceConstraintsT,
450 *,
451 p: float = 0.05,
452 ) -> Optional["ConstantT"]:
453 assert self._random is not None
454 assert self._local_constants is not None
455 assert choice_type != "boolean"
456
457 # check whether we even want a constant before spending time computing
458 # and caching the allowed constants.
459 if self._random.random() > p:
460 return None
461
462 key = (choice_type, choice_constraints_key(choice_type, constraints))
463 if key not in CONSTANTS_CACHE:
464 CONSTANTS_CACHE[key] = (
465 tuple(
466 choice
467 for choice in GLOBAL_CONSTANTS.set_for_type(choice_type)
468 if choice_permitted(choice, constraints)
469 ),
470 tuple(
471 choice
472 for choice in self._local_constants.set_for_type(choice_type)
473 if choice_permitted(choice, constraints)
474 ),
475 )
476
477 # split constants into two pools, so we still have a good chance to draw
478 # global constants even if there are many local constants.
479 (global_constants, local_constants) = CONSTANTS_CACHE[key]
480 constants_lists = ([global_constants] if global_constants else []) + (
481 [local_constants] if local_constants else []
482 )
483 if not constants_lists:
484 return None
485
486 # At this point, we've decided to use a constant. Now we select which pool
487 # to draw that constant from.
488 #
489 # Note that this approach has a different probability distribution than
490 # attempting a random.random for both global_constants and local_constants.
491 constants = self._random.choice(constants_lists)
492 return self._random.choice(constants)
493
494 def draw_boolean(
495 self,
496 p: float = 0.5,
497 ) -> bool:
498 assert self._random is not None
499
500 if p <= 0:
501 return False
502 if p >= 1:
503 return True
504
505 return self._random.random() < p
506
507 def draw_integer(
508 self,
509 min_value: Optional[int] = None,
510 max_value: Optional[int] = None,
511 *,
512 weights: Optional[dict[int, float]] = None,
513 shrink_towards: int = 0,
514 ) -> int:
515 assert self._cd is not None
516 if (
517 constant := self._maybe_draw_constant(
518 "integer",
519 {
520 "min_value": min_value,
521 "max_value": max_value,
522 "weights": weights,
523 "shrink_towards": shrink_towards,
524 },
525 )
526 ) is not None:
527 assert isinstance(constant, int)
528 return constant
529
530 center = 0
531 if min_value is not None:
532 center = max(min_value, center)
533 if max_value is not None:
534 center = min(max_value, center)
535
536 if weights is not None:
537 assert min_value is not None
538 assert max_value is not None
539
540 # format of weights is a mapping of ints to p, where sum(p) < 1.
541 # The remaining probability mass is uniformly distributed over
542 # *all* ints (not just the unmapped ones; this is somewhat undesirable,
543 # but simplifies things).
544 #
545 # We assert that sum(p) is strictly less than 1 because it simplifies
546 # handling forced values when we can force into the unmapped probability
547 # mass. We should eventually remove this restriction.
548 sampler = Sampler(
549 [1 - sum(weights.values()), *weights.values()], observe=False
550 )
551 # if we're forcing, it's easiest to force into the unmapped probability
552 # mass and then force the drawn value after.
553 idx = sampler.sample(self._cd)
554
555 if idx == 0:
556 return self._draw_bounded_integer(min_value, max_value)
557 # implicit reliance on dicts being sorted for determinism
558 return list(weights)[idx - 1]
559
560 if min_value is None and max_value is None:
561 return self._draw_unbounded_integer()
562
563 if min_value is None:
564 assert max_value is not None
565 probe = max_value + 1
566 while max_value < probe:
567 probe = center + self._draw_unbounded_integer()
568 return probe
569
570 if max_value is None:
571 assert min_value is not None
572 probe = min_value - 1
573 while probe < min_value:
574 probe = center + self._draw_unbounded_integer()
575 return probe
576
577 return self._draw_bounded_integer(min_value, max_value)
578
579 def draw_float(
580 self,
581 *,
582 min_value: float = -math.inf,
583 max_value: float = math.inf,
584 allow_nan: bool = True,
585 smallest_nonzero_magnitude: float,
586 ) -> float:
587 assert self._random is not None
588
589 constraints: FloatConstraints = {
590 "min_value": min_value,
591 "max_value": max_value,
592 "allow_nan": allow_nan,
593 "smallest_nonzero_magnitude": smallest_nonzero_magnitude,
594 }
595 if (
596 constant := self._maybe_draw_constant("float", constraints, p=0.15)
597 ) is not None:
598 assert isinstance(constant, float)
599 return constant
600
601 # on top of the probability to draw a constant float, we independently
602 # upweight 0.0/-0.0, math.inf, -math.inf, nans, and boundary values.
603 weird_floats = [
604 f
605 for f in [
606 0.0,
607 -0.0,
608 math.inf,
609 -math.inf,
610 math.nan,
611 -math.nan,
612 SIGNALING_NAN,
613 -SIGNALING_NAN,
614 min_value,
615 next_up(min_value),
616 min_value + 1,
617 max_value - 1,
618 next_down(max_value),
619 max_value,
620 ]
621 if choice_permitted(f, constraints)
622 ]
623
624 if weird_floats and self._random.random() < 0.05:
625 return self._random.choice(weird_floats)
626
627 clamper = make_float_clamper(
628 min_value,
629 max_value,
630 smallest_nonzero_magnitude=smallest_nonzero_magnitude,
631 allow_nan=allow_nan,
632 )
633
634 result = self._draw_float()
635 if allow_nan and math.isnan(result):
636 clamped = result # pragma: no cover
637 else:
638 clamped = clamper(result)
639 if float_to_int(clamped) != float_to_int(result) and not (
640 math.isnan(result) and allow_nan
641 ):
642 result = clamped
643 return result
644
645 def draw_string(
646 self,
647 intervals: IntervalSet,
648 *,
649 min_size: int = 0,
650 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
651 ) -> str:
652 assert self._cd is not None
653 assert self._random is not None
654
655 if len(intervals) == 0:
656 return ""
657
658 if (
659 constant := self._maybe_draw_constant(
660 "string",
661 {"intervals": intervals, "min_size": min_size, "max_size": max_size},
662 )
663 ) is not None:
664 assert isinstance(constant, str)
665 return constant
666
667 average_size = min(
668 max(min_size * 2, min_size + 5),
669 0.5 * (min_size + max_size),
670 )
671
672 chars = []
673 elements = many(
674 self._cd,
675 min_size=min_size,
676 max_size=max_size,
677 average_size=average_size,
678 observe=False,
679 )
680 while elements.more():
681 if len(intervals) > 256:
682 if self.draw_boolean(0.2):
683 i = self._random.randint(256, len(intervals) - 1)
684 else:
685 i = self._random.randint(0, 255)
686 else:
687 i = self._random.randint(0, len(intervals) - 1)
688
689 chars.append(intervals.char_in_shrink_order(i))
690
691 return "".join(chars)
692
693 def draw_bytes(
694 self,
695 min_size: int = 0,
696 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
697 ) -> bytes:
698 assert self._cd is not None
699 assert self._random is not None
700
701 if (
702 constant := self._maybe_draw_constant(
703 "bytes", {"min_size": min_size, "max_size": max_size}
704 )
705 ) is not None:
706 assert isinstance(constant, bytes)
707 return constant
708
709 buf = bytearray()
710 average_size = min(
711 max(min_size * 2, min_size + 5),
712 0.5 * (min_size + max_size),
713 )
714 elements = many(
715 self._cd,
716 min_size=min_size,
717 max_size=max_size,
718 average_size=average_size,
719 observe=False,
720 )
721 while elements.more():
722 buf += self._random.randbytes(1)
723
724 return bytes(buf)
725
726 def _draw_float(self) -> float:
727 assert self._random is not None
728
729 f = lex_to_float(self._random.getrandbits(64))
730 sign = 1 if self._random.getrandbits(1) else -1
731 return sign * f
732
733 def _draw_unbounded_integer(self) -> int:
734 assert self._cd is not None
735 assert self._random is not None
736
737 size = INT_SIZES[INT_SIZES_SAMPLER.sample(self._cd)]
738
739 r = self._random.getrandbits(size)
740 sign = r & 1
741 r >>= 1
742 if sign:
743 r = -r
744 return r
745
746 def _draw_bounded_integer(
747 self,
748 lower: int,
749 upper: int,
750 *,
751 vary_size: bool = True,
752 ) -> int:
753 assert lower <= upper
754 assert self._cd is not None
755 assert self._random is not None
756
757 if lower == upper:
758 return lower
759
760 bits = (upper - lower).bit_length()
761 if bits > 24 and vary_size and self._random.random() < 7 / 8:
762 # For large ranges, we combine the uniform random distribution
763 # with a weighting scheme with moderate chance. Cutoff at 2 ** 24 so that our
764 # choice of unicode characters is uniform but the 32bit distribution is not.
765 idx = INT_SIZES_SAMPLER.sample(self._cd)
766 cap_bits = min(bits, INT_SIZES[idx])
767 upper = min(upper, lower + 2**cap_bits - 1)
768 return self._random.randint(lower, upper)
769
770 return self._random.randint(lower, upper)
771
772
773# Masks for masking off the first byte of an n-bit buffer.
774# The appropriate mask is stored at position n % 8.
775BYTE_MASKS = [(1 << n) - 1 for n in range(8)]
776BYTE_MASKS[0] = 255
777
778
779class BytestringProvider(PrimitiveProvider):
780 lifetime = "test_case"
781
782 def __init__(
783 self, conjecturedata: Optional["ConjectureData"], /, *, bytestring: bytes
784 ):
785 super().__init__(conjecturedata)
786 self.bytestring = bytestring
787 self.index = 0
788 self.drawn = bytearray()
789
790 def _draw_bits(self, n):
791 if n == 0: # pragma: no cover
792 return 0
793 n_bytes = bits_to_bytes(n)
794 if self.index + n_bytes > len(self.bytestring):
795 self._cd.mark_overrun()
796 buf = bytearray(self.bytestring[self.index : self.index + n_bytes])
797 self.index += n_bytes
798
799 buf[0] &= BYTE_MASKS[n % 8]
800 buf = bytes(buf)
801 self.drawn += buf
802 return int_from_bytes(buf)
803
804 def draw_boolean(
805 self,
806 p: float = 0.5,
807 ) -> bool:
808 if p <= 0:
809 return False
810 if p >= 1:
811 return True
812
813 # always use one byte for booleans to maintain constant draw size.
814 # If a probability requires more than 8 bits to represent precisely,
815 # the result will be slightly biased, but not badly.
816 bits = 8
817 size = 2**bits
818 # always leave at least one value that can be true, even for very small
819 # p.
820 falsey = max(1, math.floor(size * (1 - p)))
821 n = self._draw_bits(bits)
822 return n >= falsey
823
824 def draw_integer(
825 self,
826 min_value: Optional[int] = None,
827 max_value: Optional[int] = None,
828 *,
829 weights: Optional[dict[int, float]] = None,
830 shrink_towards: int = 0,
831 ) -> int:
832 assert self._cd is not None
833
834 # we explicitly ignore integer weights for now, as they are likely net
835 # negative on fuzzer performance.
836
837 if min_value is None and max_value is None:
838 min_value = -(2**127)
839 max_value = 2**127 - 1
840 elif min_value is None:
841 assert max_value is not None
842 min_value = max_value - 2**64
843 elif max_value is None:
844 assert min_value is not None
845 max_value = min_value + 2**64
846
847 if min_value == max_value:
848 return min_value
849
850 bits = (max_value - min_value).bit_length()
851 value = self._draw_bits(bits)
852 while not (min_value <= value <= max_value):
853 value = self._draw_bits(bits)
854 return value
855
856 def draw_float(
857 self,
858 *,
859 min_value: float = -math.inf,
860 max_value: float = math.inf,
861 allow_nan: bool = True,
862 smallest_nonzero_magnitude: float,
863 ) -> float:
864 n = self._draw_bits(64)
865 sign = -1 if n >> 64 else 1
866 f = sign * lex_to_float(n & ((1 << 64) - 1))
867 clamper = make_float_clamper(
868 min_value,
869 max_value,
870 smallest_nonzero_magnitude=smallest_nonzero_magnitude,
871 allow_nan=allow_nan,
872 )
873 return clamper(f)
874
875 def _draw_collection(self, min_size, max_size, *, alphabet_size):
876 average_size = min(
877 max(min_size * 2, min_size + 5),
878 0.5 * (min_size + max_size),
879 )
880 elements = many(
881 self._cd,
882 min_size=min_size,
883 max_size=max_size,
884 average_size=average_size,
885 observe=False,
886 )
887 values = []
888 while elements.more():
889 values.append(self.draw_integer(0, alphabet_size - 1))
890 return values
891
892 def draw_string(
893 self,
894 intervals: IntervalSet,
895 *,
896 min_size: int = 0,
897 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
898 ) -> str:
899 values = self._draw_collection(min_size, max_size, alphabet_size=len(intervals))
900 return "".join(chr(intervals[v]) for v in values)
901
902 def draw_bytes(
903 self,
904 min_size: int = 0,
905 max_size: int = COLLECTION_DEFAULT_MAX_SIZE,
906 ) -> bytes:
907 values = self._draw_collection(min_size, max_size, alphabet_size=2**8)
908 return bytes(values)
909
910
911class URandom(Random):
912 # we reimplement a Random instance instead of using SystemRandom, because
913 # os.urandom is not guaranteed to read from /dev/urandom.
914
915 @staticmethod
916 def _urandom(size: int) -> bytes:
917 with open("/dev/urandom", "rb") as f:
918 return f.read(size)
919
920 def getrandbits(self, k: int) -> int:
921 assert k >= 0
922 size = bits_to_bytes(k)
923 n = int_from_bytes(self._urandom(size))
924 # trim excess bits
925 return n >> (size * 8 - k)
926
927 def random(self) -> float:
928 # adapted from random.SystemRandom.random
929 return (int_from_bytes(self._urandom(7)) >> 3) * (2**-53)
930
931
932class URandomProvider(HypothesisProvider):
933 # A provider which reads directly from /dev/urandom as its source of randomness.
934 # This provider exists to provide better Hypothesis integration with Antithesis
935 # (https://antithesis.com/), which interprets calls to /dev/urandom as the
936 # randomness to mutate. This effectively gives Antithesis control over
937 # the choices made by the URandomProvider.
938 #
939 # If you are not using Antithesis, you probably don't want to use this
940 # provider.
941
942 def __init__(self, conjecturedata: Optional["ConjectureData"], /):
943 super().__init__(conjecturedata)
944 if WINDOWS: # pragma: no cover
945 warnings.warn(
946 "/dev/urandom is not available on windows. Falling back to "
947 'standard PRNG generation (equivalent to backend="hypothesis").',
948 HypothesisWarning,
949 stacklevel=1,
950 )
951 # don't overwrite the HypothesisProvider self._random attribute in
952 # this case
953 else:
954 self._random = URandom()