Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/database.py: 28%

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as a database path, because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"
        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST, frozenset):
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST = (
                sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST | {signature}
            )
        else:
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Removes ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )
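
# Illustrative sketch, not part of Hypothesis: attaching a change listener via
# the API above. The names _demo_listener_usage and _print_event are
# hypothetical; any callable accepting a ListenerEventT works.
def _demo_listener_usage(db: ExampleDatabase) -> None:
    def _print_event(event: ListenerEventT) -> None:
        kind, (key, value) = event  # kind is "save" or "delete"
        print(kind, key, value)

    db.add_listener(_print_event)
    db.save(b"my-key", b"my-value")  # a supporting db broadcasts ("save", ...)
    db.remove_listener(_print_event)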


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]
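
# Worked example (illustrative, derived from the function above): _hash(b"")
# == "38b060a751ac9638", the first 16 hex characters of the SHA-384 digest of
# the empty string - a short, filesystem-safe name for key/value files below.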


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: Optional["BaseObserver"] = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)
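
    # Illustrative layout implied by the two helpers above (an inference, not a
    # documented guarantee):
    #   <self.path>/<_hash(key)>/<_hash(value)>  - one file per stored value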


    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub
        Actions provides this automatically, but it needs to be set manually for
        local usage. On a developer machine, this would usually be a
        `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, it's necessary for the token to have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used through the
    :class:`~hypothesis.database.MultiplexedDatabase`, by combining a local
    directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` property.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[StrPathT] = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment.
        # It's unnecessary to use a token if the repo is public.
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in usage
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this in ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # Turns out that testzip() doesn't work quite well;
            # doing the cache initialization here instead
            # will give us more coverage of the artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache.
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems.
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in the kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Optional[Thread] = None

    def _ensure_thread(self):
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: Optional[float] = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)
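
# Illustrative sketch, not part of Hypothesis: deferring directory writes to
# the background thread. The function name and path are hypothetical.
def _demo_background_write() -> None:
    db = BackgroundWriteDatabase(DirectoryBasedExampleDatabase(".hypothesis/examples"))
    db.save(b"my-key", b"my-value")  # enqueued; returns without waiting on disk
    # fetch joins the write queue first, so the queued save is visible here
    assert b"my-value" in list(db.fetch(b"my-key"))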



def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the first 7
    bits represent (part of) the integer, while the last bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, and also returns the index at which we stopped
    reading.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)
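
# Worked example (derived from the definitions above): 300 is 0b10_0101100, so
# _pack_uleb128(300) == b"\xac\x02" - 0x2c (the low seven bits) with the
# continuation bit set, then 0x02 - and _unpack_uleb128(b"\xac\x02") == (2, 300).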



def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a uleb128-encoded
    # size, and then the payload bytes. For booleans, the payload is inlined into the
    # metadata.
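    #
    # Worked example (derived from the rules below): [True, 5] encodes to
    # b"\x01" (tag zero with the boolean payload bit set) followed by
    # b"\x41\x05" (tag 2 for int, inline size 1, then the signed big-endian
    # payload 0x05).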

    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_ssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, e.g. because our format changed or someone put junk
        # data in the db.
        return None
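

# Round-trip sketch, not part of Hypothesis: the name and values here are
# illustrative, exercising the serialization format defined above.
def _demo_choices_roundtrip() -> None:
    choices: tuple[ChoiceT, ...] = (True, 5, 0.5, b"\x00", "snake")
    assert choices_from_bytes(choices_to_bytes(choices)) == choices
    # junk data is rejected with None rather than an exception
    assert choices_from_bytes(b"\xff\xff") is None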