Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/observability.py: 54%


188 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""Observability tools to spit out analysis-ready tables, one row per test case."""

import base64
import dataclasses
import json
import math
import os
import sys
import threading
import time
import warnings
from collections.abc import Callable, Generator
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import date, timedelta
from functools import lru_cache
from threading import Lock
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
    Optional,
    TypeAlias,
    Union,
    cast,
)

from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisWarning
from hypothesis.internal.conjecture.choice import (
    BooleanConstraints,
    BytesConstraints,
    ChoiceConstraintsT,
    ChoiceNode,
    ChoiceT,
    ChoiceTypeT,
    FloatConstraints,
    IntegerConstraints,
    StringConstraints,
)
from hypothesis.internal.escalation import InterestingOrigin
from hypothesis.internal.floats import float_to_int
from hypothesis.internal.intervalsets import IntervalSet
from hypothesis.utils.deprecation import note_deprecation

if TYPE_CHECKING:
    from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status


Observation: TypeAlias = Union["InfoObservation", "TestCaseObservation"]
CallbackThreadT: TypeAlias = Callable[[Observation], None]
# for all_threads=True, we pass the thread id as well.
CallbackAllThreadsT: TypeAlias = Callable[[Observation, int], None]
CallbackT: TypeAlias = CallbackThreadT | CallbackAllThreadsT

# thread_id: list[callback]
_callbacks: dict[int | None, list[CallbackThreadT]] = {}
# callbacks where all_threads=True was set
_callbacks_all_threads: list[CallbackAllThreadsT] = []
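
# A minimal sketch of the two callback shapes implied by the aliases above
# (function names are hypothetical):
#
#     def per_thread_callback(observation: Observation) -> None: ...
#     def all_threads_callback(observation: Observation, thread_id: int) -> None: ...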


@dataclass(slots=True, frozen=False)
class PredicateCounts:
    satisfied: int = 0
    unsatisfied: int = 0

    def update_count(self, *, condition: bool) -> None:
        if condition:
            self.satisfied += 1
        else:
            self.unsatisfied += 1


def _choice_to_json(choice: ChoiceT | None) -> Any:
    if choice is None:
        return None
    # see the note on the same check in to_jsonable for why we cast large
    # integers to floats.
    if (
        isinstance(choice, int)
        and not isinstance(choice, bool)
        and abs(choice) >= 2**63
    ):
        return ["integer", str(choice)]
    elif isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    elif isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)]
    return choice


def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    return [_choice_to_json(choice) for choice in choices]
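
# A minimal illustrative sketch of the encoding above: large integers, bytes,
# and NaN floats are wrapped in a tagged list so they survive JSON without
# loss; every other choice passes through unchanged. (Helper name is
# hypothetical; it is not called anywhere in this module.)
def _example_choice_encoding() -> None:
    assert _choice_to_json(2**64) == ["integer", str(2**64)]
    assert _choice_to_json(b"\x00\xff") == ["bytes", base64.b64encode(b"\x00\xff").decode()]
    assert _choice_to_json(math.nan) == ["float", float_to_int(math.nan)]
    assert _choice_to_json(3.5) == 3.5
    assert choices_to_json((17, "abc")) == [17, "abc"]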


def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    constraints = constraints.copy()
    if choice_type == "integer":
        constraints = cast(IntegerConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "weights": (
                None
                if constraints["weights"] is None
                # wrap up in a list, instead of a dict, because json dicts
                # require string keys
                else [
                    (_choice_to_json(k), v) for k, v in constraints["weights"].items()
                ]
            ),
            "shrink_towards": _choice_to_json(constraints["shrink_towards"]),
        }
    elif choice_type == "float":
        constraints = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "allow_nan": constraints["allow_nan"],
            "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"],
        }
    elif choice_type == "string":
        constraints = cast(StringConstraints, constraints)
        assert isinstance(constraints["intervals"], IntervalSet)
        return {
            "intervals": constraints["intervals"].intervals,
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    elif choice_type == "bytes":
        constraints = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    elif choice_type == "boolean":
        constraints = cast(BooleanConstraints, constraints)
        return {
            "p": constraints["p"],
        }
    else:
        raise NotImplementedError(f"unknown choice type {choice_type}")


def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    return [
        {
            "type": node.type,
            "value": _choice_to_json(node.value),
            "constraints": _constraints_to_json(node.type, node.constraints),
            "was_forced": node.was_forced,
        }
        for node in nodes
    ]
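
# A minimal sketch of the per-node shape produced above, for a hypothetical
# boolean node drawn with p=0.5 and not forced:
#
#     {"type": "boolean", "value": True, "constraints": {"p": 0.5}, "was_forced": False}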


@dataclass(slots=True, frozen=True)
class ObservationMetadata:
    traceback: str | None
    reproduction_decorator: str | None
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    phase: str
    interesting_origin: InterestingOrigin | None
    choice_nodes: tuple[ChoiceNode, ...] | None
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "phase": self.phase,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": (
                None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
            ),
            "choice_spans": (
                None
                if self.choice_spans is None
                else [
                    (
                        # span.label is an int, but cast to string to avoid conversion
                        # to float (and loss of precision) for large label values.
                        #
                        # The value of this label is opaque to consumers anyway, so its
                        # type shouldn't matter as long as it's consistent.
                        str(span.label),
                        span.start,
                        span.end,
                        span.discarded,
                    )
                    for span in self.choice_spans
                ]
            ),
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data


@dataclass(slots=True, frozen=True)
class BaseObservation:
    type: Literal["test_case", "info", "alert", "error"]
    property: str
    run_start: float


InfoObservationType = Literal["info", "alert", "error"]
TestCaseStatus = Literal["gave_up", "passed", "failed"]


@dataclass(slots=True, frozen=True)
class InfoObservation(BaseObservation):
    type: InfoObservationType
    title: str
    content: str | dict


@dataclass(slots=True, frozen=True)
class TestCaseObservation(BaseObservation):
    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    status_reason: str
    representation: str
    arguments: dict
    how_generated: str
    features: dict
    coverage: dict[str, list[int]] | None
    timing: dict[str, float]
    metadata: ObservationMetadata


def add_observability_callback(f: CallbackT, /, *, all_threads: bool = False) -> None:
    """
    Adds ``f`` as a callback for :ref:`observability <observability>`. ``f``
    should accept one argument, which is an observation. Whenever Hypothesis
    produces a new observation, it calls each callback with that observation.

    If Hypothesis tests are being run from multiple threads, callbacks are tracked
    per-thread. In other words, ``add_observability_callback(f)`` only adds ``f``
    as an observability callback for observations produced on that thread.

    If ``all_threads=True`` is passed, ``f`` will instead be registered as a
    callback for all threads. This means it will be called for observations
    generated by all threads, not just the thread which registered ``f`` as a
    callback. In this case, ``f`` will be passed two arguments: the first is the
    observation, and the second is the integer thread id from
    :func:`python:threading.get_ident` where that observation was generated.

    We recommend against registering ``f`` as a callback for both ``all_threads=True``
    and the default ``all_threads=False``, due to unclear semantics with
    |remove_observability_callback|.
    """
    if all_threads:
        _callbacks_all_threads.append(cast(CallbackAllThreadsT, f))
        return

    thread_id = threading.get_ident()
    if thread_id not in _callbacks:
        _callbacks[thread_id] = []

    _callbacks[thread_id].append(cast(CallbackThreadT, f))
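
# A minimal usage sketch (callback names are hypothetical; nothing here runs
# automatically): a per-thread callback which collects failing test cases, and
# an all-threads callback which also receives the originating thread id.
def _example_register_callbacks() -> list[Observation]:
    failures: list[Observation] = []

    def on_observation(observation: Observation) -> None:
        if observation.type == "test_case" and observation.status == "failed":
            failures.append(observation)

    def on_any_thread(observation: Observation, thread_id: int) -> None:
        print(f"observation of type {observation.type!r} from thread {thread_id}")

    add_observability_callback(on_observation)
    add_observability_callback(on_any_thread, all_threads=True)
    return failures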


def remove_observability_callback(f: CallbackT, /) -> None:
    """
    Removes ``f`` from the :ref:`observability <observability>` callbacks.

    If ``f`` is not in the list of observability callbacks, silently do nothing.

    If running under multiple threads, ``f`` will only be removed from the
    callbacks for this thread.
    """
    if f in _callbacks_all_threads:
        _callbacks_all_threads.remove(cast(CallbackAllThreadsT, f))

    thread_id = threading.get_ident()
    if thread_id not in _callbacks:
        return

    callbacks = _callbacks[thread_id]
    if f in callbacks:
        callbacks.remove(cast(CallbackThreadT, f))

    if not callbacks:
        del _callbacks[thread_id]


def observability_enabled() -> bool:
    """
    Returns whether or not Hypothesis considers :ref:`observability <observability>`
    to be enabled. Observability is enabled if there is at least one observability
    callback present.

    Callers might use this method to determine whether they should compute an
    expensive representation that is only used under observability, for instance
    by |alternative backends|.
    """
    return bool(_callbacks) or bool(_callbacks_all_threads)
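
# A minimal sketch of the gating pattern described in the docstring above: a
# hypothetical backend skips building an expensive human-readable dump unless
# some observability callback will actually consume it.
def _example_expensive_representation(choices: tuple[ChoiceT, ...]) -> str | None:
    if not observability_enabled():
        return None
    return json.dumps(choices_to_json(choices), indent=2)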


@contextmanager
def with_observability_callback(
    f: Callable[[Observation], None], /, *, all_threads: bool = False
) -> Generator[None, None, None]:
    """
    A simple context manager which calls |add_observability_callback| on ``f``
    when it enters and |remove_observability_callback| on ``f`` when it exits.
    """
    add_observability_callback(f, all_threads=all_threads)
    try:
        yield
    finally:
        remove_observability_callback(f)
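
# A minimal usage sketch: collect every observation produced while some tests
# run inside the block (`run_my_tests` is a hypothetical stand-in for whatever
# executes the Hypothesis tests on this thread).
def _example_scoped_collection(run_my_tests: Callable[[], None]) -> list[Observation]:
    observations: list[Observation] = []
    with with_observability_callback(observations.append):
        run_my_tests()
    return observations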


def deliver_observation(observation: Observation) -> None:
    thread_id = threading.get_ident()

    for callback in _callbacks.get(thread_id, []):
        callback(observation)

    for callback in _callbacks_all_threads:
        callback(observation, thread_id)


class _TestcaseCallbacks:
    def __bool__(self):
        self._note_deprecation()
        return bool(_callbacks)

    def _note_deprecation(self):
        note_deprecation(
            "hypothesis.internal.observability.TESTCASE_CALLBACKS is deprecated. "
            "Replace TESTCASE_CALLBACKS.append with add_observability_callback, "
            "TESTCASE_CALLBACKS.remove with remove_observability_callback, and "
            "bool(TESTCASE_CALLBACKS) with observability_enabled().",
            since="2025-08-01",
            has_codemod=False,
        )

    def append(self, f):
        self._note_deprecation()
        add_observability_callback(f)

    def remove(self, f):
        self._note_deprecation()
        remove_observability_callback(f)


#: .. warning::
#:
#:     Deprecated in favor of |add_observability_callback|,
#:     |remove_observability_callback|, and |observability_enabled|.
#:
#: |TESTCASE_CALLBACKS| remains a thin compatibility
#: shim which forwards ``.append``, ``.remove``, and ``bool()`` to those
#: three functions. It is not an attempt to be fully compatible with the previous
#: ``TESTCASE_CALLBACKS = []``, so iteration and other usages will no longer
#: work. Please update to the new functions instead.
#:
#: |TESTCASE_CALLBACKS| will eventually be removed.
TESTCASE_CALLBACKS = _TestcaseCallbacks()
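
# A minimal migration sketch for the deprecation above (`my_callback` is a
# hypothetical callback): each legacy operation maps onto one new function.
def _example_migrate_from_testcase_callbacks(my_callback: CallbackThreadT) -> None:
    add_observability_callback(my_callback)  # was: TESTCASE_CALLBACKS.append(my_callback)
    assert observability_enabled()  # was: bool(TESTCASE_CALLBACKS)
    remove_observability_callback(my_callback)  # was: TESTCASE_CALLBACKS.remove(my_callback)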


def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    timing: dict[str, float],
    arguments: dict | None = None,
    coverage: dict[str, list[int]] | None = None,
    phase: str | None = None,
    backend_metadata: dict[str, Any] | None = None,
    status: (
        Union[TestCaseStatus, "Status"] | None
    ) = None,  # overrides automatic calculation
    status_reason: str | None = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: dict[str, Any] | None = None,
) -> TestCaseObservation:
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for ConjectureData objects
    # that have finished being modified.
    assert data.frozen

    if status_reason is not None:
        pass
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    if status is not None and isinstance(status, Status):
        status = status_map[status]
    if status is None:
        status = status_map[data.status]

    return TestCaseObservation(
        type="test_case",
        status=status,
        status_reason=status_reason,
        representation=representation,
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        features={
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "phase": phase,
                "interesting_origin": data.interesting_origin,
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )


_WROTE_TO = set()
_deliver_to_file_lock = Lock()


def _deliver_to_file(
    observation: Observation, thread_id: int
) -> None:  # pragma: no cover
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "testcases" if observation.type == "test_case" else "info"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)

    observation_bytes = (
        json.dumps(to_jsonable(observation, avoid_realization=False)) + "\n"
    )
    # only allow one concurrent file write to avoid write races. This is likely to make
    # HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY quite slow under threading. A queue
    # would be an improvement, but that requires a background thread, and I
    # would prefer to avoid a thread in the single-threaded case. We could
    # switch over to a queue if we detect multithreading, but it's tricky to get
    # right.
    with _deliver_to_file_lock:
        _WROTE_TO.add(fname)
        with fname.open(mode="a") as f:
            f.write(observation_bytes)
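
# A minimal sketch of reading those files back, assuming the default
# ".hypothesis/observed" storage directory: each file holds one JSON object
# per line, one line per observation. (Helper name is hypothetical.)
def _example_read_observations(kind: str = "testcases") -> list[dict]:
    rows: list[dict] = []
    observed = storage_directory("observed", intent_to_write=False)
    for path in sorted(observed.glob(f"*_{kind}.jsonl")):
        with path.open() as f:
            rows.extend(json.loads(line) for line in f if line.strip())
    return rows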


_imported_at = time.time()


@lru_cache
def _system_metadata() -> dict[str, Any]:
    return {
        "sys_argv": sys.argv,
        "os_getpid": os.getpid(),
        "imported_at": _imported_at,
    }


#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in memory).
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted into, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later, "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    add_observability_callback(_deliver_to_file, all_threads=True)

    # Remove files more than a week old, to cap the size on disk
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)
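
# A minimal sketch of driving the environment-variable switches above
# (illustrative only; only the variables' presence is checked, and they must be
# set before hypothesis is first imported, because this module reads them at
# import time):
#
#     import os
#     os.environ["HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY"] = "1"          # deliver observations to the observed/ storage directory
#     os.environ["HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES"] = "1"  # include metadata.choice_nodes / choice_spans
#     import hypothesis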