1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""Observability tools to spit out analysis-ready tables, one row per test case."""
12
13import base64
14import dataclasses
15import json
16import math
17import os
18import sys
19import threading
20import time
21import warnings
22from collections.abc import Callable, Generator
23from contextlib import contextmanager
24from dataclasses import dataclass
25from datetime import date, timedelta
26from functools import lru_cache
27from pathlib import Path
28from threading import Lock
29from typing import (
30 TYPE_CHECKING,
31 Any,
32 Literal,
33 Optional,
34 TypeAlias,
35 Union,
36 cast,
37)
38
39from hypothesis.configuration import storage_directory
40from hypothesis.errors import HypothesisWarning
41from hypothesis.internal.conjecture.choice import (
42 BooleanConstraints,
43 BytesConstraints,
44 ChoiceConstraintsT,
45 ChoiceNode,
46 ChoiceT,
47 ChoiceTypeT,
48 FloatConstraints,
49 IntegerConstraints,
50 StringConstraints,
51)
52from hypothesis.internal.escalation import InterestingOrigin
53from hypothesis.internal.floats import float_to_int
54from hypothesis.internal.intervalsets import IntervalSet
55from hypothesis.utils.deprecation import note_deprecation
56
57if TYPE_CHECKING:
58 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status
59
60
# An observation is one of the two record classes below. The names are quoted
# because both classes are defined later in this module.
Observation: TypeAlias = Union["InfoObservation", "TestCaseObservation"]
# Signature of a per-thread callback: it receives only the observation.
CallbackThreadT: TypeAlias = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: TypeAlias = Callable[[Observation, int], None]
# Either callback shape; this is what the public registration functions accept.
CallbackT: TypeAlias = CallbackThreadT | CallbackAllThreadsT

# thread_id: list[callback]
# Keys are the ``threading.get_ident()`` of the registering thread. An entry is
# deleted again once its list becomes empty, so a non-empty dict means at least
# one per-thread callback is registered.
_callbacks: dict[int | None, list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []
71
72
73@dataclass(slots=True, frozen=False)
74class PredicateCounts:
75 satisfied: int = 0
76 unsatisfied: int = 0
77
78 def update_count(self, *, condition: bool) -> None:
79 if condition:
80 self.satisfied += 1
81 else:
82 self.unsatisfied += 1
83
84
def _choice_to_json(choice: ChoiceT | None) -> Any:
    """
    Convert a single choice value (or ``None``) into a JSON-friendly value.

    Most values are returned unchanged. Three cases are wrapped in a tagged
    two-element list instead:

    * ``bytes`` -> ``["bytes", <base64 text>]``
    * NaN floats -> ``["float", <float_to_int(choice)>]``
    * non-bool integers with ``abs(choice) >= 2**63`` ->
      ``["integer", <decimal string>]``
    """
    if choice is None:
        return None

    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]

    if isinstance(choice, float):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)] if math.isnan(choice) else choice

    # Very large integers are emitted as a tagged decimal string rather than as
    # a JSON number. to_jsonable has a matching size check; see the note there
    # for why integers of this magnitude need special handling.
    is_plain_int = isinstance(choice, int) and not isinstance(choice, bool)
    if is_plain_int and abs(choice) >= 2**63:
        return ["integer", str(choice)]

    return choice
103
104
def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Return a list holding the ``_choice_to_json`` form of each choice, in order."""
    return list(map(_choice_to_json, choices))
107
108
def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """
    Build the JSON dict describing ``constraints`` for the given ``choice_type``.

    The keys emitted depend on the choice type ("integer", "float", "string",
    "bytes" or "boolean"). Bound-like values go through ``_choice_to_json``.

    Raises ``NotImplementedError`` for any other ``choice_type``.
    """
    constraints = constraints.copy()

    if choice_type == "integer":
        integer_c = cast(IntegerConstraints, constraints)
        return {
            "min_value": _choice_to_json(integer_c["min_value"]),
            "max_value": _choice_to_json(integer_c["max_value"]),
            # wrap up in a list, instead of a dict, because json dicts
            # require string keys
            "weights": (
                [
                    (_choice_to_json(key), weight)
                    for key, weight in integer_c["weights"].items()
                ]
                if integer_c["weights"] is not None
                else None
            ),
            "shrink_towards": _choice_to_json(integer_c["shrink_towards"]),
        }

    if choice_type == "float":
        float_c = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(float_c["min_value"]),
            "max_value": _choice_to_json(float_c["max_value"]),
            "allow_nan": float_c["allow_nan"],
            "smallest_nonzero_magnitude": float_c["smallest_nonzero_magnitude"],
        }

    if choice_type == "string":
        string_c = cast(StringConstraints, constraints)
        interval_set = string_c["intervals"]
        assert isinstance(interval_set, IntervalSet)
        return {
            "intervals": interval_set.intervals,
            "min_size": _choice_to_json(string_c["min_size"]),
            "max_size": _choice_to_json(string_c["max_size"]),
        }

    if choice_type == "bytes":
        bytes_c = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(bytes_c["min_size"]),
            "max_size": _choice_to_json(bytes_c["max_size"]),
        }

    if choice_type == "boolean":
        boolean_c = cast(BooleanConstraints, constraints)
        return {"p": boolean_c["p"]}

    raise NotImplementedError(f"unknown choice type {choice_type}")
158
159
def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """
    Serialize each choice node to a dict with the keys ``type``, ``value``,
    ``constraints`` and ``was_forced``, preserving the order of ``nodes``.
    """
    serialized: list[dict[str, Any]] = []
    for node in nodes:
        entry = {
            "type": node.type,
            "value": _choice_to_json(node.value),
            "constraints": _constraints_to_json(node.type, node.constraints),
            "was_forced": node.was_forced,
        }
        serialized.append(entry)
    return serialized
170
171
172@dataclass(slots=True, frozen=True)
173class ObservationMetadata:
174 traceback: str | None
175 reproduction_decorator: str | None
176 predicates: dict[str, PredicateCounts]
177 backend: dict[str, Any]
178 sys_argv: list[str]
179 os_getpid: int
180 imported_at: float
181 data_status: "Status"
182 phase: str
183 interesting_origin: InterestingOrigin | None
184 choice_nodes: tuple[ChoiceNode, ...] | None
185 choice_spans: Optional["Spans"]
186
187 def to_json(self) -> dict[str, Any]:
188 data = {
189 "traceback": self.traceback,
190 "reproduction_decorator": self.reproduction_decorator,
191 "predicates": self.predicates,
192 "backend": self.backend,
193 "sys.argv": self.sys_argv,
194 "os.getpid()": self.os_getpid,
195 "imported_at": self.imported_at,
196 "data_status": self.data_status,
197 "phase": self.phase,
198 "interesting_origin": self.interesting_origin,
199 "choice_nodes": (
200 None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
201 ),
202 "choice_spans": (
203 None
204 if self.choice_spans is None
205 else [
206 (
207 # span.label is an int, but cast to string to avoid conversion
208 # to float (and loss of precision) for large label values.
209 #
210 # The value of this label is opaque to consumers anyway, so its
211 # type shouldn't matter as long as it's consistent.
212 str(span.label),
213 span.start,
214 span.end,
215 span.discarded,
216 )
217 for span in self.choice_spans
218 ]
219 ),
220 }
221 # check that we didn't forget one
222 assert len(data) == len(dataclasses.fields(self))
223 return data
224
225
@dataclass(slots=True, frozen=True)
class BaseObservation:
    """Fields shared by every observation; instances are immutable (frozen)."""

    # Discriminator for the kind of observation; subclasses narrow the allowed
    # values.
    type: Literal["test_case", "info", "alert", "error"]
    # presumably the name of the property (test) being observed -- confirm
    # against the callers.
    property: str
    # presumably a timestamp for when the run started -- TODO confirm the unit.
    run_start: float
231
232
# Allowed ``type`` values for an InfoObservation.
InfoObservationType = Literal["info", "alert", "error"]
# Allowed ``status`` values for a TestCaseObservation.
TestCaseStatus = Literal["gave_up", "passed", "failed"]
235
236
@dataclass(slots=True, frozen=True)
class InfoObservation(BaseObservation):
    """An observation of type "info", "alert" or "error" with a title and content."""

    # Narrows BaseObservation.type to the three non-test-case values.
    type: InfoObservationType
    title: str
    # Either plain text or a dict payload.
    content: str | dict
242
243
@dataclass(slots=True, frozen=True)
class TestCaseObservation(BaseObservation):
    """The observation recorded for a single test case (``type == "test_case"``)."""

    # The class name starts with "Test"; this flag tells pytest not to collect
    # it as a test class.
    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    # One of "gave_up", "passed" or "failed".
    status: TestCaseStatus
    status_reason: str
    representation: str
    arguments: dict
    # e.g. iid, mutation, etc.
    how_generated: str
    features: dict
    # None when no coverage information is supplied.
    coverage: dict[str, list[int]] | None
    timing: dict[str, float]
    metadata: ObservationMetadata
258
259
def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if not all_threads:
        # Per-thread registration: create this thread's list on first use.
        registered = _callbacks.setdefault(threading.get_ident(), [])
        registered.append(cast(CallbackThreadT, f))
        return

    _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
290
291
def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    ``f`` is removed from the all-threads callbacks if it is registered there.
    If running under multiple threads, ``f`` will only be removed from the
    per-thread callbacks for this thread.
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(f)

    current_thread = threading.get_ident()
    registered = _callbacks.get(current_thread)
    if registered is None:
        # Nothing was ever registered on this thread.
        return

    if f in registered:
        registered.remove(f)

    # Drop the now-empty entry so that an empty registry is falsy again.
    if not registered:
        del _callbacks[current_thread]
314
315
def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
    """
    # A per-thread callback on any thread counts, as does any all-threads one.
    if _callbacks:
        return True
    return bool(_callbacks_all_threads)
327
328
@contextmanager
def with_observability_callback(
    f: CallbackT, /, *, all_threads: bool = False
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.

    ``f`` and ``all_threads`` are passed to |add_observability_callback|
    unchanged. With ``all_threads=True``, ``f`` is therefore called with two
    arguments (the observation and the thread id), which is why ``f`` is typed
    as ``CallbackT`` rather than as a one-argument callable.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        # Always unregister, even if the body of the ``with`` block raised.
        remove_observability_callback(f)
342
343
def deliver_observation(observation: Observation) -> None:
    """
    Pass ``observation`` to every callback that applies to the calling thread.

    Callbacks registered for this thread are called first, with the observation
    only. All-threads callbacks are called afterwards, with the observation and
    this thread's id.
    """
    current_thread = threading.get_ident()

    registered = _callbacks.get(current_thread)
    if registered is not None:
        for notify in registered:
            notify(observation)

    for notify_any_thread in _callbacks_all_threads:
        notify_any_thread(observation, current_thread)
352
353
class _TestcaseCallbacks:
    """
    Deprecated compatibility shim behind ``TESTCASE_CALLBACKS``.

    Supports only ``bool()``, ``.append`` and ``.remove``. Each emits a
    deprecation note and then forwards to ``observability_enabled``,
    ``add_observability_callback`` and ``remove_observability_callback``
    respectively.
    """

    def __bool__(self):
        self._note_deprecation()
        # Forward to observability_enabled() rather than inspecting the
        # per-thread registry directly, so that callbacks registered with
        # all_threads=True also count. This matches the replacement named in
        # the deprecation message.
        return observability_enabled()

    def _note_deprecation(self):
        """Emit the deprecation note shared by every operation on the shim."""
        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def append(self, f):
        """Register ``f`` as a callback for the calling thread."""
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        """Unregister ``f`` via ``remove_observability_callback``."""
        self._note_deprecation()
        remove_observability_callback(f)
376
377
#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#:     |TESTCASE_CALLBACKS| remains a thin compatibility
#:     shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#:     three functions. It is not an attempt to be fully compatible with the
#:     previous ``TESTCASE_CALLBACKS = []``, so iteration or other usages will
#:     not work anymore. Please update to using the new functions instead.
#:
#:     |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()
391
392
def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: dict | None = None,
    coverage: dict[str, list[int]] | None = None,
    phase: str | None = None,
    backend_metadata: dict[str, Any] | None = None,
    status: (
        Union[TestCaseStatus, "Status"] | None
    ) = None,  # overrides automatic calculation
    status_reason: str | None = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: dict[str, Any] | None = None,
) -> TestCaseObservation:
    """
    Build the ``TestCaseObservation`` for a finished (frozen) ``data``.

    ``status`` and ``status_reason`` are derived from ``data`` and ``phase``
    unless given explicitly; an explicit ``status`` may be either a ``Status``
    member or an already-mapped string. A leading ``"generate:"`` is stripped
    from the keys of ``arguments``. ``features`` combines the target
    observations (keys prefixed with ``"target:"``) with ``data.events``.

    Note that when ``status_reason`` falls back to the ``"invalid because"``
    event, that event is popped from ``data.events`` and so does not appear in
    ``features``.
    """
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    if status_reason is None:
        if data.interesting_origin:
            status_reason = str(data.interesting_origin)
        elif phase == "shrink" and data.status == Status.OVERRUN:
            status_reason = "exceeded size of current best example"
        else:
            status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    if status is None:
        final_status = status_map[data.status]
    elif isinstance(status, Status):
        final_status = status_map[status]
    else:
        # Already one of the observation status strings; use it as given.
        final_status = status

    cleaned_arguments = {
        name.removeprefix("generate:"): value
        for name, value in (arguments or {}).items()
    }

    features = {
        f"target:{label}".strip(":"): score
        for label, score in data.target_observations.items()
    }
    features.update(data.events)

    metadata_fields: dict[str, Any] = {
        "traceback": data.expected_traceback,
        "reproduction_decorator": (
            reproduction_decorator(data.choices) if final_status == "failed" else None
        ),
        "predicates": dict(data._observability_predicates),
        "backend": backend_metadata or {},
        "data_status": data.status,
        "phase": phase,
        "interesting_origin": data.interesting_origin,
        "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
        "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
    }
    metadata_fields.update(_system_metadata())
    # applied last so it takes precedence for duplicate keys
    metadata_fields.update(metadata or {})

    return TestCaseObservation(
        type="test_case",
        status=final_status,
        status_reason=status_reason,
        representation=representation,
        arguments=cleaned_arguments,
        how_generated=how_generated,  # iid, mutation, etc.
        features=features,
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(**metadata_fields),
        run_start=run_start,
        property=property,
    )
479
480
# Every observation file path that ``_deliver_to_file`` has appended to.
_WROTE_TO: set[Path] = set()
# Held by ``_deliver_to_file`` while it records the path and appends to the file.
_deliver_to_file_lock = Lock()
483
484
def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    """
    Append ``observation`` as one JSON line to today's file in the "observed"
    storage directory.

    Test case observations go to ``<date>_testcases.jsonl`` and everything else
    to ``<date>_info.jsonl``. ``thread_id`` is unused; it is accepted because
    this function is registered as an all-threads callback, which is called
    with the observation and the thread id.
    """
    from hypothesis.strategies._internal.utils import to_jsonable

    suffix = "info" if observation.type != "test_case" else "testcases"
    directory = storage_directory("observed")
    directory.create_if_missing()
    target = directory.path / f"{date.today().isoformat()}_{suffix}.jsonl"

    payload = json.dumps(to_jsonable(observation, avoid_realization=False))
    line = payload + "\n"
    # only allow one concurrent file write to avoid write races. This is likely to make
    # HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading. A queue
    # would be an improvement, but that requires a background thread, and I
    # would prefer to avoid a thread in the single-threaded case. We could
    # switch over to a queue if we detect multithreading, but it's tricky to get
    # right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(target)
        with target.open(mode="a") as out:
            out.write(line)
508
509
# ``time.time()`` captured when this module is executed; reported under the
# ``imported_at`` key by ``_system_metadata``.
_imported_at = time.time()
511
512
@lru_cache
def _system_metadata() -> dict[str, Any]:
    """
    Return the process-level metadata: ``sys_argv``, ``os_getpid`` and
    ``imported_at``.

    The result is cached, so every call returns the same dict object that was
    built on the first call.
    """
    # NOTE(review): because of the cache, the pid reported in a process forked
    # after the first call would be the parent's -- confirm whether that matters.
    return dict(
        sys_argv=sys.argv,
        os_getpid=os.getpid(),
        imported_at=_imported_at,
    )
520
521
#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
#:
#: Set to ``False`` at import time when the
#: ``HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER`` environment variable is set.
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.choice_spans``
#: keys in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.choice_spans``
#: can be a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: Set to ``True`` at import time when the
#: ``HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES`` environment variable is set.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ
544
# Import-time check: warn when coverage collection was switched off on a Python
# version where it is expected to be cheap.
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# Import-time setup: deliver observations to file when either environment
# variable is set. Note that disabling coverage collection on its own is enough
# to turn file delivery on.
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk
    # Observation file names start with an ISO date, so comparing the stem with
    # the cutoff date as strings orders them chronologically.
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for p in storage_directory("observed", intent_to_write=False).path.glob("*.jsonl"):
        if p.stem < max_age:  # pragma: no branch
            p.unlink(missing_ok=True)