1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""Observability tools to spit out analysis-ready tables, one row per test case."""
12
13import base64
14import dataclasses
15import json
16import math
17import os
18import sys
19import threading
20import time
21import warnings
22from collections.abc import Generator
23from contextlib import contextmanager
24from dataclasses import dataclass
25from datetime import date, timedelta
26from functools import lru_cache
27from threading import Lock
28from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast
29
30from hypothesis.configuration import storage_directory
31from hypothesis.errors import HypothesisWarning
32from hypothesis.internal.conjecture.choice import (
33 BooleanConstraints,
34 BytesConstraints,
35 ChoiceConstraintsT,
36 ChoiceNode,
37 ChoiceT,
38 ChoiceTypeT,
39 FloatConstraints,
40 IntegerConstraints,
41 StringConstraints,
42)
43from hypothesis.internal.escalation import InterestingOrigin
44from hypothesis.internal.floats import float_to_int
45from hypothesis.internal.intervalsets import IntervalSet
46
47if TYPE_CHECKING:
48 from typing import TypeAlias
49
50 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status
51
52
# An observation is either a per-test-case row or a free-form info message.
Observation: "TypeAlias" = Union["InfoObservation", "TestCaseObservation"]
# Signature for callbacks registered for a single thread: observation only.
CallbackThreadT: "TypeAlias" = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: "TypeAlias" = Callable[[Observation, int], None]
CallbackT: "TypeAlias" = Union[CallbackThreadT, CallbackAllThreadsT]

# thread_id: list[callback]
_callbacks: dict[Optional[int], list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []
63
64
@dataclass
class PredicateCounts:
    """Running tally of how many times a predicate held versus failed."""

    satisfied: int = 0
    unsatisfied: int = 0

    def update_count(self, *, condition: bool) -> None:
        """Record one evaluation of the predicate."""
        if not condition:
            self.unsatisfied += 1
        else:
            self.satisfied += 1
75
76
def _choice_to_json(choice: Union[ChoiceT, None]) -> Any:
    """Convert a single choice value to a json-serializable representation."""
    if choice is None:
        return None
    # see the note on the same check in to_jsonable for why we cast large
    # integers to floats.
    is_large_int = (
        isinstance(choice, int)
        and not isinstance(choice, bool)
        and abs(choice) >= 2**63
    )
    if is_large_int:
        return ["integer", str(choice)]
    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    if isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)]
    return choice
95
96
def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Convert a whole choice sequence to a json-serializable list."""
    return list(map(_choice_to_json, choices))
99
100
def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """Convert the constraints for one choice into a json-serializable dict.

    Raises NotImplementedError for an unrecognized ``choice_type``.
    """
    constraints = constraints.copy()
    if choice_type == "integer":
        constraints = cast(IntegerConstraints, constraints)
        weights = constraints["weights"]
        if weights is not None:
            # wrap up in a list, instead of a dict, because json dicts
            # require string keys
            weights = [(_choice_to_json(k), v) for k, v in weights.items()]
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "weights": weights,
            "shrink_towards": _choice_to_json(constraints["shrink_towards"]),
        }
    if choice_type == "float":
        constraints = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "allow_nan": constraints["allow_nan"],
            "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"],
        }
    if choice_type == "string":
        constraints = cast(StringConstraints, constraints)
        assert isinstance(constraints["intervals"], IntervalSet)
        return {
            "intervals": constraints["intervals"].intervals,
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    if choice_type == "bytes":
        constraints = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    if choice_type == "boolean":
        constraints = cast(BooleanConstraints, constraints)
        return {"p": constraints["p"]}
    raise NotImplementedError(f"unknown choice type {choice_type}")
150
151
def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Convert a sequence of choice nodes into json-serializable dicts."""
    result = []
    for node in nodes:
        result.append(
            {
                "type": node.type,
                "value": _choice_to_json(node.value),
                "constraints": _constraints_to_json(node.type, node.constraints),
                "was_forced": node.was_forced,
            }
        )
    return result
162
163
@dataclass
class ObservationMetadata:
    """The ``metadata`` portion of a test-case observation."""

    traceback: Optional[str]
    reproduction_decorator: Optional[str]
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    phase: str
    interesting_origin: Optional[InterestingOrigin]
    choice_nodes: Optional[tuple[ChoiceNode, ...]]
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Return a json-serializable dict with exactly one entry per field."""
        choice_nodes = (
            None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
        )
        choice_spans = None
        if self.choice_spans is not None:
            # span.label is an int, but cast to string to avoid conversion
            # to float (and loss of precision) for large label values.
            #
            # The value of this label is opaque to consumers anyway, so its
            # type shouldn't matter as long as it's consistent.
            choice_spans = [
                (str(span.label), span.start, span.end, span.discarded)
                for span in self.choice_spans
            ]
        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "phase": self.phase,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": choice_nodes,
            "choice_spans": choice_spans,
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data
216
217
@dataclass
class BaseObservation:
    """Fields common to every observation type."""

    # Discriminator for the concrete observation subclass.
    type: Literal["test_case", "info", "alert", "error"]
    # Identifier of the property/test this observation belongs to.
    property: str
    # Start time of the run this observation was produced in — presumably a
    # time.time() timestamp; confirm against callers of make_testcase.
    run_start: float
223
224
# Kinds of free-form (non-test-case) observations.
InfoObservationType = Literal["info", "alert", "error"]
# Outcome classification for a single test case.
TestCaseStatus = Literal["gave_up", "passed", "failed"]
227
228
@dataclass
class InfoObservation(BaseObservation):
    """A free-form info/alert/error message, as opposed to a test-case row."""

    type: InfoObservationType
    title: str
    # Message payload; either plain text or a structured dict.
    content: Union[str, dict]
234
235
@dataclass
class TestCaseObservation(BaseObservation):
    """One row of the observability table: a single executed test case."""

    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    # "gave_up", "passed", or "failed"; see make_testcase for how this is derived.
    status: TestCaseStatus
    # Human-readable explanation for ``status`` (e.g. the interesting origin).
    status_reason: str
    # String representation of the test case's inputs.
    representation: str
    # Argument name -> value; "generate:" prefixes are stripped by make_testcase.
    arguments: dict
    # How this test case was produced — e.g. "iid", "mutation" (see make_testcase).
    how_generated: str
    # target() observations plus recorded events (built in make_testcase).
    features: dict
    # Coverage mapping — presumably filename -> covered line numbers; confirm
    # against the collector that populates this.
    coverage: Optional[dict[str, list[int]]]
    # Timing-category name -> duration; units assumed to be seconds — TODO confirm.
    timing: dict[str, float]
    # Structured extra data; see ObservationMetadata.
    metadata: ObservationMetadata
250
251
def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if all_threads:
        _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
    else:
        # Register under this thread's id, creating the list on first use.
        _callbacks.setdefault(threading.get_ident(), []).append(
            cast(CallbackThreadT, f)
        )
282
283
def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    If running under multiple threads, ``f`` will only be removed from the
    callbacks for this thread.
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))

    thread_id = threading.get_ident()
    callbacks = _callbacks.get(thread_id)
    if callbacks is None:
        return

    if f in callbacks:
        callbacks.remove(cast(CallbackThreadT, f))

    # drop empty per-thread entries so the registry doesn't grow unboundedly
    if not callbacks:
        del _callbacks[thread_id]
306
307
def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
    """
    # enabled iff either registry (per-thread or all-threads) is non-empty
    return bool(_callbacks or _callbacks_all_threads)
319
320
@contextmanager
def with_observability_callback(
    # Annotated as CallbackT rather than Callable[[Observation], None], because
    # with all_threads=True the callback receives (observation, thread_id) —
    # matching the signature accepted by add_observability_callback.
    f: CallbackT,
    /,
    *,
    all_threads: bool = False,
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        # always deregister, even if the body raised
        remove_observability_callback(f)
334
335
def deliver_observation(observation: Observation) -> None:
    """Send ``observation`` to every callback registered for the current
    thread, then to every all-threads callback (which also gets the thread id)."""
    thread_id = threading.get_ident()

    for cb in _callbacks.get(thread_id, ()):
        cb(observation)

    for cb_all in _callbacks_all_threads:
        cb_all(observation, thread_id)
344
345
class _TestcaseCallbacks:
    """Deprecated shim mimicking the old list-valued ``TESTCASE_CALLBACKS``.

    Forwards ``.append``, ``.remove``, and ``bool()`` to the modern
    callback-registration functions, emitting a deprecation warning each time.
    """

    def _note_deprecation(self):
        from hypothesis._settings import note_deprecation

        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def __bool__(self):
        self._note_deprecation()
        return bool(_callbacks)

    def append(self, f):
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        self._note_deprecation()
        remove_observability_callback(f)


#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#:     |TESTCASE_CALLBACKS| remains a thin compatibility
#:     shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#:     three methods. It is not an attempt to be fully compatible with the previous
#:     ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
#:     anymore. Please update to using the new methods instead.
#:
#:     |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()
385
386
def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: Optional[dict] = None,
    coverage: Optional[dict[str, list[int]]] = None,
    phase: Optional[str] = None,
    backend_metadata: Optional[dict[str, Any]] = None,
    status: Optional[
        Union[TestCaseStatus, "Status"]
    ] = None,  # overrides automatic calculation
    status_reason: Optional[str] = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: Optional[dict[str, Any]] = None,
) -> TestCaseObservation:
    """Build a TestCaseObservation from a frozen ConjectureData.

    ``status`` and ``status_reason`` override the values otherwise derived
    from ``data``; keys in ``metadata`` take precedence over the computed
    metadata entries.
    """
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    if status_reason is not None:
        # explicit override wins; leave it unchanged
        pass
    elif data.interesting_origin:
        # a failing test case: report where the failure originated
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        # NOTE: .pop mutates data.events, so the reason is not repeated in
        # the `features` dict built below from the remaining events.
        status_reason = str(data.events.pop("invalid because", ""))

    # map the internal Status enum onto externally-reported status strings
    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    # A Status-valued override is translated via status_map; a TestCaseStatus
    # string passes through unchanged. With no override, derive from data.status.
    if status is not None and isinstance(status, Status):
        status = status_map[status]
    if status is None:
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        # strip the "generate:" prefix from generated-argument names
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        # target() observations (prefixed "target:") merged with any events
        # remaining after the pop above
        features={
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "phase": phase,
                "interesting_origin": data.interesting_origin,
                # choice-level detail is opt-in because it can be large
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )
473
474
# Paths that _deliver_to_file has written to during this process.
_WROTE_TO = set()
# Serializes writes in _deliver_to_file so concurrent threads don't interleave.
_deliver_to_file_lock = Lock()
477
478
def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    """Append ``observation`` as one JSON line to today's .jsonl output file."""
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)

    line = json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
    # only allow one concurrent file write to avoid write races. This is likely to
    # make HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading. A
    # queue would be an improvement, but that requires a background thread, and I
    # would prefer to avoid a thread in the single-threaded case. We could
    # switch over to a queue if we detect multithreading, but it's tricky to get
    # right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(fname)
        with fname.open(mode="a") as out:
            out.write(line)
501
502
# Wall-clock timestamp of module import; included in observation metadata
# via _system_metadata().
_imported_at = time.time()
504
505
@lru_cache
def _system_metadata() -> dict[str, Any]:
    """Process-constant metadata attached to every observation (computed once)."""
    return dict(
        sys_argv=sys.argv,
        os_getpid=os.getpid(),
        imported_at=_imported_at,
    )
513
514
#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
# Opt-out: set the HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER env var.
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
# Opt-in: set the HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES env var.
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ
537
# Warn on a probably-mistaken configuration: per the message below, coverage
# collection is fast on Python 3.12+, so disabling it there gains little.
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# If file-based observability is requested via environment variables, register
# the file writer for observations from all threads and prune old output files.
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        # ISO date prefixes sort lexicographically, so a plain string compare works
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)