Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/observability.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

188 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""Observability tools to spit out analysis-ready tables, one row per test case.""" 

12 

13import base64 

14import dataclasses 

15import json 

16import math 

17import os 

18import sys 

19import threading 

20import time 

21import warnings 

22from collections.abc import Callable, Generator 

23from contextlib import contextmanager 

24from dataclasses import dataclass 

25from datetime import date, timedelta 

26from functools import lru_cache 

27from threading import Lock 

28from typing import ( 

29 TYPE_CHECKING, 

30 Any, 

31 Literal, 

32 Optional, 

33 TypeAlias, 

34 Union, 

35 cast, 

36) 

37 

38from hypothesis.configuration import storage_directory 

39from hypothesis.errors import HypothesisWarning 

40from hypothesis.internal.conjecture.choice import ( 

41 BooleanConstraints, 

42 BytesConstraints, 

43 ChoiceConstraintsT, 

44 ChoiceNode, 

45 ChoiceT, 

46 ChoiceTypeT, 

47 FloatConstraints, 

48 IntegerConstraints, 

49 StringConstraints, 

50) 

51from hypothesis.internal.escalation import InterestingOrigin 

52from hypothesis.internal.floats import float_to_int 

53from hypothesis.internal.intervalsets import IntervalSet 

54 

55if TYPE_CHECKING: 

56 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status 

57 

58 

# The public observation type: either a per-test-case record or a free-form
# info/alert/error message.
Observation: TypeAlias = Union["InfoObservation", "TestCaseObservation"]
# Signature of a callback registered for a single thread.
CallbackThreadT: TypeAlias = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: TypeAlias = Callable[[Observation, int], None]
CallbackT: TypeAlias = CallbackThreadT | CallbackAllThreadsT

# thread_id: list[callback]
_callbacks: dict[int | None, list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []

69 

70 

71@dataclass(slots=True, frozen=False) 

72class PredicateCounts: 

73 satisfied: int = 0 

74 unsatisfied: int = 0 

75 

76 def update_count(self, *, condition: bool) -> None: 

77 if condition: 

78 self.satisfied += 1 

79 else: 

80 self.unsatisfied += 1 

81 

82 

def _choice_to_json(choice: ChoiceT | None) -> Any:
    """Convert a single choice to a json-serializable value.

    Values that don't round-trip through JSON unchanged (big ints, bytes,
    nonstandard NaNs) are wrapped in a ``[tag, payload]`` list.
    """
    if choice is None:
        return None
    # see the note on the same check in to_jsonable for why we cast large
    # integers to floats.
    is_large_int = (
        isinstance(choice, int)
        and not isinstance(choice, bool)
        and abs(choice) >= 2**63
    )
    if is_large_int:
        return ["integer", str(choice)]
    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    if isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)]
    return choice

101 

102 

def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Serialize a sequence of choices via ``_choice_to_json``."""
    return list(map(_choice_to_json, choices))

105 

106 

def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """Convert the constraints for one choice to a json-serializable dict.

    Dispatches on ``choice_type`` and narrows ``constraints`` to the matching
    TypedDict. Raises ``NotImplementedError`` for an unknown choice type.
    """
    # copy so the pop/cast juggling below can never mutate the caller's dict
    constraints = constraints.copy()
    if choice_type == "integer":
        constraints = cast(IntegerConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "weights": (
                None
                if constraints["weights"] is None
                # wrap up in a list, instead of a dict, because json dicts
                # require string keys
                else [
                    (_choice_to_json(k), v) for k, v in constraints["weights"].items()
                ]
            ),
            "shrink_towards": _choice_to_json(constraints["shrink_towards"]),
        }
    elif choice_type == "float":
        constraints = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "allow_nan": constraints["allow_nan"],
            "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"],
        }
    elif choice_type == "string":
        constraints = cast(StringConstraints, constraints)
        assert isinstance(constraints["intervals"], IntervalSet)
        return {
            "intervals": constraints["intervals"].intervals,
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    elif choice_type == "bytes":
        constraints = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    elif choice_type == "boolean":
        constraints = cast(BooleanConstraints, constraints)
        return {
            "p": constraints["p"],
        }
    else:
        raise NotImplementedError(f"unknown choice type {choice_type}")

156 

157 

def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Serialize choice nodes (value + constraints) to json-ready dicts."""
    serialized = []
    for node in nodes:
        serialized.append(
            {
                "type": node.type,
                "value": _choice_to_json(node.value),
                "constraints": _constraints_to_json(node.type, node.constraints),
                "was_forced": node.was_forced,
            }
        )
    return serialized

168 

169 

@dataclass(slots=True, frozen=True)
class ObservationMetadata:
    """The ``metadata`` portion of a test-case observation."""

    # traceback of the failure, if any
    traceback: str | None
    # @reproduce_failure decorator string, set only for failing test cases
    reproduction_decorator: str | None
    # predicate name -> satisfied/unsatisfied counts
    predicates: dict[str, PredicateCounts]
    # metadata supplied by the active backend
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    # time.time() when this module was imported
    imported_at: float
    data_status: "Status"
    phase: str
    interesting_origin: InterestingOrigin | None
    # only populated when OBSERVABILITY_CHOICES is enabled
    choice_nodes: tuple[ChoiceNode, ...] | None
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Return a json-ready dict with one entry per dataclass field.

        Some keys are renamed for readability in the output
        (``sys_argv`` -> ``sys.argv``, ``os_getpid`` -> ``os.getpid()``).
        """
        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "phase": self.phase,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": (
                None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
            ),
            "choice_spans": (
                None
                if self.choice_spans is None
                else [
                    (
                        # span.label is an int, but cast to string to avoid conversion
                        # to float (and loss of precision) for large label values.
                        #
                        # The value of this label is opaque to consumers anyway, so its
                        # type shouldn't matter as long as it's consistent.
                        str(span.label),
                        span.start,
                        span.end,
                        span.discarded,
                    )
                    for span in self.choice_spans
                ]
            ),
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data

222 

223 

@dataclass(slots=True, frozen=True)
class BaseObservation:
    """Fields shared by every kind of observation."""

    # discriminator for the concrete observation subclass
    type: Literal["test_case", "info", "alert", "error"]
    # the property (test) this observation was generated for
    property: str
    # start time of the enclosing run
    run_start: float

229 

230 

# The non-test-case observation kinds.
InfoObservationType = Literal["info", "alert", "error"]
# Outcome of a single test case.
TestCaseStatus = Literal["gave_up", "passed", "failed"]

233 

234 

@dataclass(slots=True, frozen=True)
class InfoObservation(BaseObservation):
    """A free-form message observation, not tied to a single test case."""

    type: InfoObservationType
    title: str
    # either a plain string or an arbitrary json-serializable dict
    content: str | dict

240 

241 

@dataclass(slots=True, frozen=True)
class TestCaseObservation(BaseObservation):
    """One row of observability output: the result of a single test case."""

    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    # human-readable explanation of ``status``
    status_reason: str
    # string representation of the test-case call
    representation: str
    # argument name -> value (any "generate:" prefix stripped by make_testcase)
    arguments: dict
    # how this test case was generated (e.g. "generate", reuse, shrink phase)
    how_generated: str
    # target observations and events recorded while running the test case
    features: dict
    # filename -> executed line numbers; None if coverage was not collected
    coverage: dict[str, list[int]] | None
    # timing breakdown of the test case, keyed by stage name
    timing: dict[str, float]
    metadata: ObservationMetadata

256 

257 

def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if all_threads:
        _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
        return

    # Per-thread registration; create this thread's list on first use.
    # setdefault replaces the previous check-then-insert, which is the
    # idiomatic (and atomic, under the GIL) form of this pattern.
    _callbacks.setdefault(threading.get_ident(), []).append(cast(CallbackThreadT, f))

288 

289 

def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    If running under multiple threads, ``f`` will only be removed from the
    callbacks for this thread.
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))

    thread_id = threading.get_ident()
    thread_callbacks = _callbacks.get(thread_id)
    if thread_callbacks is None:
        # nothing was ever registered on this thread
        return

    if f in thread_callbacks:
        thread_callbacks.remove(cast(CallbackThreadT, f))

    # drop the per-thread entry entirely once its list is empty
    if not thread_callbacks:
        del _callbacks[thread_id]

312 

313 

def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
    """
    # enabled iff either registry (per-thread or all-threads) is non-empty
    return bool(_callbacks or _callbacks_all_threads)

325 

326 

@contextmanager
def with_observability_callback(
    # CallbackT (not Callable[[Observation], None]) so the annotation matches
    # add_observability_callback: with all_threads=True, ``f`` takes two args.
    f: CallbackT, /, *, all_threads: bool = False
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        remove_observability_callback(f)

340 

341 

def deliver_observation(observation: Observation) -> None:
    """Invoke every registered callback with ``observation``.

    Per-thread callbacks receive only the observation; all-threads callbacks
    also receive the id of the thread which produced it.
    """
    tid = threading.get_ident()

    for cb in _callbacks.get(tid, []):
        cb(observation)

    for cb in _callbacks_all_threads:
        cb(observation, tid)

350 

351 

class _TestcaseCallbacks:
    """Deprecated compatibility shim for the old ``TESTCASE_CALLBACKS`` list.

    Forwards ``.append``, ``.remove``, and ``bool()`` to the new
    callback-registration API, emitting a deprecation warning on each use.
    """

    def __bool__(self):
        self._note_deprecation()
        # NOTE(review): this only reflects per-thread callbacks, unlike
        # observability_enabled() which also checks _callbacks_all_threads —
        # presumably fine for legacy users of TESTCASE_CALLBACKS; confirm.
        return bool(_callbacks)

    def _note_deprecation(self):
        # lazy import: _settings is only needed when the shim is actually used
        from hypothesis._settings import note_deprecation

        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def append(self, f):
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        self._note_deprecation()
        remove_observability_callback(f)

376 

377 

#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#: |TESTCASE_CALLBACKS| remains a thin compatibility
#: shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#: three methods. It is not an attempt to be fully compatible with the previous
#: ``TESTCASE_CALLBACKS = []``, so iteration or other usages will not work
#: anymore. Please update to using the new methods instead.
#:
#: |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()

391 

392 

def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: dict | None = None,
    coverage: dict[str, list[int]] | None = None,
    phase: str | None = None,
    backend_metadata: dict[str, Any] | None = None,
    status: (
        Union[TestCaseStatus, "Status"] | None
    ) = None,  # overrides automatic calculation
    status_reason: str | None = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: dict[str, Any] | None = None,
) -> TestCaseObservation:
    """Build a TestCaseObservation from a finished (frozen) ConjectureData.

    ``status`` and ``status_reason`` are normally derived from ``data``; pass
    them explicitly to override that derivation. ``metadata`` entries are
    merged last into the computed metadata and win on key collisions.
    """
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    # Derive status_reason unless the caller supplied one. Priority:
    # explicit override > interesting origin > shrink-phase overrun >
    # recorded "invalid because" event (popped so it doesn't also appear
    # in ``features`` below).
    if status_reason is not None:
        pass
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    # Normalize ``status``: a Status enum is mapped to its string form, and
    # a missing status is derived from data.status.
    if status is not None and isinstance(status, Status):
        status = status_map[status]
    if status is None:
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        # strip the "generate:" prefix strategies add to argument names
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        features={
            # .strip(":") so an empty target label doesn't produce "target:"
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                # only failing cases get a @reproduce_failure decorator
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "phase": phase,
                "interesting_origin": data.interesting_origin,
                # choice-level detail is opt-in via OBSERVABILITY_CHOICES
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )

479 

480 

# Paths _deliver_to_file has written to during this process (used by tests/tools).
_WROTE_TO = set()
# Serializes appends to the .jsonl files across threads.
_deliver_to_file_lock = Lock()

483 

484 

def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    """Append ``observation`` as one JSON line to today's .jsonl file.

    Registered as an all-threads callback when the
    HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY environment variable is set.
    ``thread_id`` is required by the all-threads callback signature but unused.
    """
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)

    observation_bytes = (
        json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
    )
    # only allow one concurrent file write to avoid write races. This is likely to make
    # HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading. A queue
    # would be an improvement, but that requires a background thread, and I
    # would prefer to avoid a thread in the single-threaded case. We could
    # switch over to a queue if we detect multithreading, but it's tricky to get
    # right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(fname)
        with fname.open(mode="a") as f:
            f.write(observation_bytes)

507 

508 

# Timestamp of module import; reported in every observation's metadata.
_imported_at = time.time()

510 

511 

@lru_cache
def _system_metadata() -> dict[str, Any]:
    """Process-level metadata attached to every observation.

    Cached: the values cannot change within a single process.
    """
    return dict(
        sys_argv=sys.argv,
        os_getpid=os.getpid(),
        imported_at=_imported_at,
    )

519 

520 

#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.choice_spans``
#: keys in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.choice_spans``
#: can be a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

543 

# Warn if coverage was disabled on a Python where collection is already cheap
# (sys.monitoring makes it fast on 3.12+).
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# Opting into the file-based observability output (or explicitly disabling
# coverage) registers the file-delivery callback for every thread.
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk
    # (ISO date strings compare lexicographically, so < works on .stem)
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)