Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/observability.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

189 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""Observability tools to spit out analysis-ready tables, one row per test case.""" 

12 

13import base64 

14import dataclasses 

15import json 

16import math 

17import os 

18import sys 

19import threading 

20import time 

21import warnings 

22from collections.abc import Generator 

23from contextlib import contextmanager 

24from dataclasses import dataclass 

25from datetime import date, timedelta 

26from functools import lru_cache 

27from threading import Lock 

28from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast 

29 

30from hypothesis.configuration import storage_directory 

31from hypothesis.errors import HypothesisWarning 

32from hypothesis.internal.conjecture.choice import ( 

33 BooleanConstraints, 

34 BytesConstraints, 

35 ChoiceConstraintsT, 

36 ChoiceNode, 

37 ChoiceT, 

38 ChoiceTypeT, 

39 FloatConstraints, 

40 IntegerConstraints, 

41 StringConstraints, 

42) 

43from hypothesis.internal.escalation import InterestingOrigin 

44from hypothesis.internal.floats import float_to_int 

45from hypothesis.internal.intervalsets import IntervalSet 

46 

47if TYPE_CHECKING: 

48 from typing import TypeAlias 

49 

50 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status 

51 

52 

# An observation delivered to callbacks: either an info-style message
# (InfoObservation) or one row per executed test case (TestCaseObservation).
Observation: "TypeAlias" = Union["InfoObservation", "TestCaseObservation"]
# Signature for a callback registered for a single thread.
CallbackThreadT: "TypeAlias" = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: "TypeAlias" = Callable[[Observation, int], None]
CallbackT: "TypeAlias" = Union[CallbackThreadT, CallbackAllThreadsT]

# thread_id: list[callback]
# Per-thread registrations; entries are added/removed by
# add_observability_callback / remove_observability_callback.
_callbacks: dict[Optional[int], list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []

63 

64 

@dataclass
class PredicateCounts:
    """Running tally of how often a predicate held versus did not hold."""

    # number of evaluations where the predicate was true
    satisfied: int = 0
    # number of evaluations where the predicate was false
    unsatisfied: int = 0

    def update_count(self, *, condition: bool) -> None:
        """Record a single evaluation of the predicate."""
        counter = "satisfied" if condition else "unsatisfied"
        setattr(self, counter, getattr(self, counter) + 1)

76 

def _choice_to_json(choice: Union[ChoiceT, None]) -> Any:
    """Convert a single choice value to a json-serializable form.

    Most values pass through unchanged; special cases are tagged as
    ``[type_name, payload]`` pairs so they round-trip losslessly.
    """
    if choice is None:
        return None
    # see the note on the same check in to_jsonable for why we cast large
    # integers to floats: past 2**63 a json consumer may parse them as floats
    # and lose precision, so serialize them as decimal strings instead.
    is_large_int = (
        isinstance(choice, int)
        and not isinstance(choice, bool)
        and abs(choice) >= 2**63
    )
    if is_large_int:
        return ["integer", str(choice)]
    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    if isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns by storing the raw bits. We don't
        # need to do this for -0.0 vs 0.0 since json doesn't normalize -0.0
        # to 0.0.
        return ["float", float_to_int(choice)]
    return choice

95 

96 

def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Convert a sequence of choices into a json-serializable list."""
    return list(map(_choice_to_json, choices))

99 

100 

def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """Convert the constraints of one choice into a json-serializable dict.

    The keys emitted depend on ``choice_type``; an unknown choice type raises
    ``NotImplementedError``. Key order is part of the serialized output and is
    preserved deliberately.
    """
    constraints = constraints.copy()
    if choice_type == "integer":
        integer = cast(IntegerConstraints, constraints)
        weights = integer["weights"]
        if weights is not None:
            # wrap up in a list, instead of a dict, because json dicts
            # require string keys
            weights = [(_choice_to_json(k), v) for k, v in weights.items()]
        return {
            "min_value": _choice_to_json(integer["min_value"]),
            "max_value": _choice_to_json(integer["max_value"]),
            "weights": weights,
            "shrink_towards": _choice_to_json(integer["shrink_towards"]),
        }
    if choice_type == "float":
        floats = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(floats["min_value"]),
            "max_value": _choice_to_json(floats["max_value"]),
            "allow_nan": floats["allow_nan"],
            "smallest_nonzero_magnitude": floats["smallest_nonzero_magnitude"],
        }
    if choice_type == "string":
        strings = cast(StringConstraints, constraints)
        assert isinstance(strings["intervals"], IntervalSet)
        return {
            "intervals": strings["intervals"].intervals,
            "min_size": _choice_to_json(strings["min_size"]),
            "max_size": _choice_to_json(strings["max_size"]),
        }
    if choice_type == "bytes":
        raw = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(raw["min_size"]),
            "max_size": _choice_to_json(raw["max_size"]),
        }
    if choice_type == "boolean":
        return {
            "p": cast(BooleanConstraints, constraints)["p"],
        }
    raise NotImplementedError(f"unknown choice type {choice_type}")

150 

151 

def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Convert a sequence of choice nodes into json-serializable dicts."""

    def one_node(node: ChoiceNode) -> dict[str, Any]:
        # one dict per node: the drawn value plus its constraints
        return {
            "type": node.type,
            "value": _choice_to_json(node.value),
            "constraints": _constraints_to_json(node.type, node.constraints),
            "was_forced": node.was_forced,
        }

    return [one_node(node) for node in nodes]

162 

163 

@dataclass
class ObservationMetadata:
    """Typed container for the ``metadata`` key of a test case observation."""

    traceback: Optional[str]
    reproduction_decorator: Optional[str]
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    phase: str
    interesting_origin: Optional[InterestingOrigin]
    choice_nodes: Optional[tuple[ChoiceNode, ...]]
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Return a json-serializable dict with exactly one entry per field."""
        nodes = None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)

        spans = self.choice_spans
        if spans is not None:
            spans = [
                (
                    # span.label is an int, but cast to string to avoid conversion
                    # to float (and loss of precision) for large label values.
                    #
                    # The value of this label is opaque to consumers anyway, so its
                    # type shouldn't matter as long as it's consistent.
                    str(span.label),
                    span.start,
                    span.end,
                    span.discarded,
                )
                for span in spans
            ]

        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "phase": self.phase,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": nodes,
            "choice_spans": spans,
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data

216 

217 

@dataclass
class BaseObservation:
    """Fields shared by every observation delivered to callbacks."""

    # discriminator; subclasses narrow this to their own literal value
    type: Literal["test_case", "info", "alert", "error"]
    # name of the property (test) this observation belongs to
    property: str
    # start time of the run that produced this observation (passed through
    # from the caller, e.g. make_testcase)
    run_start: float

223 

224 

# Kinds of non-test-case observations (see InfoObservation.type).
InfoObservationType = Literal["info", "alert", "error"]
# Per-test-case outcome reported in TestCaseObservation.status.
TestCaseStatus = Literal["gave_up", "passed", "failed"]

227 

228 

@dataclass
class InfoObservation(BaseObservation):
    """An informational (non-test-case) observation."""

    type: InfoObservationType
    title: str
    content: Union[str, dict]

234 

235 

@dataclass
class TestCaseObservation(BaseObservation):
    """One observation row per executed test case."""

    __test__ = False  # no! bad pytest! (stops pytest collecting this class)

    type: Literal["test_case"]
    status: TestCaseStatus
    status_reason: str
    representation: str
    arguments: dict
    how_generated: str
    features: dict
    # presumably filename -> covered line numbers — confirm against producer
    coverage: Optional[dict[str, list[int]]]
    # named timing measurements in seconds — NOTE(review): units assumed; verify
    timing: dict[str, float]
    metadata: ObservationMetadata

250 

251 

def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if all_threads:
        _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
    else:
        # create this thread's callback list on first registration
        registered = _callbacks.setdefault(threading.get_ident(), [])
        registered.append(cast(CallbackThreadT, f))

282 

283 

def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    If running under multiple threads, ``f`` will only be removed from the
    callbacks for this thread.
    """
    try:
        _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))
    except ValueError:
        # f was not registered with all_threads=True; nothing to do here.
        pass

    thread_id = threading.get_ident()
    callbacks = _callbacks.get(thread_id)
    if callbacks is None:
        return

    try:
        callbacks.remove(cast(CallbackThreadT, f))
    except ValueError:
        pass

    # drop empty per-thread entries so bool(_callbacks) stays meaningful
    if not callbacks:
        del _callbacks[thread_id]

306 

307 

def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
    """
    # enabled iff either registry (per-thread or all-threads) is non-empty
    return bool(_callbacks or _callbacks_all_threads)

319 

320 

@contextmanager
def with_observability_callback(
    f: Callable[[Observation], None], /, *, all_threads: bool = False
) -> Generator[None, None, None]:
    """Context manager form of callback registration.

    Calls |add_observability_callback| on ``f`` on entry and
    |remove_observability_callback| on ``f`` on exit.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        # unregister even if the body raised
        remove_observability_callback(f)

334 

335 

def deliver_observation(observation: Observation) -> None:
    """Invoke every registered callback with ``observation``.

    Per-thread callbacks registered on the current thread receive just the
    observation; all-threads callbacks also receive the current thread id.
    """
    thread_id = threading.get_ident()

    for per_thread_cb in _callbacks.get(thread_id, []):
        per_thread_cb(observation)

    for all_threads_cb in _callbacks_all_threads:
        all_threads_cb(observation, thread_id)

344 

345 

class _TestcaseCallbacks:
    """Deprecated compatibility shim for the old ``TESTCASE_CALLBACKS`` list.

    Forwards ``bool()``, ``.append``, and ``.remove`` to the modern
    observability API, emitting a deprecation warning on every use.
    """

    def __bool__(self):
        # bool(TESTCASE_CALLBACKS) historically meant "observability on".
        # NOTE(review): this only checks the per-thread registry, not
        # _callbacks_all_threads — confirm that is intentional.
        self._note_deprecation()
        return bool(_callbacks)

    def _note_deprecation(self):
        # local import — presumably to avoid an import cycle with
        # hypothesis._settings; confirm before moving to module level
        from hypothesis._settings import note_deprecation

        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def append(self, f):
        # old: TESTCASE_CALLBACKS.append(f); new: add_observability_callback(f)
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        # old: TESTCASE_CALLBACKS.remove(f); new: remove_observability_callback(f)
        self._note_deprecation()
        remove_observability_callback(f)


#: .. warning::
#:
#: Deprecated in favor of |add_observability_callback|,
#: |remove_observability_callback|, and |observability_enabled|.
#:
#: |TESTCASE_CALLBACKS| remains a thin compatibility
#: shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#: three methods. It is not an attempt to be fully compatible with the previous
#: ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
#: anymore. Please update to using the new methods instead.
#:
#: |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()

385 

386 

def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: Optional[dict] = None,
    coverage: Optional[dict[str, list[int]]] = None,
    phase: Optional[str] = None,
    backend_metadata: Optional[dict[str, Any]] = None,
    status: Optional[
        Union[TestCaseStatus, "Status"]
    ] = None,  # overrides automatic calculation
    status_reason: Optional[str] = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: Optional[dict[str, Any]] = None,
) -> TestCaseObservation:
    """Build a TestCaseObservation for a single finished test case.

    ``status`` and ``status_reason`` may be supplied to override the values
    otherwise derived from ``data``; ``metadata`` entries take precedence over
    the computed metadata on key collisions.
    """
    # local imports — presumably to avoid import cycles; confirm before hoisting
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    # Derive status_reason unless the caller supplied one, in priority order:
    # interesting origin > shrink-phase overrun > "invalid because" event.
    if status_reason is not None:
        pass
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        # note: .pop also removes the event so it doesn't appear in `features`
        status_reason = str(data.events.pop("invalid because", ""))

    # Map internal conjecture Status values onto the public observation statuses.
    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    # A caller-supplied status may be either a TestCaseStatus string (used
    # verbatim) or an internal Status (translated via status_map).
    if status is not None and isinstance(status, Status):
        status = status_map[status]
    if status is None:
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        # strip the "generate:" prefix from argument names for readability
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        # target observations (prefixed "target:") merged with recorded events;
        # .strip(":") drops the trailing colon when the target label is empty
        features={
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "phase": phase,
                "interesting_origin": data.interesting_origin,
                # choices/spans are opt-in via OBSERVABILITY_CHOICES (large data)
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )

473 

474 

# Paths of observation files appended to by this process (updated under the
# lock in _deliver_to_file; not read elsewhere in this module).
_WROTE_TO = set()
# Serializes observation file writes across threads; see _deliver_to_file.
_deliver_to_file_lock = Lock()

477 

478 

def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    """Append ``observation`` as one json line to today's observations file."""
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)

    line = json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
    # Only allow one concurrent file write to avoid write races. This is likely
    # to make HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading.
    # A queue would be an improvement, but that requires a background thread,
    # and we would prefer to avoid a thread in the single-threaded case. We
    # could switch over to a queue if we detect multithreading, but it's tricky
    # to get right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(fname)
        with fname.open(mode="a") as out:
            out.write(line)

501 

502 

# wall-clock time at which this module was imported
_imported_at = time.time()


@lru_cache
def _system_metadata() -> dict[str, Any]:
    """Process-level metadata included in every observation; computed once."""
    return dict(
        sys_argv=sys.argv,
        os_getpid=os.getpid(),
        imported_at=_imported_at,
    )

513 

514 

#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
OBSERVABILITY_COLLECT_COVERAGE = (
    # opt-out: presence of the env var disables coverage collection
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#: EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#: interface for this as of June 2025, and this attribute may disappear or
#: be renamed without notice.
#:
# opt-in via environment variable (see warning above about stability)
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

537 

# Warn if the user disabled coverage collection on a Python version where it
# should already be cheap (sys.monitoring landed in 3.12).
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# File-based observability: register the file-writing callback when the
# experimental env var is set, or when NOCOVER was set (which implies the
# user opted into observability).
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk.
    # File stems start with an ISO date (see _deliver_to_file), so a plain
    # lexicographic string comparison orders them chronologically.
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)