
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Literal,
    TypeAlias,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from watchdog.observers.api import BaseObserver

StrPathT: TypeAlias = str | PathLike[str]
SaveDataT: TypeAlias = tuple[bytes, bytes]  # key, value
DeleteDataT: TypeAlias = tuple[bytes, bytes | None]  # key, value
ListenerEventT: TypeAlias = (
    tuple[Literal["save"], SaveDataT] | tuple[Literal["delete"], DeleteDataT]
)
ListenerT: TypeAlias = Callable[[ListenerEventT], Any]

def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as a database path, because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False
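
# A hedged illustration (editor's note, not library code): on a typical unix
# system with a writable /tmp,
#
#     _usable_dir("/tmp/hypothesis/examples")  # -> True, even before creation
#     _usable_dir("/proc/1/examples")          # -> likely False (not writable)
#
# The exact results depend on your filesystem and permissions.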


def _db_for_path(
    path: StrPathT | UniqueIdentifier | Literal[":memory:"] | None = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)
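
# A hedged sketch of the dispatch behavior above (editor's illustration):
#
#     _db_for_path(None)        # -> InMemoryExampleDatabase()
#     _db_for_path(":memory:")  # -> InMemoryExampleDatabase()
#     _db_for_path("/tmp/db")   # -> DirectoryBasedExampleDatabase("/tmp/db")
#     _db_for_path(not_set)     # -> default on-disk database, falling back to
#                               #    in-memory if the location is unusable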


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"
        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST, frozenset):
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST = (
                sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST | {signature}
            )
        else:
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()
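
    # A hedged usage sketch (editor's illustration, not library code): a
    # listener that logs every event, registered on a database that supports
    # change listening.
    #
    #     def log_event(event: ListenerEventT) -> None:
    #         kind, (key, value) = event
    #         print(f"{kind}: key={key!r} value={value!r}")
    #
    #     db = InMemoryExampleDatabase()
    #     db.add_listener(log_event)
    #     db.save(b"k", b"v")    # log_event sees ("save", (b"k", b"v"))
    #     db.delete(b"k", b"v")  # log_event sees ("delete", (b"k", b"v"))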

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Removes ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )
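

# The docstring above defers to :doc:`/how-to/custom-database` for writing your
# own database class. As a hedged minimal sketch (editor's illustration, not
# library code), a conforming subclass only needs save/fetch/delete over a
# mapping of bytes keys to sets of bytes values:
#
#     class DictExampleDatabase(ExampleDatabase):
#         def __init__(self) -> None:
#             super().__init__()
#             self._data: dict[bytes, set[bytes]] = {}
#
#         def save(self, key: bytes, value: bytes) -> None:
#             self._data.setdefault(key, set()).add(value)
#
#         def fetch(self, key: bytes) -> Iterable[bytes]:
#             yield from self._data.get(key, ())
#
#         def delete(self, key: bytes, value: bytes) -> None:
#             self._data.get(key, set()).discard(value)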


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass
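
# A hedged usage sketch (editor's illustration):
#
#     db = InMemoryExampleDatabase()
#     db.save(b"key", b"value")
#     assert list(db.fetch(b"key")) == [b"value"]
#     db.delete(b"key", b"value")
#     assert list(db.fetch(b"key")) == []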


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]
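
# For instance (editor's illustration), _hash(b"") == "38b060a751ac9638", the
# first 16 hex digits of SHA-384 of the empty string. Keys and values are
# stored on disk under these short, stable digests rather than as raw bytes.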


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: BaseObserver | None = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)
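
    # The resulting on-disk layout (editor's illustration; hashes abbreviated):
    #
    #     <path>/
    #         <_hash(key1)>/           # one directory per key
    #             <_hash(value1)>      # one file per value, holding the value
    #             <_hash(value2)>
    #         <_hash(b".hypothesis-keys")>/
    #             <_hash(key1)>        # meta entry: full bytes of each key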

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(_self, event: FileCreatedEvent | DirCreatedEvent) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: FileMovedEvent | DirMovedEvent) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass
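
# A hedged usage sketch (editor's illustration): reads pass through, writes are
# silently dropped.
#
#     ro = ReadOnlyDatabase(InMemoryExampleDatabase())
#     ro.save(b"k", b"v")  # no-op
#     assert list(ro.fetch(b"k")) == []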


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions
        provides this automatically, but it needs to be set manually for local usage. On a
        developer machine, this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, it's necessary for the token to have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used through the
    :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` property.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: StrPathT | None = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment.
        # It's unnecessary to use a token if the repo is public.
        self.token: str | None = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in use:
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Path | None = None
        # This caches the artifact structure
        self._access_cache: dict[PurePath, set[PurePath]] | None = None

        # Message to display if the user doesn't wrap this in ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # testzip() alone doesn't catch everything, so we also do the cache
            # initialization here, which exercises more of the artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning that we suppressed earlier via intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> bytes | None:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: bytes | None = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
            # see https://github.com/python/cpython/issues/128734
            e.close()
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Path | None:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache.
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems.
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in the kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Thread | None = None

    def _ensure_thread(self):
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: float | None = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)
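
# A hedged usage sketch (editor's illustration): writes return immediately and
# are applied by the worker thread; fetch() joins the queue first, so reads
# observe all earlier writes.
#
#     db = BackgroundWriteDatabase(InMemoryExampleDatabase())
#     db.save(b"k", b"v")                 # enqueued; returns at once
#     assert b"v" in set(db.fetch(b"k"))  # _join() runs before the read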


def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the first 7
    bits represent (part of) the integer, while the last bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)
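
# Worked example (editor's illustration): 300 == 0b10_0101100. The low seven
# bits (0b0101100 == 0x2C) go in the first byte with the continuation bit set,
# giving 0xAC; the remaining bits (0b10) form the second byte:
#
#     _pack_uleb128(300) == b"\xac\x02"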


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, and also returns the index at which we stopped
    reading.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)
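
# Round-trip examples (editor's illustration); the returned index lets callers
# resume parsing after the encoded integer:
#
#     _unpack_uleb128(b"\xac\x02") == (2, 300)
#     _unpack_uleb128(b"\x05rest") == (1, 5)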


def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a uleb128 size,
    # and then the payload bytes. For booleans, the payload is inlined into the
    # metadata.
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_sssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)
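
# Worked example (editor's illustration): True is inlined into a single
# metadata byte (b"\x01"); the int 5 gets tag 2 in the top three bits (0x40)
# and size 1 in the low five bits (header 0x41), then one signed payload byte:
#
#     choices_to_bytes([True, 5]) == b"\x01\x41\x05"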


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...] | None:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, eg because our format changed or someone put junk
        # data in the db.
        return None
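
# Round-trip example (editor's illustration):
#
#     raw = choices_to_bytes([True, 5, 0.5, b"ab", "hi"])
#     assert choices_from_bytes(raw) == (True, 5, 0.5, b"ab", "hi")
#     assert choices_from_bytes(b"\x21\x00") is None  # bad float64 entry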