Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/database.py: 32%

504 statements  

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]

def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as database path because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False

def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)

class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)

# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        from sphinx.ext.autodoc import _METACLASS_CALL_BLACKLIST

        _METACLASS_CALL_BLACKLIST.append("hypothesis.database._EDMeta.__call__")
    except Exception:
        pass

class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Remove ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

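# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): the docstring above
# points to :doc:`/how-to/custom-database` for writing your own database. A
# minimal subclass only needs the three required methods; this hypothetical
# dict-backed class shows how they fit together.
class _SketchDictDatabase(ExampleDatabase):
    """A dict-of-sets database: about the smallest useful ExampleDatabase."""

    def __init__(self) -> None:
        super().__init__()  # initializes the listener list
        self._store: dict[bytes, set[bytes]] = {}

    def save(self, key: bytes, value: bytes) -> None:
        # required: idempotent insert
        self._store.setdefault(key, set()).add(bytes(value))

    def fetch(self, key: bytes) -> Iterable[bytes]:
        # required: iterate over all values for a key
        yield from self._store.get(key, ())

    def delete(self, key: bytes, value: bytes) -> None:
        # required: silently ignore missing values
        self._store.get(key, set()).discard(bytes(value))
# --------------------------------------------------------------------------
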

class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass

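# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): how the listener API
# behaves on InMemoryExampleDatabase, which broadcasts directly from save and
# delete. Only genuine changes produce events.
def _sketch_listener_demo() -> None:
    db = InMemoryExampleDatabase()
    events: list[ListenerEventT] = []
    db.add_listener(events.append)

    db.save(b"k", b"v")    # new value -> broadcasts a "save" event
    db.save(b"k", b"v")    # already present -> no event
    db.delete(b"k", b"v")  # present -> broadcasts a "delete" event

    assert events == [("save", (b"k", b"v")), ("delete", (b"k", b"v"))]
    db.clear_listeners()
# --------------------------------------------------------------------------
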

def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]

class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: BaseObserver | None = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None

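# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): the on-disk layout
# used by DirectoryBasedExampleDatabase. Keys and values are both hashed with
# the truncated sha384 from _hash above, so a saved entry lands at
# <path>/<_hash(key)>/<_hash(value)>. A hypothetical round-trip, assuming
# `tmp` names a writable scratch directory:
def _sketch_directory_layout(tmp: StrPathT) -> None:
    db = DirectoryBasedExampleDatabase(tmp)
    db.save(b"my-test-key", b"my-value")

    expected = Path(tmp) / _hash(b"my-test-key") / _hash(b"my-value")
    assert expected.read_bytes() == b"my-value"
    assert list(db.fetch(b"my-test-key")) == [b"my-value"]
# --------------------------------------------------------------------------
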

class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass

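# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): writes through a
# ReadOnlyDatabase are silently dropped, while reads pass through.
def _sketch_read_only_demo() -> None:
    inner = InMemoryExampleDatabase()
    inner.save(b"k", b"from-ci")

    ro = ReadOnlyDatabase(inner)
    ro.save(b"k", b"local-write")  # no-op: nothing is written
    ro.delete(b"k", b"from-ci")    # no-op: nothing is deleted

    assert sorted(ro.fetch(b"k")) == [b"from-ci"]
# --------------------------------------------------------------------------
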

class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)

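# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): MultiplexedDatabase
# writes to every wrapped database but deduplicates values on fetch.
def _sketch_multiplexed_demo() -> None:
    a, b = InMemoryExampleDatabase(), InMemoryExampleDatabase()
    a.save(b"k", b"shared")
    b.save(b"k", b"shared")  # same value in both wrapped databases
    b.save(b"k", b"only-in-b")

    mux = MultiplexedDatabase(a, b)
    # b"shared" is yielded once despite being present twice
    assert sorted(mux.fetch(b"k")) == [b"only-in-b", b"shared"]

    mux.save(b"k2", b"v")  # written to both a and b
    assert list(a.fetch(b"k2")) == list(b.fetch(b"k2")) == [b"v"]
# --------------------------------------------------------------------------
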

class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, the token needs the ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used
    through the :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` property.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[StrPathT] = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in usage
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this in ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # testzip() doesn't catch every kind of corruption, so doing the
            # cache initialization here as well gives us more coverage of the
            # artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files under kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Optional[Thread] = None

    def _ensure_thread(self):
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: Optional[float] = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)

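# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): writes enqueue and
# return immediately; fetch blocks until the queue drains, so a read that
# follows a write still observes it.
def _sketch_background_write_demo() -> None:
    db = BackgroundWriteDatabase(InMemoryExampleDatabase())
    db.save(b"k", b"v")  # enqueued; applied by the worker thread
    # fetch calls _join() first, so the pending save is visible here
    assert list(db.fetch(b"k")) == [b"v"]
# --------------------------------------------------------------------------
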

def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the first 7
    bits represent (part of) the integer, while the last bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)

def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, and also returns the index at which we stopped
    reading.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)

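# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): a worked ULEB128
# example. 300 = 0b10_0101100; the low 7 bits (0101100) get the continuation
# bit set (0xAC), and the remaining bits (0b10) form the final byte (0x02).
def _sketch_uleb128_demo() -> None:
    assert _pack_uleb128(300) == b"\xac\x02"
    assert _unpack_uleb128(b"\xac\x02") == (2, 300)   # (bytes read, value)
    for n in (0, 1, 127, 128, 2**40):
        buf = _pack_uleb128(n)
        assert _unpack_uleb128(buf) == (len(buf), n)  # round-trips
# --------------------------------------------------------------------------
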

def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a ULEB128-encoded
    # size, and then the payload bytes. For booleans, the payload is inlined into the
    # metadata.
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_ssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)

def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)

def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, eg because our format changed or someone put junk
        # data in the db.
        return None
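

# --------------------------------------------------------------------------
# Editorial sketch (not part of hypothesis/database.py): the serialization
# format round-trips every choice type, including negative ints and raw
# bytes, and junk input fails safely with None instead of raising.
def _sketch_choices_roundtrip_demo() -> None:
    choices: tuple[ChoiceT, ...] = (True, False, -42, 3.14, b"\x00\xff", "text")
    buf = choices_to_bytes(choices)
    assert choices_from_bytes(buf) == choices
    # invalid data hits the except Exception branch above
    assert choices_from_bytes(b"\xff\xff\xff") is None
# --------------------------------------------------------------------------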