# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""Observability tools to spit out analysis-ready tables, one row per test case."""

import base64
import dataclasses
import json
import math
import os
import sys
import threading
import time
import warnings
from collections.abc import Callable, Generator
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import date, timedelta
from functools import lru_cache
from threading import Lock
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
    Optional,
    TypeAlias,
    Union,
    cast,
)

from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisWarning
from hypothesis.internal.conjecture.choice import (
    BooleanConstraints,
    BytesConstraints,
    ChoiceConstraintsT,
    ChoiceNode,
    ChoiceT,
    ChoiceTypeT,
    FloatConstraints,
    IntegerConstraints,
    StringConstraints,
)
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.floats import float_to_int
from hypothesis.internal.intervalsets import IntervalSet

if TYPE_CHECKING:
    from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status


Observation: TypeAlias = Union["InfoObservation", "TestCaseObservation"]
CallbackThreadT: TypeAlias = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: TypeAlias = Callable[[Observation, int], None]
CallbackT: TypeAlias = CallbackThreadT | CallbackAllThreadsT

# thread_id: list[callback]
_callbacks: dict[int | None, list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []


@dataclass(slots=True, frozen=False)
class PredicateCounts:
    satisfied: int = 0
    unsatisfied: int = 0

    def update_count(self, *, condition: bool) -> None:
        if condition:
            self.satisfied += 1
        else:
            self.unsatisfied += 1


def _choice_to_json(choice: ChoiceT | None) -> Any:
    if choice is None:
        return None
    # see the note on the same check in to_jsonable: very large integers get cast
    # to floats there (losing precision), so we represent them as strings here to
    # preserve the exact value.
    if (
        isinstance(choice, int)
        and not isinstance(choice, bool)
        and abs(choice) >= 2**63
    ):
        return ["integer", str(choice)]
    elif isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    elif isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)]
    return choice


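# For instance (illustrative values; each encoding follows _choice_to_json above):
#   choices_to_json((1, b"\x00", float("nan"), 2**64))
#   -> [1, ["bytes", "AA=="], ["float", 9221120237041090560], ["integer", "18446744073709551616"]]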
def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    return [_choice_to_json(choice) for choice in choices]


def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    constraints = constraints.copy()
    if choice_type == "integer":
        constraints = cast(IntegerConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "weights": (
                None
                if constraints["weights"] is None
                # wrap up in a list, instead of a dict, because json dicts
                # require string keys
                else [
                    (_choice_to_json(k), v) for k, v in constraints["weights"].items()
                ]
            ),
            "shrink_towards": _choice_to_json(constraints["shrink_towards"]),
        }
    elif choice_type == "float":
        constraints = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "allow_nan": constraints["allow_nan"],
            "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"],
        }
    elif choice_type == "string":
        constraints = cast(StringConstraints, constraints)
        assert isinstance(constraints["intervals"], IntervalSet)
        return {
            "intervals": constraints["intervals"].intervals,
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    elif choice_type == "bytes":
        constraints = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    elif choice_type == "boolean":
        constraints = cast(BooleanConstraints, constraints)
        return {
            "p": constraints["p"],
        }
    else:
        raise NotImplementedError(f"unknown choice type {choice_type}")


def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    return [
        {
            "type": node.type,
            "value": _choice_to_json(node.value),
            "constraints": _constraints_to_json(node.type, node.constraints),
            "was_forced": node.was_forced,
        }
        for node in nodes
    ]


@dataclass(slots=True, frozen=True)
class ObservationMetadata:
    traceback: str | None
    reproduction_decorator: str | None
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    phase: str
    interesting_origin: InterestingOrigin | None
    choice_nodes: tuple[ChoiceNode, ...] | None
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "phase": self.phase,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": (
                None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
            ),
            "choice_spans": (
                None
                if self.choice_spans is None
                else [
                    (
                        # span.label is an int, but cast to string to avoid conversion
                        # to float (and loss of precision) for large label values.
                        #
                        # The value of this label is opaque to consumers anyway, so its
                        # type shouldn't matter as long as it's consistent.
                        str(span.label),
                        span.start,
                        span.end,
                        span.discarded,
                    )
                    for span in self.choice_spans
                ]
            ),
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data


@dataclass(slots=True, frozen=True)
class BaseObservation:
    type: Literal["test_case", "info", "alert", "error"]
    property: str
    run_start: float


InfoObservationType = Literal["info", "alert", "error"]
TestCaseStatus = Literal["gave_up", "passed", "failed"]


@dataclass(slots=True, frozen=True)
class InfoObservation(BaseObservation):
    type: InfoObservationType
    title: str
    content: str | dict


@dataclass(slots=True, frozen=True)
class TestCaseObservation(BaseObservation):
    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    status_reason: str
    representation: str
    arguments: dict
    how_generated: str
    features: dict
    coverage: dict[str, list[int]] | None
    timing: dict[str, float]
    metadata: ObservationMetadata


def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
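
    For example, a minimal sketch of an ``all_threads=True`` callback which
    groups observations by the thread that produced them (``seen_by_thread``
    and ``record`` are illustrative names):

    .. code-block:: python

        from collections import defaultdict

        seen_by_thread = defaultdict(list)

        def record(observation, thread_id):
            seen_by_thread[thread_id].append(observation)

        add_observability_callback(record, all_threads=True)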
    """
    if all_threads:
        _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
        return

    thread_id = threading.get_ident()
    if thread_id not in _callbacks:
        _callbacks[thread_id] = []

    _callbacks[thread_id].append(cast(CallbackThreadT, f))


def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not registered as an observability callback, this silently does
    nothing.

    If ``f`` was registered with ``all_threads=True``, it is removed from the
    all-threads callbacks. If it was registered per-thread, it is only removed
    from the callbacks registered by the current thread.
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))

    thread_id = threading.get_ident()
    if thread_id not in _callbacks:
        return

    callbacks = _callbacks[thread_id]
    if f in callbacks:
        callbacks.remove(cast(CallbackThreadT, f))

    if not callbacks:
        del _callbacks[thread_id]


def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this function to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
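
    For instance (a sketch; ``expensive_repr`` is an illustrative helper, not
    part of Hypothesis):

    .. code-block:: python

        if observability_enabled():
            representation = expensive_repr(value)
        else:
            representation = "<elided>"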
    """
    return bool(_callbacks) or bool(_callbacks_all_threads)


@contextmanager
def with_observability_callback(
    f: Callable[[Observation], None], /, *, all_threads: bool = False
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.
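
    For instance, a sketch of collecting the observations produced while one
    Hypothesis test runs (``observations`` and ``test_addition`` are
    illustrative names):

    .. code-block:: python

        observations = []
        with with_observability_callback(observations.append):
            test_addition()  # some @given-decorated test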
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        remove_observability_callback(f)


def deliver_observation(observation: Observation) -> None:
    thread_id = threading.get_ident()

    for callback in _callbacks.get(thread_id, []):
        callback(observation)

    for callback in _callbacks_all_threads:
        callback(observation, thread_id)


class _TestcaseCallbacks:
    def __bool__(self):
        self._note_deprecation()
        return bool(_callbacks)

    def _note_deprecation(self):
        from hypothesis._settings import note_deprecation

        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def append(self, f):
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        self._note_deprecation()
        remove_observability_callback(f)


#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#: |TESTCASE_CALLBACKS| remains a thin compatibility
#: shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#: three functions. It is not an attempt to be fully compatible with the previous
#: ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
#: anymore. Please update to using the new functions instead.
#:
#: |TESTCASE_CALLBACKS| will eventually be removed.
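#:
#: A sketch of the migration (``my_callback`` is an illustrative name):
#:
#: .. code-block:: python
#:
#:     # before
#:     TESTCASE_CALLBACKS.append(my_callback)
#:     # after
#:     add_observability_callback(my_callback)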
TESTCASE_CALLBACKS = _TestcaseCallbacks()


def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: dict | None = None,
    coverage: dict[str, list[int]] | None = None,
    phase: str | None = None,
    backend_metadata: dict[str, Any] | None = None,
    status: (
        Union[TestCaseStatus, "Status"] | None
    ) = None,  # overrides automatic calculation
    status_reason: str | None = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: dict[str, Any] | None = None,
) -> TestCaseObservation:
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    if status_reason is not None:
        pass
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    if status is not None and isinstance(status, Status):
        status = status_map[status]
    if status is None:
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        features={
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "phase": phase,
                "interesting_origin": data.interesting_origin,
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )


_WROTE_TO = set()
_deliver_to_file_lock = Lock()


def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)

    observation_bytes = (
        json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
    )
    # only allow one concurrent file write to avoid write races. This is likely to make
    # HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading. A queue
    # would be an improvement, but that requires a background thread, and I
    # would prefer to avoid a thread in the single-threaded case. We could
    # switch over to a queue if we detect multithreading, but it's tricky to get
    # right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(fname)
        with fname.open(mode="a") as f:
            f.write(observation_bytes)


_imported_at = time.time()


@lru_cache
def _system_metadata() -> dict[str, Any]:
    return {
        "sys_argv": sys.argv,
        "os_getpid": os.getpid(),
        "imported_at": _imported_at,
    }


#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in memory).
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
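#: A sketch of enabling this flag: it is read from the environment when this
#: module is imported, so it must be set before Hypothesis is imported (any
#: value enables it).
#:
#: .. code-block:: python
#:
#:     import os
#:
#:     os.environ["HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES"] = "1"
#:
#:     import hypothesis  # noqa: E402
#: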
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)