1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""Observability tools to spit out analysis-ready tables, one row per test case."""
12
13import base64
14import dataclasses
15import json
16import math
17import os
18import sys
19import threading
20import time
21import warnings
22from collections.abc import Generator
23from contextlib import contextmanager
24from dataclasses import dataclass
25from datetime import date, timedelta
26from functools import lru_cache
27from threading import Lock
28from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast
29
30from hypothesis.configuration import storage_directory
31from hypothesis.errors import HypothesisWarning
32from hypothesis.internal.conjecture.choice import (
33 BooleanConstraints,
34 BytesConstraints,
35 ChoiceConstraintsT,
36 ChoiceNode,
37 ChoiceT,
38 ChoiceTypeT,
39 FloatConstraints,
40 IntegerConstraints,
41 StringConstraints,
42)
43from hypothesis.internal.escalation import InterestingOrigin
44from hypothesis.internal.floats import float_to_int
45from hypothesis.internal.intervalsets import IntervalSet
46
47if TYPE_CHECKING:
48 from typing import TypeAlias
49
50 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status
51
52
53Observation: "TypeAlias" = Union["InfoObservation", "TestCaseObservation"]
54CallbackThreadT: "TypeAlias" = Callable[[Observation], None]
55# for all_threads=True, we pass the thread id as well.
56CallbackAllThreadsT: "TypeAlias" = Callable[[Observation, int], None]
57CallbackT: "TypeAlias" = Union[CallbackThreadT, CallbackAllThreadsT]
58
59# thread_id: list[callback]
60_callbacks: dict[Optional[int], list[CallbackThreadT]] = {}
61# callbacks where all_threads=True was set
62_callbacks_all_threads: list[CallbackAllThreadsT] = []
63
64
65@dataclass
66class PredicateCounts:
67 satisfied: int = 0
68 unsatisfied: int = 0
69
70 def update_count(self, *, condition: bool) -> None:
71 if condition:
72 self.satisfied += 1
73 else:
74 self.unsatisfied += 1
75
76
77def _choice_to_json(choice: Union[ChoiceT, None]) -> Any:
78 if choice is None:
79 return None
80 # see the note on the same check in to_jsonable for why we cast large
81 # integers to floats.
82 if (
83 isinstance(choice, int)
84 and not isinstance(choice, bool)
85 and abs(choice) >= 2**63
86 ):
87 return ["integer", str(choice)]
88 elif isinstance(choice, bytes):
89 return ["bytes", base64.b64encode(choice).decode()]
90 elif isinstance(choice, float) and math.isnan(choice):
91 # handle nonstandard nan bit patterns. We don't need to do this for -0.0
92 # vs 0.0 since json doesn't normalize -0.0 to 0.0.
93 return ["float", float_to_int(choice)]
94 return choice
95
96
97def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
98 return [_choice_to_json(choice) for choice in choices]
99
100
101def _constraints_to_json(
102 choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
103) -> dict[str, Any]:
104 constraints = constraints.copy()
105 if choice_type == "integer":
106 constraints = cast(IntegerConstraints, constraints)
107 return {
108 "min_value": _choice_to_json(constraints["min_value"]),
109 "max_value": _choice_to_json(constraints["max_value"]),
110 "weights": (
111 None
112 if constraints["weights"] is None
113 # wrap up in a list, instead of a dict, because json dicts
114 # require string keys
115 else [
116 (_choice_to_json(k), v) for k, v in constraints["weights"].items()
117 ]
118 ),
119 "shrink_towards": _choice_to_json(constraints["shrink_towards"]),
120 }
121 elif choice_type == "float":
122 constraints = cast(FloatConstraints, constraints)
123 return {
124 "min_value": _choice_to_json(constraints["min_value"]),
125 "max_value": _choice_to_json(constraints["max_value"]),
126 "allow_nan": constraints["allow_nan"],
127 "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"],
128 }
129 elif choice_type == "string":
130 constraints = cast(StringConstraints, constraints)
131 assert isinstance(constraints["intervals"], IntervalSet)
132 return {
133 "intervals": constraints["intervals"].intervals,
134 "min_size": _choice_to_json(constraints["min_size"]),
135 "max_size": _choice_to_json(constraints["max_size"]),
136 }
137 elif choice_type == "bytes":
138 constraints = cast(BytesConstraints, constraints)
139 return {
140 "min_size": _choice_to_json(constraints["min_size"]),
141 "max_size": _choice_to_json(constraints["max_size"]),
142 }
143 elif choice_type == "boolean":
144 constraints = cast(BooleanConstraints, constraints)
145 return {
146 "p": constraints["p"],
147 }
148 else:
149 raise NotImplementedError(f"unknown choice type {choice_type}")
150
151
152def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
153 return [
154 {
155 "type": node.type,
156 "value": _choice_to_json(node.value),
157 "constraints": _constraints_to_json(node.type, node.constraints),
158 "was_forced": node.was_forced,
159 }
160 for node in nodes
161 ]
162
163
164@dataclass
165class ObservationMetadata:
166 traceback: Optional[str]
167 reproduction_decorator: Optional[str]
168 predicates: dict[str, PredicateCounts]
169 backend: dict[str, Any]
170 sys_argv: list[str]
171 os_getpid: int
172 imported_at: float
173 data_status: "Status"
174 interesting_origin: Optional[InterestingOrigin]
175 choice_nodes: Optional[tuple[ChoiceNode, ...]]
176 choice_spans: Optional["Spans"]
177
178 def to_json(self) -> dict[str, Any]:
179 data = {
180 "traceback": self.traceback,
181 "reproduction_decorator": self.reproduction_decorator,
182 "predicates": self.predicates,
183 "backend": self.backend,
184 "sys.argv": self.sys_argv,
185 "os.getpid()": self.os_getpid,
186 "imported_at": self.imported_at,
187 "data_status": self.data_status,
188 "interesting_origin": self.interesting_origin,
189 "choice_nodes": (
190 None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
191 ),
192 "choice_spans": (
193 None
194 if self.choice_spans is None
195 else [
196 (
197 # span.label is an int, but cast to string to avoid conversion
198 # to float (and loss of precision) for large label values.
199 #
200 # The value of this label is opaque to consumers anyway, so its
201 # type shouldn't matter as long as it's consistent.
202 str(span.label),
203 span.start,
204 span.end,
205 span.discarded,
206 )
207 for span in self.choice_spans
208 ]
209 ),
210 }
211 # check that we didn't forget one
212 assert len(data) == len(dataclasses.fields(self))
213 return data
214
215
216@dataclass
217class BaseObservation:
218 type: Literal["test_case", "info", "alert", "error"]
219 property: str
220 run_start: float
221
222
223InfoObservationType = Literal["info", "alert", "error"]
224TestCaseStatus = Literal["gave_up", "passed", "failed"]
225
226
227@dataclass
228class InfoObservation(BaseObservation):
229 type: InfoObservationType
230 title: str
231 content: Union[str, dict]
232
233
234@dataclass
235class TestCaseObservation(BaseObservation):
236 __test__ = False # no! bad pytest!
237
238 type: Literal["test_case"]
239 status: TestCaseStatus
240 status_reason: str
241 representation: str
242 arguments: dict
243 how_generated: str
244 features: dict
245 coverage: Optional[dict[str, list[int]]]
246 timing: dict[str, float]
247 metadata: ObservationMetadata
248
249
250def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
251 """
252 Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
253 should accept one argument, which is an observation. Whenever Hypothesis
254 produces a new observation, it calls each callback with that observation.
255
256 If Hypothesis tests are being run from multiple threads, callbacks are tracked
257 per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
258 as an observability callback for observations produced on that thread.
259
260 If ``all_threads=True`` is passed, ``f`` will instead be registered as a
261 callback for all threads. This means it will be called for observations
262 generated by all threads, not just the thread which registered ``f`` as a
263 callback. In this case, ``f`` will be passed two arguments: the first is the
264 observation, and the second is the integer thread id from
265 :func:`python:threading.get_ident` where that observation was generated.
266
267 We recommend against registering ``f`` as a callback for both ``all_threads=True``
268 and the default ``all_threads=False``, due to unclear semantics with
269 |remove_observability_callback|.
270 """
271 if all_threads:
272 _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
273 return
274
275 thread_id = threading.get_ident()
276 if thread_id not in _callbacks:
277 _callbacks[thread_id] = []
278
279 _callbacks[thread_id].append(cast(CallbackThreadT, f))
280
281
282def remove_observability_callback(f: CallbackT, /) -> None:
283 """
284 Removes ``f`` from the :ref:`observability <observability>` callbacks.
285
286 If ``f`` is not in the list of observability callbacks, silently do nothing.
287
288 If running under multiple threads, ``f`` will only be removed from the
289 callbacks for this thread.
290 """
291 if f in _callbacks_all_threads:
292 _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))
293
294 thread_id = threading.get_ident()
295 if thread_id not in _callbacks:
296 return
297
298 callbacks = _callbacks[thread_id]
299 if f in callbacks:
300 callbacks.remove(cast(CallbackThreadT, f))
301
302 if not callbacks:
303 del _callbacks[thread_id]
304
305
306def observability_enabled() -> bool:
307 """
308 Returns whether or not Hypothesis considers :ref:`observability <observability>`
309 to be enabled. Observability is enabled if there is at least one observability
310 callback present.
311
312 Callers might use this method to determine whether they should compute an
313 expensive representation that is only used under observability, for instance
314 by :ref:`alternative backends <alternative-backends>`.
315 """
316 return bool(_callbacks) or bool(_callbacks_all_threads)
317
318
319@contextmanager
320def with_observability_callback(
321 f: Callable[[Observation], None], /, *, all_threads: bool = False
322) -> Generator[None, None, None]:
323 """
324 A simple context manager which calls |add_observability_callback| on ``f``
325 when it enters and |remove_observability_callback| on ``f`` when it exits.
326 """
327 add_observability_callback(f, all_threads=all_threads)
328 try:
329 yield
330 finally:
331 remove_observability_callback(f)
332
333
334def deliver_observation(observation: Observation) -> None:
335 thread_id = threading.get_ident()
336
337 for callback in _callbacks.get(thread_id, []):
338 callback(observation)
339
340 for callback in _callbacks_all_threads:
341 callback(observation, thread_id)
342
343
344class _TestcaseCallbacks:
345 def __bool__(self):
346 self._note_deprecation()
347 return bool(_callbacks)
348
349 def _note_deprecation(self):
350 from hypothesis._settings import note_deprecation
351
352 note_deprecation(
353 "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
354 "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
355 "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
356 "bool(TESTCASE_CALLBACKS) with observability_enabled().",
357 since="2025-08-01",
358 has_codemod=False,
359 )
360
361 def append(self, f):
362 self._note_deprecation()
363 add_observability_callback(f)
364
365 def remove(self, f):
366 self._note_deprecation()
367 remove_observability_callback(f)
368
369
370#: .. warning::
371#:
372#: Deprecated in favor of |add_observability_callback|,
373#: |remove_observability_callback|, and |observability_enabled|.
374#:
375#: |TESTCASE_CALLBACKS| remains a thin compatibility
376#: shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
377#: three methods. It is not an attempt to be fully compatible with the previous
378#: ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
379#: anymore. Please update to using the new methods instead.
380#:
381#: |TESTCASE_CALLBACKS| will eventually be removed.
382TESTCASE_CALLBACKS = _TestcaseCallbacks()
383
384
385def make_testcase(
386 *,
387 run_start: float,
388 property: str,
389 data: "ConjectureData",
390 how_generated: str,
391 representation: str = "<unknown>",
392 arguments: Optional[dict] = None,
393 timing: dict[str, float],
394 coverage: Optional[dict[str, list[int]]] = None,
395 phase: Optional[str] = None,
396 backend_metadata: Optional[dict[str, Any]] = None,
397 status: Optional[
398 Union[TestCaseStatus, "Status"]
399 ] = None, # overrides automatic calculation
400 status_reason: Optional[str] = None, # overrides automatic calculation
401 # added to calculated metadata. If keys overlap, the value from this `metadata`
402 # is used
403 metadata: Optional[dict[str, Any]] = None,
404) -> TestCaseObservation:
405 from hypothesis.core import reproduction_decorator
406 from hypothesis.internal.conjecture.data import Status
407
408 # We should only be sending observability reports for datas that have finished
409 # being modified.
410 assert data.frozen
411
412 if status_reason is not None:
413 pass
414 elif data.interesting_origin:
415 status_reason = str(data.interesting_origin)
416 elif phase == "shrink" and data.status == Status.OVERRUN:
417 status_reason = "exceeded size of current best example"
418 else:
419 status_reason = str(data.events.pop("invalid because", ""))
420
421 status_map: dict[Status, TestCaseStatus] = {
422 Status.OVERRUN: "gave_up",
423 Status.INVALID: "gave_up",
424 Status.VALID: "passed",
425 Status.INTERESTING: "failed",
426 }
427
428 if status is not None and isinstance(status, Status):
429 status = status_map[status]
430 if status is None:
431 status = status_map[data.status]
432
433 return TestCaseObservation(
434 type="test_case",
435 status=status,
436 status_reason=status_reason,
437 representation=representation,
438 arguments={
439 k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
440 },
441 how_generated=how_generated, # iid, mutation, etc.
442 features={
443 **{
444 f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
445 },
446 **data.events,
447 },
448 coverage=coverage,
449 timing=timing,
450 metadata=ObservationMetadata(
451 **{
452 "traceback": data.expected_traceback,
453 "reproduction_decorator": (
454 reproduction_decorator(data.choices) if status == "failed" else None
455 ),
456 "predicates": dict(data._observability_predicates),
457 "backend": backend_metadata or {},
458 "data_status": data.status,
459 "interesting_origin": data.interesting_origin,
460 "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
461 "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
462 **_system_metadata(),
463 # unpack last so it takes precedence for duplicate keys
464 **(metadata or {}),
465 }
466 ),
467 run_start=run_start,
468 property=property,
469 )
470
471
472_WROTE_TO = set()
473_deliver_to_file_lock = Lock()
474
475
476def _deliver_to_file(
477 observation: Observation, thread_id: int
478) -> None: # pragma: no cover
479 from hypothesis.strategies._internal.utils import to_jsonable
480
481 kind = "testcases" if observation.type == "test_case" else "info"
482 fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
483 fname.parent.mkdir(exist_ok=True, parents=True)
484
485 observation_bytes = (
486 json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
487 )
488 # only allow one conccurent file write to avoid write races. This is likely to make
489 # HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading. A queue
490 # would be an improvement, but that requires a background thread, and I
491 # would prefer to avoid a thread in the single-threaded case. We could
492 # switch over to a queue if we detect multithreading, but it's tricky to get
493 # right.
494 with _deliver_to_file_lock:
495 _WROTE_TO.add(fname)
496 with fname.open(mode="a") as f:
497 f.write(observation_bytes)
498
499
500_imported_at = time.time()
501
502
503@lru_cache
504def _system_metadata() -> dict[str, Any]:
505 return {
506 "sys_argv": sys.argv,
507 "os_getpid": os.getpid(),
508 "imported_at": _imported_at,
509 }
510
511
512#: If ``False``, do not collect coverage information when observability is enabled.
513#:
514#: This is exposed both for performance (as coverage collection can be slow on
515#: Python 3.11 and earlier) and size (if you do not use coverage information,
516#: you may not want to store it in-memory).
517OBSERVABILITY_COLLECT_COVERAGE = (
518 "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
519)
520#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
521#: in test case observations.
522#:
523#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
524#: a substantial amount of data, and so must be opted-in to, even when
525#: observability is enabled.
526#:
527#: .. warning::
528#:
529#: EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
530#: interface for this as of June 2025, and this attribute may disappear or
531#: be renamed without notice.
532#:
533OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ
534
535if OBSERVABILITY_COLLECT_COVERAGE is False and (
536 sys.version_info[:2] >= (3, 12)
537): # pragma: no cover
538 warnings.warn(
539 "Coverage data collection should be quite fast in Python 3.12 or later "
540 "so there should be no need to turn coverage reporting off.",
541 HypothesisWarning,
542 stacklevel=2,
543 )
544
545if (
546 "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
547 or OBSERVABILITY_COLLECT_COVERAGE is False
548): # pragma: no cover
549 add_observability_callback(_deliver_to_file, all_threads=True)
550
551 # Remove files more than a week old, to cap the size on disk
552 max_age = (date.today() - timedelta(days=8)).isoformat()
553 for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
554 if f.stem < max_age: # pragma: no branch
555 f.unlink(missing_ok=True)