Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/observability.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

188 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""Observability tools to spit out analysis-ready tables, one row per test case.""" 

12 

13import base64 

14import dataclasses 

15import json 

16import math 

17import os 

18import sys 

19import threading 

20import time 

21import warnings 

22from collections.abc import Generator 

23from contextlib import contextmanager 

24from dataclasses import dataclass 

25from datetime import date, timedelta 

26from functools import lru_cache 

27from threading import Lock 

28from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast 

29 

30from hypothesis.configuration import storage_directory 

31from hypothesis.errors import HypothesisWarning 

32from hypothesis.internal.conjecture.choice import ( 

33 BooleanConstraints, 

34 BytesConstraints, 

35 ChoiceConstraintsT, 

36 ChoiceNode, 

37 ChoiceT, 

38 ChoiceTypeT, 

39 FloatConstraints, 

40 IntegerConstraints, 

41 StringConstraints, 

42) 

43from hypothesis.internal.escalation import InterestingOrigin 

44from hypothesis.internal.floats import float_to_int 

45from hypothesis.internal.intervalsets import IntervalSet 

46 

if TYPE_CHECKING:
    from typing import TypeAlias

    from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status


# An observation is either a per-test-case report or a free-form
# info/alert/error message.
Observation: "TypeAlias" = Union["InfoObservation", "TestCaseObservation"]
# Callback registered for a single thread: receives just the observation.
CallbackThreadT: "TypeAlias" = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: "TypeAlias" = Callable[[Observation, int], None]
CallbackT: "TypeAlias" = Union[CallbackThreadT, CallbackAllThreadsT]

# Per-thread callback registry, keyed by threading.get_ident().
# thread_id: list[callback]
_callbacks: dict[Optional[int], list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []

63 

64 

@dataclass
class PredicateCounts:
    """Tally of how often a predicate evaluated truthy vs falsy."""

    satisfied: int = 0
    unsatisfied: int = 0

    def update_count(self, *, condition: bool) -> None:
        """Record a single evaluation of the predicate."""
        bucket = "satisfied" if condition else "unsatisfied"
        setattr(self, bucket, getattr(self, bucket) + 1)

75 

76 

def _choice_to_json(choice: Union[ChoiceT, None]) -> Any:
    """Encode one choice as a JSON-representable value.

    Values that plain JSON would mangle (large ints, bytes, nan floats) are
    wrapped in a ``["type", payload]`` pair; everything else passes through.
    """
    if choice is None:
        return None
    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    # see the note on the same check in to_jsonable for why we cast large
    # integers to floats.
    if isinstance(choice, int) and not isinstance(choice, bool):
        if abs(choice) >= 2**63:
            return ["integer", str(choice)]
        return choice
    if isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)]
    return choice

95 

96 

def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Encode a whole choice sequence via ``_choice_to_json``."""
    return list(map(_choice_to_json, choices))

99 

100 

def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """Encode the constraints for one choice as a JSON-compatible dict.

    The set of keys in the returned dict depends on ``choice_type``; an
    unrecognized type raises ``NotImplementedError``.
    """
    constraints = constraints.copy()
    if choice_type == "integer":
        constraints = cast(IntegerConstraints, constraints)
        weights = constraints["weights"]
        if weights is not None:
            # wrap up in a list, instead of a dict, because json dicts
            # require string keys
            weights = [(_choice_to_json(k), v) for k, v in weights.items()]
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "weights": weights,
            "shrink_towards": _choice_to_json(constraints["shrink_towards"]),
        }
    if choice_type == "float":
        constraints = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "allow_nan": constraints["allow_nan"],
            "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"],
        }
    if choice_type == "string":
        constraints = cast(StringConstraints, constraints)
        assert isinstance(constraints["intervals"], IntervalSet)
        return {
            "intervals": constraints["intervals"].intervals,
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    if choice_type == "bytes":
        constraints = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    if choice_type == "boolean":
        constraints = cast(BooleanConstraints, constraints)
        return {
            "p": constraints["p"],
        }
    raise NotImplementedError(f"unknown choice type {choice_type}")

150 

151 

def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Serialize a sequence of choice nodes for observability output."""

    def encode(node: ChoiceNode) -> dict[str, Any]:
        # one dict per node: its type, json-safe value, constraints, and
        # whether the value was forced rather than drawn.
        return {
            "type": node.type,
            "value": _choice_to_json(node.value),
            "constraints": _constraints_to_json(node.type, node.constraints),
            "was_forced": node.was_forced,
        }

    return [encode(node) for node in nodes]

162 

163 

@dataclass
class ObservationMetadata:
    """The ``metadata`` payload attached to each test-case observation."""

    traceback: Optional[str]
    reproduction_decorator: Optional[str]
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    interesting_origin: Optional[InterestingOrigin]
    choice_nodes: Optional[tuple[ChoiceNode, ...]]
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Return a dict with exactly one entry per dataclass field."""
        nodes = None
        if self.choice_nodes is not None:
            nodes = nodes_to_json(self.choice_nodes)

        spans = None
        if self.choice_spans is not None:
            spans = [
                (
                    # span.label is an int, but cast to string to avoid conversion
                    # to float (and loss of precision) for large label values.
                    #
                    # The value of this label is opaque to consumers anyway, so its
                    # type shouldn't matter as long as it's consistent.
                    str(span.label),
                    span.start,
                    span.end,
                    span.discarded,
                )
                for span in self.choice_spans
            ]

        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": nodes,
            "choice_spans": spans,
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data

214 

215 

@dataclass
class BaseObservation:
    """Fields shared by every observation variant."""

    # discriminator for the concrete observation subtype
    type: Literal["test_case", "info", "alert", "error"]
    # name of the property (test) this observation belongs to
    property: str
    # start time of the enclosing run, as supplied by the caller
    run_start: float

221 

222 

# Observation types that carry a free-form message rather than a test case.
InfoObservationType = Literal["info", "alert", "error"]
# Outcome classification for a single test case (see status_map in make_testcase).
TestCaseStatus = Literal["gave_up", "passed", "failed"]

225 

226 

@dataclass
class InfoObservation(BaseObservation):
    """A free-form observability message, not tied to a single test case."""

    type: InfoObservationType
    title: str
    # either a plain string message or a structured payload
    content: Union[str, dict]

232 

233 

@dataclass
class TestCaseObservation(BaseObservation):
    """An observation describing the execution of a single test case."""

    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    # human-readable explanation of the status, e.g. the interesting origin
    status_reason: str
    # string representation of the test-case call
    representation: str
    # argument name -> value, with any "generate:" prefix already stripped
    arguments: dict
    # how this input was produced (e.g. "generate" phase) — see make_testcase
    how_generated: str
    # "target:..." observations plus test-case events
    features: dict
    # filename -> covered line numbers, when coverage collection is enabled
    coverage: Optional[dict[str, list[int]]]
    # timing breakdown keyed by label (values presumably seconds — confirm)
    timing: dict[str, float]
    metadata: ObservationMetadata

248 

249 

def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if all_threads:
        _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
    else:
        # register under the current thread id, creating the list on first use
        _callbacks.setdefault(threading.get_ident(), []).append(
            cast(CallbackThreadT, f)
        )

280 

281 

def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    If running under multiple threads, per-thread registrations of ``f`` are
    only removed for this thread (all-threads registrations are global).
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))

    thread_id = threading.get_ident()
    callbacks = _callbacks.get(thread_id)
    if callbacks is None:
        return

    try:
        callbacks.remove(cast(CallbackThreadT, f))
    except ValueError:
        # f was not registered for this thread; nothing more to do.
        pass

    if not callbacks:
        # drop the empty per-thread entry so the registry doesn't grow forever
        del _callbacks[thread_id]

304 

305 

def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by :ref:`alternative backends <alternative-backends>`.
    """
    # enabled iff any per-thread or any all-threads callback is registered
    return bool(_callbacks or _callbacks_all_threads)

317 

318 

@contextmanager
def with_observability_callback(
    # annotated as CallbackT (not Callable[[Observation], None]) for consistency
    # with add/remove_observability_callback: when all_threads=True, f receives
    # (observation, thread_id) rather than just the observation.
    f: CallbackT, /, *, all_threads: bool = False
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        # always unregister, even if the body raised
        remove_observability_callback(f)

332 

333 

def deliver_observation(observation: Observation) -> None:
    """Invoke every registered callback with ``observation``.

    Per-thread callbacks registered on the current thread receive just the
    observation; all-threads callbacks also receive the current thread id.
    """
    tid = threading.get_ident()

    for cb in _callbacks.get(tid, []):
        cb(observation)

    for cb in _callbacks_all_threads:
        cb(observation, tid)

342 

343 

class _TestcaseCallbacks:
    """Deprecated shim mimicking the old ``TESTCASE_CALLBACKS`` list.

    Forwards ``bool()``, ``.append``, and ``.remove`` to the new
    observability-callback API, emitting a deprecation warning on each use.
    """

    def __bool__(self):
        self._note_deprecation()
        # NOTE(review): only checks per-thread callbacks, unlike
        # observability_enabled() which also checks _callbacks_all_threads —
        # confirm this asymmetry is intended.
        return bool(_callbacks)

    def _note_deprecation(self):
        # imported lazily (presumably to avoid an import cycle at load time)
        from hypothesis._settings import note_deprecation

        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def append(self, f):
        # old-style registration; delegates to the per-thread registry
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        # old-style removal; delegates to the per-thread registry
        self._note_deprecation()
        remove_observability_callback(f)

368 

369 

#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#: |TESTCASE_CALLBACKS| remains a thin compatibility
#: shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#: three methods. It is not an attempt to be fully compatible with the previous
#: ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
#: anymore. Please update to using the new methods instead.
#:
#: |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()  # module-level singleton, created at import

383 

384 

def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    arguments: Optional[dict] = None,
    timing: dict[str, float],
    coverage: Optional[dict[str, list[int]]] = None,
    phase: Optional[str] = None,
    backend_metadata: Optional[dict[str, Any]] = None,
    status: Optional[
        Union[TestCaseStatus, "Status"]
    ] = None,  # overrides automatic calculation
    status_reason: Optional[str] = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: Optional[dict[str, Any]] = None,
) -> TestCaseObservation:
    """Build a TestCaseObservation from a finished (frozen) ConjectureData.

    ``status`` and ``status_reason`` override the values otherwise derived
    from ``data``; entries in ``metadata`` are merged into — and take
    precedence over — the automatically computed metadata.
    """
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    if status_reason is not None:
        pass
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        # popping here intentionally also removes the event from the
        # ``features`` dict built below (which unpacks data.events).
        status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    # Normalize: a Status enum (explicitly passed, or derived from data) maps
    # to its TestCaseStatus string. (isinstance already excludes None, so the
    # previous `status is not None and` guard was redundant.)
    if isinstance(status, Status):
        status = status_map[status]
    if status is None:
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        features={
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "interesting_origin": data.interesting_origin,
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )

470 

471 

# Every observability file this process has written to. Nothing in this module
# reads it back — presumably consumed by tests or debugging; confirm.
_WROTE_TO = set()
# Serializes writes in _deliver_to_file so concurrent threads don't interleave.
_deliver_to_file_lock = Lock()

474 

475 

def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    """Append ``observation`` as one JSON line to today's observability file."""
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)

    line = json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
    # only allow one concurrent file write to avoid write races. This is likely
    # to make HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading.
    # A queue would be an improvement, but that requires a background thread,
    # and I would prefer to avoid a thread in the single-threaded case. We could
    # switch over to a queue if we detect multithreading, but it's tricky to get
    # right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(fname)
        with fname.open(mode="a") as f:
            f.write(line)

498 

499 

# Wall-clock time at which this module was imported; reported in observation
# metadata as "imported_at".
_imported_at = time.time()


@lru_cache
def _system_metadata() -> dict[str, Any]:
    """Process-level metadata included in every observation (computed once)."""
    return dict(
        sys_argv=sys.argv,
        os_getpid=os.getpid(),
        imported_at=_imported_at,
    )

510 

511 

#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
#:
#: Controlled by the ``HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER``
#: environment variable, checked once at import time.
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
#: Controlled by the ``HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES``
#: environment variable, checked once at import time.
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

534 

# Warn if coverage was explicitly disabled on Python 3.12+, where (per the
# warning text below) collection is fast enough that disabling it is pointless.
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# Opting in via either experimental env var enables file-based observability:
# every observation (from any thread) is appended to a dated .jsonl file under
# the storage directory by _deliver_to_file.
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk
    # (ISO-format dates compare lexicographically, so string < is date <).
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)