Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/observability.py: 58%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

141 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11"""Observability tools to spit out analysis-ready tables, one row per test case.""" 

12 

13import base64 

14import dataclasses 

15import json 

16import math 

17import os 

18import sys 

19import time 

20import warnings 

21from collections.abc import Generator 

22from contextlib import contextmanager 

23from dataclasses import dataclass 

24from datetime import date, timedelta 

25from functools import lru_cache 

26from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union, cast 

27 

28from hypothesis.configuration import storage_directory 

29from hypothesis.errors import HypothesisWarning 

30from hypothesis.internal.conjecture.choice import ( 

31 BooleanConstraints, 

32 BytesConstraints, 

33 ChoiceConstraintsT, 

34 ChoiceNode, 

35 ChoiceT, 

36 ChoiceTypeT, 

37 FloatConstraints, 

38 IntegerConstraints, 

39 StringConstraints, 

40) 

41from hypothesis.internal.escalation import InterestingOrigin 

42from hypothesis.internal.floats import float_to_int 

43from hypothesis.internal.intervalsets import IntervalSet 

44 

45if TYPE_CHECKING: 

46 from typing import TypeAlias 

47 

48 from hypothesis.internal.conjecture.data import ConjectureData, Spans, Status 

49 

50 

@dataclass
class PredicateCounts:
    """Running tally of how often an observed predicate held vs. failed."""

    satisfied: int = 0
    unsatisfied: int = 0

    def update_count(self, *, condition: bool) -> None:
        """Record one evaluation of the predicate."""
        # Bump whichever counter corresponds to this evaluation's outcome.
        which = "satisfied" if condition else "unsatisfied"
        setattr(self, which, getattr(self, which) + 1)

61 

62 

def _choice_to_json(choice: Union[ChoiceT, None]) -> Any:
    """Encode a single choice as a json-serializable value.

    Values json cannot represent faithfully are wrapped in a ``["tag", payload]``
    pair so consumers can distinguish and reverse the encoding.
    """
    if choice is None:
        return None
    if isinstance(choice, bytes):
        return ["bytes", base64.b64encode(choice).decode()]
    if isinstance(choice, bool):
        # bool is a subclass of int; return it before the large-int check so
        # booleans pass through unchanged (as in the original ordering).
        return choice
    if isinstance(choice, int) and abs(choice) >= 2**63:
        # see the note on the same check in to_jsonable for why we cast large
        # integers to floats.
        return ["integer", str(choice)]
    if isinstance(choice, float) and math.isnan(choice):
        # handle nonstandard nan bit patterns. We don't need to do this for -0.0
        # vs 0.0 since json doesn't normalize -0.0 to 0.0.
        return ["float", float_to_int(choice)]
    return choice

81 

82 

def choices_to_json(choices: tuple[ChoiceT, ...]) -> list[Any]:
    """Encode a whole choice sequence for an observability report."""
    return list(map(_choice_to_json, choices))

85 

86 

def _constraints_to_json(
    choice_type: ChoiceTypeT, constraints: ChoiceConstraintsT
) -> dict[str, Any]:
    """Encode the constraints of one choice as a json-serializable dict.

    Raises NotImplementedError for an unrecognized *choice_type*.
    """
    # Copy so popping/casting never mutates the caller's mapping.
    constraints = constraints.copy()
    if choice_type == "integer":
        constraints = cast(IntegerConstraints, constraints)
        weights = constraints["weights"]
        if weights is not None:
            # wrap up in a list, instead of a dict, because json dicts
            # require string keys
            weights = [(_choice_to_json(k), v) for k, v in weights.items()]
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "weights": weights,
            "shrink_towards": _choice_to_json(constraints["shrink_towards"]),
        }
    if choice_type == "float":
        constraints = cast(FloatConstraints, constraints)
        return {
            "min_value": _choice_to_json(constraints["min_value"]),
            "max_value": _choice_to_json(constraints["max_value"]),
            "allow_nan": constraints["allow_nan"],
            "smallest_nonzero_magnitude": constraints["smallest_nonzero_magnitude"],
        }
    if choice_type == "string":
        constraints = cast(StringConstraints, constraints)
        assert isinstance(constraints["intervals"], IntervalSet)
        return {
            "intervals": constraints["intervals"].intervals,
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    if choice_type == "bytes":
        constraints = cast(BytesConstraints, constraints)
        return {
            "min_size": _choice_to_json(constraints["min_size"]),
            "max_size": _choice_to_json(constraints["max_size"]),
        }
    if choice_type == "boolean":
        constraints = cast(BooleanConstraints, constraints)
        return {"p": constraints["p"]}
    raise NotImplementedError(f"unknown choice type {choice_type}")

136 

137 

def nodes_to_json(nodes: tuple[ChoiceNode, ...]) -> list[dict[str, Any]]:
    """Encode a sequence of choice nodes for an observability report."""
    encoded = []
    for node in nodes:
        encoded.append(
            {
                "type": node.type,
                "value": _choice_to_json(node.value),
                "constraints": _constraints_to_json(node.type, node.constraints),
                "was_forced": node.was_forced,
            }
        )
    return encoded

148 

149 

@dataclass
class ObservationMetadata:
    """Supplementary data attached to a single test-case observation."""

    traceback: Optional[str]
    reproduction_decorator: Optional[str]
    predicates: dict[str, PredicateCounts]
    backend: dict[str, Any]
    sys_argv: list[str]
    os_getpid: int
    imported_at: float
    data_status: "Status"
    interesting_origin: Optional[InterestingOrigin]
    choice_nodes: Optional[tuple[ChoiceNode, ...]]
    choice_spans: Optional["Spans"]

    def to_json(self) -> dict[str, Any]:
        """Return this metadata as a json-serializable dict."""
        nodes = (
            None if self.choice_nodes is None else nodes_to_json(self.choice_nodes)
        )
        spans = None
        if self.choice_spans is not None:
            spans = [
                (
                    # span.label is an int, but cast to string to avoid conversion
                    # to float (and loss of precision) for large label values.
                    #
                    # The value of this label is opaque to consumers anyway, so its
                    # type shouldn't matter as long as it's consistent.
                    str(span.label),
                    span.start,
                    span.end,
                    span.discarded,
                )
                for span in self.choice_spans
            ]
        data = {
            "traceback": self.traceback,
            "reproduction_decorator": self.reproduction_decorator,
            "predicates": self.predicates,
            "backend": self.backend,
            "sys.argv": self.sys_argv,
            "os.getpid()": self.os_getpid,
            "imported_at": self.imported_at,
            "data_status": self.data_status,
            "interesting_origin": self.interesting_origin,
            "choice_nodes": nodes,
            "choice_spans": spans,
        }
        # check that we didn't forget one
        assert len(data) == len(dataclasses.fields(self))
        return data

200 

201 

@dataclass
class BaseObservation:
    """Fields shared by every observation variant."""

    # Discriminator tag distinguishing test-case reports from info messages.
    type: Literal["test_case", "info", "alert", "error"]
    # Name of the property (test) this observation belongs to.
    property: str
    # Start time of the run this observation belongs to, as supplied by the caller.
    run_start: float

208 

# Type tags for general (non-test-case) observations.
InfoObservationType = Literal["info", "alert", "error"]
# Outcome classification for a single test-case observation.
TestCaseStatus = Literal["gave_up", "passed", "failed"]

211 

212 

@dataclass
class InfoObservation(BaseObservation):
    """A general message observation, not tied to one test case."""

    type: InfoObservationType
    title: str
    # Either a plain string message or a structured payload.
    content: Union[str, dict]

218 

219 

@dataclass
class TestCaseObservation(BaseObservation):
    """An observation describing a single executed test case."""

    __test__ = False  # no! bad pytest!

    type: Literal["test_case"]
    status: TestCaseStatus
    # Human-readable explanation of the status (e.g. the interesting origin).
    status_reason: str
    representation: str
    arguments: dict
    # How the input was produced (e.g. iid, mutation — see make_testcase).
    how_generated: str
    # target: observations plus raised events, merged in make_testcase.
    features: dict
    # Coverage data, when collected; None otherwise.
    coverage: Optional[dict[str, list[int]]]
    # Named timing measurements supplied by the caller.
    timing: dict[str, float]
    metadata: ObservationMetadata

234 

235 

# Anything deliverable to observability callbacks: a test-case report or a
# general info/alert/error message.
Observation: "TypeAlias" = Union[InfoObservation, TestCaseObservation]

#: A list of callback functions for :ref:`observability <observability>`. Whenever
#: a new observation is created, each function in this list will be called with a
#: single value, which is a dictionary representing that observation.
#:
#: You can append a function to this list to receive observability reports, and
#: remove that function from the list to stop receiving observability reports.
#: Observability is considered enabled if this list is nonempty.
TESTCASE_CALLBACKS: list[Callable[[Observation], None]] = []

246 

247 

@contextmanager
def with_observation_callback(
    callback: Callable[[Observation], None],
) -> Generator[None, None, None]:
    """Temporarily register *callback* to receive observations.

    The callback is appended to TESTCASE_CALLBACKS on entry and removed on
    exit, even if the ``with`` body raises.
    """
    TESTCASE_CALLBACKS.append(callback)
    try:
        yield
    finally:
        TESTCASE_CALLBACKS.remove(callback)

257 

258 

def deliver_observation(observation: Observation) -> None:
    """Pass *observation* to every registered observability callback."""
    # Callbacks run synchronously, in registration order.
    for receive in TESTCASE_CALLBACKS:
        receive(observation)

262 

263 

def make_testcase(
    *,
    run_start: float,
    property: str,
    data: "ConjectureData",
    how_generated: str,
    representation: str = "<unknown>",
    arguments: Optional[dict] = None,
    timing: dict[str, float],
    coverage: Optional[dict[str, list[int]]] = None,
    phase: Optional[str] = None,
    backend_metadata: Optional[dict[str, Any]] = None,
    status: Optional[
        Union[TestCaseStatus, "Status"]
    ] = None,  # overrides automatic calculation
    status_reason: Optional[str] = None,  # overrides automatic calculation
    # added to calculated metadata. If keys overlap, the value from this `metadata`
    # is used
    metadata: Optional[dict[str, Any]] = None,
) -> TestCaseObservation:
    """Build a TestCaseObservation for a finished ``ConjectureData``.

    Status and status_reason are derived from *data* unless explicitly
    overridden via the corresponding parameters.
    """
    # Imported here (not at module level), presumably to avoid an import
    # cycle with hypothesis.core — TODO confirm.
    from hypothesis.core import reproduction_decorator
    from hypothesis.internal.conjecture.data import Status

    # We should only be sending observability reports for datas that have finished
    # being modified.
    assert data.frozen

    # Derive status_reason unless the caller supplied one, preferring the
    # interesting origin, then the shrink-phase overrun explanation, then
    # any recorded "invalid because" event (empty string if none).
    if status_reason is not None:
        pass
    elif data.interesting_origin:
        status_reason = str(data.interesting_origin)
    elif phase == "shrink" and data.status == Status.OVERRUN:
        status_reason = "exceeded size of current best example"
    else:
        status_reason = str(data.events.pop("invalid because", ""))

    status_map: dict[Status, TestCaseStatus] = {
        Status.OVERRUN: "gave_up",
        Status.INVALID: "gave_up",
        Status.VALID: "passed",
        Status.INTERESTING: "failed",
    }

    # Normalize a Status override to its TestCaseStatus string.
    if status is not None and isinstance(status, Status):
        status = status_map[status]

    return TestCaseObservation(
        type="test_case",
        status=status if status is not None else status_map[data.status],
        status_reason=status_reason,
        representation=representation,
        # Strip the internal "generate:" prefix from argument names.
        arguments={
            k.removeprefix("generate:"): v for k, v in (arguments or {}).items()
        },
        how_generated=how_generated,  # iid, mutation, etc.
        features={
            # strip(":") turns the empty-name target key "target:" into "target".
            **{
                f"target:{k}".strip(":"): v for k, v in data.target_observations.items()
            },
            **data.events,
        },
        coverage=coverage,
        timing=timing,
        metadata=ObservationMetadata(
            **{
                "traceback": data.expected_traceback,
                # Only failures get a @reproduce_failure decorator string.
                "reproduction_decorator": (
                    reproduction_decorator(data.choices) if status == "failed" else None
                ),
                "predicates": dict(data._observability_predicates),
                "backend": backend_metadata or {},
                "data_status": data.status,
                "interesting_origin": data.interesting_origin,
                # Choice-level detail is opt-in via OBSERVABILITY_CHOICES.
                "choice_nodes": data.nodes if OBSERVABILITY_CHOICES else None,
                "choice_spans": data.spans if OBSERVABILITY_CHOICES else None,
                **_system_metadata(),
                # unpack last so it takes precedence for duplicate keys
                **(metadata or {}),
            }
        ),
        run_start=run_start,
        property=property,
    )

347 

348 

# Paths of the .jsonl files this process has appended observations to.
_WROTE_TO = set()


def _deliver_to_file(observation: Observation) -> None:  # pragma: no cover
    """Append *observation* as one json line to today's observations file."""
    from hypothesis.strategies._internal.utils import to_jsonable

    kind = "info" if observation.type != "test_case" else "testcases"
    fname = storage_directory("observed", f"{date.today().isoformat()}_{kind}.jsonl")
    fname.parent.mkdir(exist_ok=True, parents=True)
    _WROTE_TO.add(fname)
    line = json.dumps(to_jsonable(observation, avoid_realization=False))
    with fname.open(mode="a") as f:
        f.write(line + "\n")

361 

362 

# Recorded once at import time so every observation from this process agrees.
_imported_at = time.time()


@lru_cache
def _system_metadata() -> dict[str, Any]:
    """Process-level metadata included in every observation (computed once)."""
    return dict(
        sys_argv=sys.argv,
        os_getpid=os.getpid(),
        imported_at=_imported_at,
    )

373 

374 

#: If ``False``, do not collect coverage information when observability is enabled.
#:
#: This is exposed both for performance (as coverage collection can be slow on
#: Python 3.11 and earlier) and size (if you do not use coverage information,
#: you may not want to store it in-memory).
#:
#: Disabled by setting the HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER
#: environment variable (to any value).
OBSERVABILITY_COLLECT_COVERAGE = (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os.environ
)
#: If ``True``, include the ``metadata.choice_nodes`` and ``metadata.spans`` keys
#: in test case observations.
#:
#: ``False`` by default. ``metadata.choice_nodes`` and ``metadata.spans`` can be
#: a substantial amount of data, and so must be opted-in to, even when
#: observability is enabled.
#:
#: .. warning::
#:
#:     EXPERIMENTAL AND UNSTABLE. We are actively working towards a better
#:     interface for this as of June 2025, and this attribute may disappear or
#:     be renamed without notice.
#:
#: Enabled by setting the HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES
#: environment variable (to any value).
OBSERVABILITY_CHOICES = "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICES" in os.environ

397 

# Warn when coverage collection was explicitly disabled on Python 3.12+,
# where disabling it should not be necessary for performance.
if OBSERVABILITY_COLLECT_COVERAGE is False and (
    sys.version_info[:2] >= (3, 12)
):  # pragma: no cover
    warnings.warn(
        "Coverage data collection should be quite fast in Python 3.12 or later "
        "so there should be no need to turn coverage reporting off.",
        HypothesisWarning,
        stacklevel=2,
    )

# Either opt-in environment variable enables delivery of observations to
# date-stamped .jsonl files in the "observed" storage directory.
if (
    "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os.environ
    or OBSERVABILITY_COLLECT_COVERAGE is False
):  # pragma: no cover
    TESTCASE_CALLBACKS.append(_deliver_to_file)

    # Remove files more than a week old, to cap the size on disk.
    # Lexicographic comparison of f.stem against an ISO date works because
    # _deliver_to_file names files "<ISO date>_<kind>.jsonl".
    max_age = (date.today() - timedelta(days=8)).isoformat()
    for f in storage_directory("observed", intent_to_write=False).glob("*.jsonl"):
        if f.stem < max_age:  # pragma: no branch
            f.unlink(missing_ok=True)
418 f.unlink(missing_ok=True)