# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as database path because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"
        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST, frozenset):
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST = (
                sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST | {signature}
            )
        else:
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.
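
    A quick sketch of these mapping semantics, using the concrete
    ``InMemoryExampleDatabase`` subclass (keys and values here are arbitrary):

    .. code-block:: python

        db = InMemoryExampleDatabase()
        db.save(b"key", b"value1")
        db.save(b"key", b"value2")
        db.save(b"key", b"value2")  # no-op; the values under a key form a set
        assert sorted(db.fetch(b"key")) == [b"value1", b"value2"]
        db.delete(b"key", b"value1")
        assert list(db.fetch(b"key")) == [b"value2"]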

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
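
    A minimal sketch of a custom subclass, implementing just the three required
    methods (the class name here is illustrative, not part of the API; see the
    how-to guide above for a fuller treatment):

    .. code-block:: python

        class DictDatabase(ExampleDatabase):
            def __init__(self):
                super().__init__()
                self.data = {}

            def save(self, key, value):
                self.data.setdefault(key, set()).add(value)

            def fetch(self, key):
                yield from self.data.get(key, ())

            def delete(self, key, value):
                self.data.get(key, set()).discard(value)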
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
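
        For example, a sketch of a listener that simply records events, using
        the concrete ``InMemoryExampleDatabase`` subclass (any listening-capable
        database behaves the same way):

        .. code-block:: python

            events = []
            db = InMemoryExampleDatabase()
            db.add_listener(events.append)
            db.save(b"key", b"value")
            assert events == [("save", (b"key", b"value"))]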
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Removes ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when a change listener is added to a database that previously
        had none. This allows databases to defer expensive listening setup
        until it is actually needed.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
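
    On disk, each key becomes a subdirectory named with a truncated hash of the
    key, and each value a file inside it named with a hash of the value. A
    sketch of the resulting layout (hash spellings are illustrative)::

        <path>/
            <hash(key1)>/
                <hash(value1)>
                <hash(value2)>
            <hash(key2)>/
                <hash(value3)>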
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: BaseObserver | None = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write the value to a
                # temporary file, and only move it to the final path afterwards.
                # This avoids any intermediate state where the file is created
                # (and empty) but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                _self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(_self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database;
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, it's necessary for the token to have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used through the
    :class:`~hypothesis.database.MultiplexedDatabase`, by combining a local
    directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the
        # local one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` argument.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[StrPathT] = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact currently in use:
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this database in ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # testzip() does not catch every kind of corruption, so building
            # the access cache here as well exercises more of the artifact
            # and gives us a more thorough validity check.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all files in this keypath from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
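
    A quick sketch of the intended usage (the wrapped database and keys here
    are arbitrary):

    .. code-block:: python

        inner = DirectoryBasedExampleDatabase(".hypothesis/examples")
        db = BackgroundWriteDatabase(inner)
        db.save(b"key", b"value")  # enqueued; returns without waiting on the disk
        assert list(db.fetch(b"key")) == [b"value"]  # waits for pending writes first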
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Optional[Thread] = None

    def _ensure_thread(self) -> None:
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: Optional[float] = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)


def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the first 7
    bits represent (part of) the integer, while the last bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
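
    For example (a worked case)::

        _pack_uleb128(300) == bytes([0xAC, 0x02])
        # low 7 bits of 300 (0b0101100) with the continuation bit set -> 0xAC;
        # remaining bits (0b10) with no continuation -> 0x02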
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, and also returns the index at which we stopped
    reading.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)


def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a ULEB128 size,
    # and then the payload bytes. For booleans, the payload is inlined into the
    # metadata byte.
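    #
    # For example, a worked case following those rules:
    #
    #     choices_to_bytes([True, 42, b"ab"])
    #     # True  -> b"\x01"      (tag 0, payload in the low bit)
    #     # 42    -> b"\x41\x2a"  (tag 2<<5 | inline size 1, then one payload byte)
    #     # b"ab" -> b"\x62ab"    (tag 3<<5 | inline size 2, then the payload)
    #     == b"\x01\x41\x2a\x62ab"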
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `ttt_sssss [uleb128 size?] [payload]` - 3-bit tag, 5-bit inline size.
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, eg because our format changed or someone put junk
        # data in the db.
        return None