1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11"""Observability tools to spit out analysis-ready tables, one row per test case."""
12
13import base64
14import dataclasses
15import json
16import math
17import os
18import sys
19import threading
20import time
21import warnings
22from collections.abc import Callable, Generator
23from contextlib import contextmanager
24from dataclasses import dataclass
25from datetime import date, timedelta
26from functools import lru_cache
27from threading import Lock
28from typing import (
29 TYPE_CHECKING,
30 Any,
31 Literal,
32 Optional,
33 TypeAlias,
34 Union,
35 cast,
36)
37
38from hypothesis.configuration import storage_directory
39from hypothesis.errors import HypothesisWarning
40from hypothesis.internal.conjecture.choice import (
41 BooleanConstraints,
42 BytesConstraints,
43 ChoiceConstraintsT,
44 ChoiceNode,
45 ChoiceT,
46 ChoiceTypeT,
47 FloatConstraints,
48 IntegerConstraints,
49 StringConstraints,
50)
51from hypothesis.internal.escalation import InterestingOrigin
52from hypothesis.internal.floats import float_to_int
53from hypothesis.internal.intervalsets import IntervalSet
54from hypothesis.utils.deprecation import note_deprecation
55
56if TYPE_CHECKING:
57 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status
58
59
# An observation is either a per-test-case report or a free-form
# info/alert/error message; see TestCaseObservation and InfoObservation below.
Observation: TypeAlias = Union["InfoObservation", "TestCaseObservation"]
# A callback registered for a single thread receives just the observation.
CallbackThreadT: TypeAlias = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: TypeAlias = Callable[[Observation, int], None]
CallbackT: TypeAlias = CallbackThreadT | CallbackAllThreadsT

# Per-thread callback registry, keyed by threading.get_ident().
# thread_id: list[callback]
_callbacks: dict[int | None, list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []
70
71
72@dataclass(slots=True, frozen=False)
73class PredicateCounts:
74 satisfied: int = 0
75 unsatisfied: int = 0
76
77 def update_count(self, *, condition: bool) -> None:
78 if condition:
79 self.satisfied += 1
80 else:
81 self.unsatisfied += 1
82
83
def _choice_to_json(choice: ChoiceT | None) -> Any:
    """Serialize a single choice-sequence value to a json-compatible form."""
    if choice is None:
        return None
    # Tag-and-stringify large integers: see the note on the same check in
    # to_jsonable for why we'd otherwise lose precision through floats.
    if (
        isinstance(choice, int)
        and not isinstance(choice, bool)
        and abs(choice) >= 2**63
    ):
        return ["integer", str(choice)]
    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    if isinstance(choice, float) and math.isnan(choice):
        # Preserve nonstandard nan bit patterns by storing the raw bits.
        # (-0.0 needs no such treatment: json does not normalize -0.0 to 0.0.)
        return ["float", float_to_int(choice)]
    return choice
102
103
def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Serialize a whole choice sequence via ``_choice_to_json``."""
    return list(map(_choice_to_json, choices))
106
107
def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """Serialize the constraints of one choice to a json-compatible dict.

    Raises NotImplementedError for an unrecognized ``choice_type``.
    """
    c = constraints.copy()
    if choice_type == "integer":
        c = cast(IntegerConstraints, c)
        weights = c["weights"]
        return {
            "min_value": _choice_to_json(c["min_value"]),
            "max_value": _choice_to_json(c["max_value"]),
            # json objects require string keys, so the weights mapping is
            # flattened into a list of (key, weight) pairs instead of a dict.
            "weights": (
                None
                if weights is None
                else [(_choice_to_json(k), v) for k, v in weights.items()]
            ),
            "shrink_towards": _choice_to_json(c["shrink_towards"]),
        }
    if choice_type == "float":
        c = cast(FloatConstraints, c)
        return {
            "min_value": _choice_to_json(c["min_value"]),
            "max_value": _choice_to_json(c["max_value"]),
            "allow_nan": c["allow_nan"],
            "smallest_nonzero_magnitude": c["smallest_nonzero_magnitude"],
        }
    if choice_type == "string":
        c = cast(StringConstraints, c)
        assert isinstance(c["intervals"], IntervalSet)
        return {
            "intervals": c["intervals"].intervals,
            "min_size": _choice_to_json(c["min_size"]),
            "max_size": _choice_to_json(c["max_size"]),
        }
    if choice_type == "bytes":
        c = cast(BytesConstraints, c)
        return {
            "min_size": _choice_to_json(c["min_size"]),
            "max_size": _choice_to_json(c["max_size"]),
        }
    if choice_type == "boolean":
        c = cast(BooleanConstraints, c)
        return {"p": c["p"]}
    raise NotImplementedError(f"unknown choice type {choice_type}")
157
158
def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Serialize choice nodes (value plus constraints) to json-compatible dicts."""

    def serialize(node: ChoiceNode) -> dict[str, Any]:
        # one dict per node, mirroring the node's attributes
        return {
            "type": node.type,
            "value": _choice_to_json(node.value),
            "constraints": _constraints_to_json(node.type, node.constraints),
            "was_forced": node.was_forced,
        }

    return [serialize(node) for node in nodes]
169
170
@dataclass(slots=True, frozen=True)
class ObservationMetadata:
    """The ``metadata`` portion of a TestCaseObservation."""

    traceback: str | None
    reproduction_decorator: str | None
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    phase: str
    interesting_origin: InterestingOrigin | None
    choice_nodes: tuple[ChoiceNode, ...] | None
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Serialize to a json-compatible dict with exactly one entry per field."""
        choice_nodes = (
            None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
        )
        if self.choice_spans is None:
            choice_spans = None
        else:
            choice_spans = [
                # span.label is an int, but cast to string to avoid conversion
                # to float (and loss of precision) for large label values.
                #
                # The value of this label is opaque to consumers anyway, so its
                # type shouldn't matter as long as it's consistent.
                (str(span.label), span.start, span.end, span.discarded)
                for span in self.choice_spans
            ]
        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "phase": self.phase,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": choice_nodes,
            "choice_spans": choice_spans,
        }
        # one key per dataclass field — catches a newly-added field that was
        # forgotten here
        assert len(data) == len(dataclasses.fields(self))
        return data
223
224
@dataclass(slots=True, frozen=True)
class BaseObservation:
    """Fields shared by every observation type."""

    # discriminator: "test_case" for per-example rows, otherwise a message kind
    type: Literal["test_case", "info", "alert", "error"]
    # name of the property (test function) this observation belongs to
    property: str
    # timestamp identifying the test run — presumably time.time() at run
    # start; set by callers outside this file
    run_start: float
231
# Severities for non-test-case observations.
InfoObservationType = Literal["info", "alert", "error"]
# Per-test-case outcome; see the Status -> TestCaseStatus mapping in
# make_testcase (overrun/invalid -> "gave_up", valid -> "passed",
# interesting -> "failed").
TestCaseStatus = Literal["gave_up", "passed", "failed"]
234
235
@dataclass(slots=True, frozen=True)
class InfoObservation(BaseObservation):
    """A free-form message observation, rather than a per-test-case row."""

    type: InfoObservationType
    title: str
    # either a plain string or a json-compatible dict payload
    content: str | dict
241
242
@dataclass(slots=True, frozen=True)
class TestCaseObservation(BaseObservation):
    """One analysis-ready row describing a single executed test case."""

    # stop pytest from collecting this class as a test
    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    # human-readable reason for the status (e.g. the interesting origin)
    status_reason: str
    # string representation of the test-case call
    representation: str
    # argument name -> value, with any "generate:" prefix stripped
    arguments: dict
    # how this input was produced ("iid", "mutation", etc.)
    how_generated: str
    # target() observations (keys prefixed "target:") merged with event() data
    features: dict
    # file name -> covered line numbers, or None when coverage wasn't collected
    coverage: dict[str, list[int]] | None
    # timing breakdown, in seconds keyed by phase name — TODO confirm units
    timing: dict[str, float]
    metadata: ObservationMetadata
257
258
def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if all_threads:
        _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
    else:
        # register under this thread's id, creating the list on first use
        _callbacks.setdefault(threading.get_ident(), []).append(
            cast(CallbackThreadT, f)
        )
289
290
def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    If running under multiple threads, ``f`` will only be removed from the
    callbacks for this thread.
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))

    callbacks = _callbacks.get(threading.get_ident())
    if callbacks is None:
        return

    if f in callbacks:
        callbacks.remove(cast(CallbackThreadT, f))
    if not callbacks:
        # drop the empty per-thread entry so observability_enabled() stays accurate
        del _callbacks[threading.get_ident()]
313
314
def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
    """
    # true when either registry (per-thread or all-threads) has any callback
    return bool(_callbacks or _callbacks_all_threads)
326
327
@contextmanager
def with_observability_callback(
    # annotated as CallbackT (not Callable[[Observation], None]) for
    # consistency with add_observability_callback: with all_threads=True the
    # callback receives (observation, thread_id).
    f: CallbackT,
    /,
    *,
    all_threads: bool = False,
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.

    ``all_threads`` is forwarded to |add_observability_callback|; when true,
    ``f`` must accept ``(observation, thread_id)``.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        # always unregister, even if the body raised
        remove_observability_callback(f)
341
342
def deliver_observation(observation: Observation) -> None:
    """Pass ``observation`` to every callback registered for this thread, and
    to every all-threads callback (which also receives this thread's id)."""
    thread_id = threading.get_ident()

    # Iterate over snapshots: a callback which unregisters itself (via
    # remove_observability_callback) would otherwise mutate the list we are
    # iterating and silently skip the next callback.
    for callback in list(_callbacks.get(thread_id, ())):
        callback(observation)

    for callback in list(_callbacks_all_threads):
        callback(observation, thread_id)
351
352
class _TestcaseCallbacks:
    """Deprecated compatibility shim mimicking the old ``TESTCASE_CALLBACKS``
    list API; each operation warns and forwards to the new functions."""

    def _note_deprecation(self):
        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def __bool__(self):
        self._note_deprecation()
        # NOTE(review): checks only the per-thread registry, unlike
        # observability_enabled() which also checks the all-threads registry —
        # presumably because the legacy list API predates all_threads; confirm.
        return bool(_callbacks)

    def append(self, f):
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        self._note_deprecation()
        remove_observability_callback(f)


#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#: |TESTCASE_CALLBACKS| remains a thin compatibility
#: shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#: three methods. It is not an attempt to be fully compatible with the previous
#: ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
#: anymore. Please update to using the new methods instead.
#:
#: |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()
390
391
def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: dict | None = None,
    coverage: dict[str, list[int]] | None = None,
    phase: str | None = None,
    backend_metadata: dict[str, Any] | None = None,
    status: (
        Union[TestCaseStatus, "Status"] | None
    ) = None,  # overrides automatic calculation
    status_reason: str | None = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: dict[str, Any] | None = None,
) -> TestCaseObservation:
    """Build a TestCaseObservation for a single, frozen ``ConjectureData``.

    ``status`` and ``status_reason`` are normally derived from ``data`` but
    can be overridden by the caller; ``metadata`` entries override any
    computed metadata keys with the same name.
    """
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    if status_reason is not None:
        pass
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        # NOTE: .pop mutates data.events, so the "invalid because" entry does
        # NOT also appear under ``features`` below — this must stay before the
        # features dict is built.
        status_reason = str(data.events.pop("invalid because", ""))

    # internal conjecture Status -> reported TestCaseStatus
    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    # ``status`` may be passed as either kind; normalize a conjecture Status,
    # and fall back to computing it from data.status when not provided.
    if status is not None and isinstance(status, Status):
        status = status_map[status]
    if status is None:
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        features={
            **{
                # .strip(":") turns the empty label "" into "target" rather
                # than the dangling "target:"
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "phase": phase,
                "interesting_origin": data.interesting_origin,
                # choice-level detail is opt-in via OBSERVABILITY_CHOICES
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )
478
479
# Paths _deliver_to_file has written to during this process; mutated only
# while holding _deliver_to_file_lock.
_WROTE_TO = set()
# Serializes file appends in _deliver_to_file to avoid interleaved writes.
_deliver_to_file_lock = Lock()
482
483
def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    """Append ``observation`` as one JSON line to today's jsonl file in the
    storage directory ("observed/<date>_testcases.jsonl" or "..._info.jsonl")."""
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)

    line = json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
    # Allow only one concurrent file write, to avoid write races. This is
    # likely to make HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under
    # threading. A queue would be an improvement, but that requires a
    # background thread, and I would prefer to avoid a thread in the
    # single-threaded case. We could switch over to a queue if we detect
    # multithreading, but it's tricky to get right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(fname)
        with fname.open(mode="a") as f:
            f.write(line)
506
507
# Wall-clock time at which this module was imported; included in observation
# metadata (as "imported_at") so consumers can group rows by process lifetime.
_imported_at = time.time()


@lru_cache
def _system_metadata() -> dict[str, Any]:
    # Cached: these values are fixed for the lifetime of the process.
    return {
        "sys_argv": sys.argv,
        "os_getpid": os.getpid(),
        "imported_at": _imported_at,
    }
518
519
#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

# Warn when the NOCOVER env var is set on 3.12+, where (per the message below)
# coverage collection is fast enough that disabling it should be unnecessary.
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# File-based observability: dump observations to jsonl files for all threads.
# Note that setting the NOCOVER env var (making OBSERVABILITY_COLLECT_COVERAGE
# False) also enables this.
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        # lexicographic comparison works because file names begin with an
        # ISO-format date
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)