Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/observability.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

189 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""Observability tools to spit out analysis-ready tables, one row per test case.""" 

12 

13import base64 

14import dataclasses 

15import json 

16import math 

17import os 

18import sys 

19import threading 

20import time 

21import warnings 

22from collections.abc import Callable, Generator 

23from contextlib import contextmanager 

24from dataclasses import dataclass 

25from datetime import date, timedelta 

26from functools import lru_cache 

27from pathlib import Path 

28from threading import Lock 

29from typing import ( 

30 TYPE_CHECKING, 

31 Any, 

32 Literal, 

33 Optional, 

34 TypeAlias, 

35 Union, 

36 cast, 

37) 

38 

39from hypothesis.configuration import storage_directory 

40from hypothesis.errors import HypothesisWarning 

41from hypothesis.internal.conjecture.choice import ( 

42 BooleanConstraints, 

43 BytesConstraints, 

44 ChoiceConstraintsT, 

45 ChoiceNode, 

46 ChoiceT, 

47 ChoiceTypeT, 

48 FloatConstraints, 

49 IntegerConstraints, 

50 StringConstraints, 

51) 

52from hypothesis.internal.escalation import InterestingOrigin 

53from hypothesis.internal.floats import float_to_int 

54from hypothesis.internal.intervalsets import IntervalSet 

55from hypothesis.utils.deprecation import note_deprecation 

56 

57if TYPE_CHECKING: 

58 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status 

59 

60 

61Observation: TypeAlias = Union["InfoObservation", "TestCaseObservation"] 

62CallbackThreadT: TypeAlias = Callable[[Observation], None] 

63# for all_threads=True, we pass the thread id as well. 

64CallbackAllThreadsT: TypeAlias = Callable[[Observation, int], None] 

65CallbackT: TypeAlias = CallbackThreadT | CallbackAllThreadsT 

66 

67# thread_id: list[callback] 

68_callbacks: dict[int | None, list[CallbackThreadT]] = {} 

69# callbacks where all_threads=True was set 

70_callbacks_all_threads: list[CallbackAllThreadsT] = [] 

71 

72 

73@dataclass(slots=True, frozen=False) 

74class PredicateCounts: 

75 satisfied: int = 0 

76 unsatisfied: int = 0 

77 

78 def update_count(self, *, condition: bool) -> None: 

79 if condition: 

80 self.satisfied += 1 

81 else: 

82 self.unsatisfied += 1 

83 

84 

def _choice_to_json(choice: ChoiceT | None) -> Any:
    """Convert a single choice into a value that ``json`` can represent.

    ``None`` and ordinary values are returned unchanged. Three kinds of value
    are replaced by a two-element ``[tag, payload]`` list:

    * integers with magnitude of at least ``2**63`` -> ``["integer", str(value)]``
    * bytes -> ``["bytes", <base64 text>]``
    * NaN floats -> ``["float", float_to_int(value)]``
    """
    if choice is None:
        return None

    # Large integers get special treatment; see the note on the matching check
    # in to_jsonable for the background. Booleans are ints in Python, so they
    # are excluded explicitly.
    is_plain_int = isinstance(choice, int) and not isinstance(choice, bool)
    if is_plain_int and abs(choice) >= 2**63:
        return ["integer", str(choice)]

    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]

    if isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)]

    return choice

103 

104 

def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Return a list with each choice converted by ``_choice_to_json``, in order."""
    return list(map(_choice_to_json, choices))

107 

108 

109def _constraints_to_json( 

110 choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT 

111) -> dict[str, Any]: 

112 constraints = constraints.copy() 

113 if choice_type == "integer": 

114 constraints = cast(IntegerConstraints, constraints) 

115 return { 

116 "min_value": _choice_to_json(constraints["min_value"]), 

117 "max_value": _choice_to_json(constraints["max_value"]), 

118 "weights": ( 

119 None 

120 if constraints["weights"] is None 

121 # wrap up in a list, instead of a dict, because json dicts 

122 # require string keys 

123 else [ 

124 (_choice_to_json(k), v) for k, v in constraints["weights"].items() 

125 ] 

126 ), 

127 "shrink_towards": _choice_to_json(constraints["shrink_towards"]), 

128 } 

129 elif choice_type == "float": 

130 constraints = cast(FloatConstraints, constraints) 

131 return { 

132 "min_value": _choice_to_json(constraints["min_value"]), 

133 "max_value": _choice_to_json(constraints["max_value"]), 

134 "allow_nan": constraints["allow_nan"], 

135 "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"], 

136 } 

137 elif choice_type == "string": 

138 constraints = cast(StringConstraints, constraints) 

139 assert isinstance(constraints["intervals"], IntervalSet) 

140 return { 

141 "intervals": constraints["intervals"].intervals, 

142 "min_size": _choice_to_json(constraints["min_size"]), 

143 "max_size": _choice_to_json(constraints["max_size"]), 

144 } 

145 elif choice_type == "bytes": 

146 constraints = cast(BytesConstraints, constraints) 

147 return { 

148 "min_size": _choice_to_json(constraints["min_size"]), 

149 "max_size": _choice_to_json(constraints["max_size"]), 

150 } 

151 elif choice_type == "boolean": 

152 constraints = cast(BooleanConstraints, constraints) 

153 return { 

154 "p": constraints["p"], 

155 } 

156 else: 

157 raise NotImplementedError(f"unknown choice type {choice_type}") 

158 

159 

def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Return one dict per node with its type, value, constraints and
    ``was_forced`` flag, with value and constraints converted for json."""
    converted: list[dict[str, Any]] = []
    for node in nodes:
        converted.append(
            {
                "type": node.type,
                "value": _choice_to_json(node.value),
                "constraints": _constraints_to_json(node.type, node.constraints),
                "was_forced": node.was_forced,
            }
        )
    return converted

170 

171 

@dataclass(slots=True, frozen=True)
class ObservationMetadata:
    """Immutable bundle of metadata attached to a test case observation."""

    traceback: str | None
    reproduction_decorator: str | None
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    phase: str
    interesting_origin: InterestingOrigin | None
    choice_nodes: tuple[ChoiceNode, ...] | None
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Return the fields as a dict with one entry per dataclass field.

        Keys match the field names, except that ``sys_argv`` and ``os_getpid``
        are emitted as ``"sys.argv"`` and ``"os.getpid()"``. ``choice_nodes``
        and ``choice_spans`` are converted to lists when present; all other
        values are passed through unchanged.
        """
        nodes_json = None
        if self.choice_nodes is not None:
            nodes_json = nodes_to_json(self.choice_nodes)

        spans_json = None
        if self.choice_spans is not None:
            # span.label is an int, but cast to string to avoid conversion
            # to float (and loss of precision) for large label values.
            #
            # The value of this label is opaque to consumers anyway, so its
            # type shouldn't matter as long as it's consistent.
            spans_json = [
                (str(span.label), span.start, span.end, span.discarded)
                for span in self.choice_spans
            ]

        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "phase": self.phase,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": nodes_json,
            "choice_spans": spans_json,
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data

224 

225 

226@dataclass(slots=True, frozen=True) 

227class BaseObservation: 

228 type: Literal["test_case", "info", "alert", "error"] 

229 property: str 

230 run_start: float 

231 

232 

# Values of ``type`` allowed on an InfoObservation.
InfoObservationType = Literal["info", "alert", "error"]
# Values of ``status`` allowed on a TestCaseObservation.
TestCaseStatus = Literal["gave_up", "passed", "failed"]

235 

236 

@dataclass(slots=True, frozen=True)
class InfoObservation(BaseObservation):
    """An observation of type "info", "alert" or "error", carrying a title
    and either text or a dict as content."""

    type: InfoObservationType
    title: str
    content: str | dict

242 

243 

@dataclass(slots=True, frozen=True)
class TestCaseObservation(BaseObservation):
    """An observation of type "test_case": the record of a single test case."""

    # The class name starts with "Test", so tell pytest not to collect it.
    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    status_reason: str
    representation: str
    arguments: dict
    how_generated: str
    features: dict
    coverage: dict[str, list[int]] | None
    timing: dict[str, float]
    metadata: ObservationMetadata

258 

259 

def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    Callbacks are tracked per-thread when Hypothesis tests are run from multiple
    threads: ``add_observability_callback(f)`` registers ``f`` only for
    observations produced on the calling thread.

    With ``all_threads=True``, ``f`` is instead registered for every thread, and
    is called for observations generated by any thread, not just the one which
    registered it. In that case ``f`` is passed two arguments: the observation,
    and the integer thread id from :func:`python:threading.get_ident` of the
    thread where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if not all_threads:
        # Create this thread's callback list on first use.
        per_thread = _callbacks.setdefault(threading.get_ident(), [])
        per_thread.append(cast(CallbackThreadT, f))
        return

    _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))

290 

291 

def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    ``f`` is removed from the callbacks registered with ``all_threads=True``
    (if present there) and from the callbacks of the calling thread. Callbacks
    registered on other threads are left untouched.
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(f)

    thread_id = threading.get_ident()
    registered = _callbacks.get(thread_id)
    if registered is None:
        return

    if f in registered:
        registered.remove(f)

    # Drop the per-thread entry once its list is empty.
    if not registered:
        del _callbacks[thread_id]

314 

315 

def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present, whether per-thread or registered for all threads.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
    """
    return bool(_callbacks or _callbacks_all_threads)

327 

328 

@contextmanager
def with_observability_callback(
    f: CallbackT, /, *, all_threads: bool = False
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.

    ``f`` is annotated as ``CallbackT`` rather than a one-argument callable,
    because with ``all_threads=True`` it is called with two arguments (the
    observation and the thread id). ``all_threads`` is forwarded to
    |add_observability_callback| unchanged.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        # Always unregister, even if the body of the ``with`` block raised.
        remove_observability_callback(f)

342 

343 

def deliver_observation(observation: Observation) -> None:
    """Call every applicable observability callback with ``observation``.

    Callbacks registered for the current thread are called first, with the
    observation as the only argument. Callbacks registered for all threads are
    called afterwards, with the observation and the current thread id.
    """
    thread_id = threading.get_ident()

    # Iterate over snapshots rather than the live lists. A callback may register
    # or unregister callbacks while it runs, and mutating a list while iterating
    # over it silently skips entries.
    for callback in tuple(_callbacks.get(thread_id, ())):
        callback(observation)

    for all_threads_callback in tuple(_callbacks_all_threads):
        all_threads_callback(observation, thread_id)

352 

353 

class _TestcaseCallbacks:
    """Deprecated stand-in for the former ``TESTCASE_CALLBACKS`` list.

    Supports only ``bool()``, ``.append`` and ``.remove``. Each one emits a
    deprecation notice and forwards to the corresponding function in this
    module.
    """

    def __bool__(self):
        self._note_deprecation()
        # Forward to observability_enabled(), as the deprecation message
        # promises, so that callbacks registered with all_threads=True also
        # count as "observability is on".
        return observability_enabled()

    def _note_deprecation(self):
        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def append(self, f):
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        self._note_deprecation()
        remove_observability_callback(f)


#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#:     |TESTCASE_CALLBACKS| remains a thin compatibility
#:     shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#:     three methods. It is not an attempt to be fully compatible with the previous
#:     ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
#:     anymore. Please update to using the new methods instead.
#:
#:     |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()

391 

392 

def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: dict | None = None,
    coverage: dict[str, list[int]] | None = None,
    phase: str | None = None,
    backend_metadata: dict[str, Any] | None = None,
    status: (
        Union[TestCaseStatus, "Status"] | None
    ) = None,  # overrides automatic calculation
    status_reason: str | None = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: dict[str, Any] | None = None,
) -> TestCaseObservation:
    """Build the ``TestCaseObservation`` for one finished test case.

    ``data`` must already be frozen. ``status`` and ``status_reason``, when
    given, replace the values otherwise derived from ``data``. Entries of
    ``metadata`` replace same-named entries of the computed metadata.
    """
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    if status_reason is None:
        if data.interesting_origin:
            status_reason = str(data.interesting_origin)
        elif phase == "shrink" and data.status == Status.OVERRUN:
            status_reason = "exceeded size of current best example"
        else:
            # pop() removes the event, so it does not show up again in the
            # features computed from data.events below.
            status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    if status is None:
        status = status_map[data.status]
    elif isinstance(status, Status):
        status = status_map[status]

    cleaned_arguments = {
        name.removeprefix("generate:"): value
        for name, value in (arguments or {}).items()
    }

    features = {
        f"target:{label}".strip(":"): score
        for label, score in data.target_observations.items()
    }
    features.update(data.events)

    metadata_fields: dict[str, Any] = {
        "traceback": data.expected_traceback,
        "reproduction_decorator": (
            reproduction_decorator(data.choices) if status == "failed" else None
        ),
        "predicates": dict(data._observability_predicates),
        "backend": backend_metadata or {},
        "data_status": data.status,
        "phase": phase,
        "interesting_origin": data.interesting_origin,
        "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
        "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
    }
    metadata_fields.update(_system_metadata())
    # applied last so it takes precedence for duplicate keys
    metadata_fields.update(metadata or {})

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        arguments=cleaned_arguments,
        how_generated=how_generated,  # iid, mutation, etc.
        features=features,
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(**metadata_fields),
        run_start=run_start,
        property=property,
    )

479 

480 

# Every file which _deliver_to_file has appended to.
_WROTE_TO: set[Path] = set()
# Serializes the file writes done by _deliver_to_file.
_deliver_to_file_lock = Lock()


def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    """Append ``observation`` as one line of JSON to today's observations file.

    The file lives in the "observed" storage directory and is named
    ``<today>_testcases.jsonl`` for test case observations and
    ``<today>_info.jsonl`` for everything else. ``thread_id`` is unused.
    """
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    observed_dir = storage_directory("observed")
    observed_dir.create_if_missing()
    target_path = observed_dir.path / f"{date.today().isoformat()}_{kind}.jsonl"

    serialized = to_jsonable(observation, avoid_realization=False)
    line = json.dumps(serialized) + "\n"
    # only allow one concurrent file write to avoid write races. This is likely to make
    # HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading. A queue
    # would be an improvement, but that requires a background thread, and I
    # would prefer to avoid a thread in the single-threaded case. We could
    # switch over to a queue if we detect multithreading, but it's tricky to get
    # right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(target_path)
        with target_path.open(mode="a") as f:
            f.write(line)

508 

509 

# Timestamp taken when this module is executed.
_imported_at = time.time()


@lru_cache
def _system_metadata() -> dict[str, Any]:
    """Return process-level metadata: ``sys.argv``, the pid, and the module
    import time.

    The result is cached, so every call returns the same dict object.
    """
    return dict(
        sys_argv=sys.argv,
        os_getpid=os.getpid(),
        imported_at=_imported_at,
    )

520 

521 

#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
#:
#: Set to ``False`` at import time if the
#: ``HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER`` environment variable is set.
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.choice_spans``
#: keys in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.choice_spans``
#: can be a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

544 

# Disabling coverage collection is only expected to help before Python 3.12,
# so warn when it is disabled on a newer interpreter.
if (
    sys.version_info[:2] >= (3, 12) and OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# Either environment variable turns on the built-in file sink.
if (
    OBSERVABILITY_COLLECT_COVERAGE is False
    or "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk. File names
    # start with an ISO date, so comparing stems as strings orders them by date.
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for p in storage_directory("observed", intent_to_write=False).path.glob("*.jsonl"):
        if p.stem < max_age:  # pragma: no branch
            p.unlink(missing_ok=True)