1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""Observability tools to spit out analysis-ready tables, one row per test case."""
12
13import base64
14import dataclasses
15import json
16import math
17import os
18import sys
19import time
20import warnings
21from collections.abc import Generator
22from contextlib import contextmanager
23from dataclasses import dataclass
24from datetime import date, timedelta
25from functools import lru_cache
26from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast
27
28from hypothesis.configuration import storage_directory
29from hypothesis.errors import HypothesisWarning
30from hypothesis.internal.conjecture.choice import (
31 BooleanConstraints,
32 BytesConstraints,
33 ChoiceConstraintsT,
34 ChoiceNode,
35 ChoiceT,
36 ChoiceTypeT,
37 FloatConstraints,
38 IntegerConstraints,
39 StringConstraints,
40)
41from hypothesis.internal.escalation import InterestingOrigin
42from hypothesis.internal.floats import float_to_int
43from hypothesis.internal.intervalsets import IntervalSet
44
45if TYPE_CHECKING:
46 from typing import TypeAlias
47
48 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status
49
50
@dataclass
class PredicateCounts:
    """Running tally of how often a predicate held vs. failed to hold."""

    satisfied: int = 0
    unsatisfied: int = 0

    def update_count(self, *, condition: bool) -> None:
        """Record one evaluation of the predicate."""
        bucket = "satisfied" if condition else "unsatisfied"
        setattr(self, bucket, getattr(self, bucket) + 1)
61
62
63def _choice_to_json(choice: Union[ChoiceT, None]) -> Any:
64 if choice is None:
65 return None
66 # see the note on the same check in to_jsonable for why we cast large
67 # integers to floats.
68 if (
69 isinstance(choice, int)
70 and not isinstance(choice, bool)
71 and abs(choice) >= 2**63
72 ):
73 return ["integer", str(choice)]
74 elif isinstance(choice, bytes):
75 return ["bytes", base64.b64encode(choice).decode()]
76 elif isinstance(choice, float) and math.isnan(choice):
77 # handle nonstandard nan bit patterns. We don't need to do this for -0.0
78 # vs 0.0 since json doesn't normalize -0.0 to 0.0.
79 return ["float", float_to_int(choice)]
80 return choice
81
82
def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Encode an entire choice sequence via :func:`_choice_to_json`."""
    return list(map(_choice_to_json, choices))
85
86
def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """Encode the constraints of one choice as a json-serializable dict.

    The shape of the returned dict depends on ``choice_type``; unknown
    choice types raise ``NotImplementedError``.
    """
    constraints = constraints.copy()
    if choice_type == "integer":
        c = cast(IntegerConstraints, constraints)
        weights = c["weights"]
        if weights is not None:
            # wrap up in a list, instead of a dict, because json dicts
            # require string keys
            weights = [(_choice_to_json(k), v) for k, v in weights.items()]
        return {
            "min_value": _choice_to_json(c["min_value"]),
            "max_value": _choice_to_json(c["max_value"]),
            "weights": weights,
            "shrink_towards": _choice_to_json(c["shrink_towards"]),
        }
    if choice_type == "float":
        c = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(c["min_value"]),
            "max_value": _choice_to_json(c["max_value"]),
            "allow_nan": c["allow_nan"],
            "smallest_nonzero_magnitude": c["smallest_nonzero_magnitude"],
        }
    if choice_type == "string":
        c = cast(StringConstraints, constraints)
        assert isinstance(c["intervals"], IntervalSet)
        return {
            "intervals": c["intervals"].intervals,
            "min_size": _choice_to_json(c["min_size"]),
            "max_size": _choice_to_json(c["max_size"]),
        }
    if choice_type == "bytes":
        c = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(c["min_size"]),
            "max_size": _choice_to_json(c["max_size"]),
        }
    if choice_type == "boolean":
        c = cast(BooleanConstraints, constraints)
        return {"p": c["p"]}
    raise NotImplementedError(f"unknown choice type {choice_type}")
136
137
def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Encode a sequence of choice nodes, one json-friendly dict per node."""
    encoded = []
    for node in nodes:
        encoded.append(
            {
                "type": node.type,
                "value": _choice_to_json(node.value),
                "constraints": _constraints_to_json(node.type, node.constraints),
                "was_forced": node.was_forced,
            }
        )
    return encoded
148
149
@dataclass
class ObservationMetadata:
    """The ``metadata`` component of a test-case observation.

    Serialized by :meth:`to_json`; note that some json keys deliberately
    differ from the field names (``sys.argv``, ``os.getpid()``).
    """

    traceback: Optional[str]
    reproduction_decorator: Optional[str]
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    interesting_origin: Optional[InterestingOrigin]
    choice_nodes: Optional[tuple[ChoiceNode, ...]]
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Return a json-encodable dict with exactly one entry per field."""
        nodes = (
            None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
        )
        spans = None
        if self.choice_spans is not None:
            # span.label is an int, but cast to string to avoid conversion
            # to float (and loss of precision) for large label values.
            #
            # The value of this label is opaque to consumers anyway, so its
            # type shouldn't matter as long as it's consistent.
            spans = [
                (str(span.label), span.start, span.end, span.discarded)
                for span in self.choice_spans
            ]
        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": nodes,
            "choice_spans": spans,
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data
200
201
@dataclass
class BaseObservation:
    """Fields shared by every observation kind (test case, info, alert, error)."""

    # Discriminator tag; concrete subclasses narrow this Literal further.
    type: Literal["test_case", "info", "alert", "error"]
    # The name/identifier of the property (test function) being run.
    property: str
    # Wall-clock timestamp at which the enclosing test run started.
    run_start: float
207
208
# Observation types which are not per-test-case (see InfoObservation).
InfoObservationType = Literal["info", "alert", "error"]
# Outcome classification for a single test case; see the status_map in
# make_testcase for how conjecture Status values map onto these strings.
TestCaseStatus = Literal["gave_up", "passed", "failed"]
211
212
@dataclass
class InfoObservation(BaseObservation):
    """A non-test-case observation: free-form info, alert, or error report."""

    type: InfoObservationType
    # Short human-readable heading for this report.
    title: str
    # Body of the report; either prose or a structured dict.
    content: Union[str, dict]
218
219
@dataclass
class TestCaseObservation(BaseObservation):
    """One row of observability output: a single executed test case."""

    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    # Human-readable explanation of why the test case got this status.
    status_reason: str
    # Repr-style rendering of the call, e.g. for reproduction.
    representation: str
    # Mapping of argument name to value for this test case.
    arguments: dict
    how_generated: str
    # target() observations and recorded events for this test case.
    features: dict
    # Mapping of filename to covered line numbers, when coverage is collected.
    coverage: Optional[dict[str, list[int]]]
    # Mapping of timing-phase name to duration in seconds.
    timing: dict[str, float]
    metadata: ObservationMetadata
234
235
# Union of everything that may be delivered to an observability callback.
Observation: "TypeAlias" = Union[InfoObservation, TestCaseObservation]

#: A list of callback functions for :ref:`observability <observability>`. Whenever
#: a new observation is created, each function in this list will be called with a
#: single value, which is a dictionary representing that observation.
#:
#: You can append a function to this list to receive observability reports, and
#: remove that function from the list to stop receiving observability reports.
#: Observability is considered enabled if this list is nonempty.
TESTCASE_CALLBACKS: list[Callable[[Observation], None]] = []
246
247
@contextmanager
def with_observation_callback(
    callback: Callable[[Observation], None],
) -> Generator[None, None, None]:
    """Context manager which registers ``callback`` in ``TESTCASE_CALLBACKS``
    for the duration of the ``with`` block.

    The callback is removed on exit even if the body raises, so observability
    is never left enabled accidentally.
    """
    TESTCASE_CALLBACKS.append(callback)
    try:
        yield
    finally:
        TESTCASE_CALLBACKS.remove(callback)
257
258
def deliver_observation(observation: Observation) -> None:
    """Pass ``observation`` to every currently-registered observability callback."""
    for receive in TESTCASE_CALLBACKS:
        receive(observation)
262
263
def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    arguments: Optional[dict] = None,
    timing: dict[str, float],
    coverage: Optional[dict[str, list[int]]] = None,
    phase: Optional[str] = None,
    backend_metadata: Optional[dict[str, Any]] = None,
    status: Optional[
        Union[TestCaseStatus, "Status"]
    ] = None,  # overrides automatic calculation
    status_reason: Optional[str] = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: Optional[dict[str, Any]] = None,
) -> TestCaseObservation:
    """Build a :class:`TestCaseObservation` for one execution of ``property``.

    ``data`` must be frozen. ``status`` and ``status_reason`` override the
    values otherwise derived from ``data``; ``metadata`` entries override
    calculated metadata keys on conflict.
    """
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    if status_reason is not None:
        pass  # explicit override wins
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        # note: pops the event, so it won't also appear under `features`
        status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    if isinstance(status, Status):
        status = status_map[status]
    if status is None:
        # Resolve the automatic status *before* constructing the observation,
        # so the `status == "failed"` check below also applies when the caller
        # relied on automatic calculation. Previously a failing test case with
        # status=None silently omitted its reproduction decorator.
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        features={
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "interesting_origin": data.interesting_origin,
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )
347
348
349_WROTE_TO = set()
350
351
def _deliver_to_file(observation: Observation) -> None:  # pragma: no cover
    """Append ``observation`` as one json line to today's ``.jsonl`` file
    under the ``observed`` storage directory."""
    from hypothesis.strategies._internal.utils import to_jsonable

    suffix = "info" if observation.type != "test_case" else "testcases"
    path = storage_directory("observed", f"{date.today().isoformat()}_{suffix}.jsonl")
    path.parent.mkdir(exist_ok=True, parents=True)
    _WROTE_TO.add(path)
    with path.open(mode="a") as out:
        encoded = json.dumps(to_jsonable(observation, avoid_realization=False))
        out.write(encoded + "\n")
361
362
363_imported_at = time.time()
364
365
366@lru_cache
367def _system_metadata() -> dict[str, Any]:
368 return {
369 "sys_argv": sys.argv,
370 "os_getpid": os.getpid(),
371 "imported_at": _imported_at,
372 }
373
374
#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

# Disabling coverage is only worthwhile on older Pythons; warn if the NOCOVER
# env var is set on a version where collection should already be fast.
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# File-based observability is enabled by the main env var, or implicitly by the
# NOCOVER env var (which makes OBSERVABILITY_COLLECT_COVERAGE False).
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    TESTCASE_CALLBACKS.append(_deliver_to_file)

    # Remove files more than a week old, to cap the size on disk
    # (stems look like "YYYY-MM-DD_kind", so a lexicographic comparison
    # against an ISO date sorts chronologically).
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)