# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as a database path, either
    because the directory exists and can be used, or because its closest
    existing ancestor can be used and we can create the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"
        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST, frozenset):
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST = (
                sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST | {signature}
            )
        else:
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Remove ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )
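
# A minimal sketch of a custom ExampleDatabase subclass, implementing just the
# three required methods above (illustrative only, not part of the public API):
#
#     class DictBackedExampleDatabase(ExampleDatabase):
#         def __init__(self) -> None:
#             super().__init__()
#             self._store: dict[bytes, set[bytes]] = {}
#
#         def save(self, key: bytes, value: bytes) -> None:
#             self._store.setdefault(key, set()).add(value)
#
#         def fetch(self, key: bytes) -> Iterable[bytes]:
#             yield from self._store.get(key, ())
#
#         def delete(self, key: bytes, value: bytes) -> None:
#             self._store.get(key, set()).discard(value)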



class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass
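
# Usage sketch (illustrative only): the bytes-to-sets-of-bytes contract and
# change listening, as exercised on InMemoryExampleDatabase:
#
#     db = InMemoryExampleDatabase()
#     events = []
#     db.add_listener(events.append)
#     db.save(b"k", b"v")
#     assert list(db.fetch(b"k")) == [b"v"]
#     assert events == [("save", (b"k", b"v"))]
#     db.delete(b"k", b"v")
#     assert list(db.fetch(b"k")) == []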



def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: "BaseObserver | None" = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)
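
    # On-disk layout sketch (illustrative, derived from _key_path/_value_path
    # above): each key becomes a subdirectory of ``self.path`` named by the
    # truncated sha384 of the key, and each value a file within it whose
    # contents are the raw value bytes:
    #
    #     <path>/
    #         <_hash(key)>/
    #             <_hash(value)>    # file contents == value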

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write to a temporary
                # file, and only move it to the final path afterwards. This
                # avoids any intermediate state where the file is created
                # (and empty) but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is ``value_path`` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None
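
# Usage sketch (illustrative only; the directory is created lazily on first
# write, and values can be inspected directly on disk):
#
#     db = DirectoryBasedExampleDatabase(".hypothesis/examples")
#     db.save(b"key", b"value")
#     assert set(db.fetch(b"key")) == {b"value"}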



class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, it's necessary for the token to have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used
    through the :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local one,
        # which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` argument.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[StrPathT] = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment.
        # It's unnecessary to use a token if the repo is public.
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in use:
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this in a ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # testzip() doesn't catch every kind of corruption, so we also do
            # the cache initialization here, which exercises more of the
            # artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning that we suppressed earlier with intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
            # see https://github.com/python/cpython/issues/128734
            e.close()
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache.
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems.
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in the kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Optional[Thread] = None

    def _ensure_thread(self):
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: Optional[float] = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)
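
# Usage sketch (illustrative only): writes return immediately and are applied
# by a daemon thread, while fetch() blocks until the write queue has drained,
# so reads still observe earlier writes:
#
#     db = BackgroundWriteDatabase(InMemoryExampleDatabase())
#     db.save(b"k", b"v")  # enqueued; applied in the background
#     assert list(db.fetch(b"k")) == [b"v"]  # joins the queue first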



def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the first 7
    bits represent (part of) the integer, while the last bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, and also returns the index at which we stopped
    reading.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)
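
# Worked example: 300 is 0b10_0101100. The low seven bits (0b0101100 == 0x2C)
# are emitted first with the continuation bit set, giving 0xAC, then the
# remaining bits (0b10) without it, giving 0x02:
#
#     assert _pack_uleb128(300) == bytes([0xAC, 0x02])
#     assert _unpack_uleb128(bytes([0xAC, 0x02])) == (2, 300)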



def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a uleb128-encoded
    # size, and then the payload bytes. For booleans, the payload is inlined into the
    # metadata.
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_sssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)
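
# Worked example (illustrative): choices_to_bytes([True, 7]) is three bytes.
# True is inlined as 0x01; the int 7 gets metadata byte 0b010_00001 == 0x41
# (tag 2, size 1), followed by its one-byte big-endian two's-complement payload:
#
#     assert choices_to_bytes([True, 7]) == bytes([0x01, 0x41, 0x07])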



def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, e.g. because our format changed or someone put
        # junk data in the db.
        return None
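
# Round-trip sketch (illustrative): choices_from_bytes inverts choices_to_bytes
# for any valid choice sequence, and returns None when deserialization raises:
#
#     choices = [True, 7, 0.5, b"\x00", "hi"]
#     assert choices_from_bytes(choices_to_bytes(choices)) == tuple(choices)
#     assert choices_from_bytes(b"\xff\xff") is None  # tag 7 is invalid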