# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as a database path, either
    because the directory already exists and is usable, or because its nearest
    existing ancestor is usable and we can create the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"
        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST, frozenset):
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST = (
                sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST | {signature}
            )
        else:
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
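
    As a rough sketch of the three required methods, a bare-bones non-persistent
    subclass might look like the following (the class name is illustrative;
    ``InMemoryExampleDatabase`` is the supported version of this idea):

    .. code-block:: python

        class DictExampleDatabase(ExampleDatabase):
            def __init__(self):
                super().__init__()
                self.data = {}

            def save(self, key, value):
                self.data.setdefault(key, set()).add(value)

            def fetch(self, key):
                yield from self.data.get(key, ())

            def delete(self, key, value):
                self.data.get(key, set()).discard(value)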
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
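
        For example, a minimal listener which just logs each event might look
        like this sketch (assuming ``db`` supports change listening):

        .. code-block:: python

            def log_event(event):
                kind, (key, value) = event  # kind is "save" or "delete"
                print(f"{kind}: key={key!r} value={value!r}")

            db.add_listener(log_event)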
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Removes ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when a change listener is added to a database that previously
        had none. Intended to allow databases to defer expensive listening
        setup until it is actually needed.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
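
    For example, a sketch of opting in for a single test via settings:

    .. code-block:: python

        from hypothesis import given, settings, strategies as st

        @settings(database=InMemoryExampleDatabase())
        @given(st.integers())
        def test_not_persisted_between_runs(n): ...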
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
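
    Concretely, each value is stored at ``<path>/<hash(key)>/<hash(value)>``,
    using truncated hex digests of the raw bytes. A sketch of the resulting
    layout (hash values illustrative)::

        .hypothesis/examples/
            0f24b8626c4f4bf9/     # hash of a test's database key
                5f4f4b0eab3b5851  # hash of one saved example
                a3c7b2e8f01d6c25  # hash of another saved example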
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: BaseObserver | None = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers,
    allowing the latter to get read-only access to the former. This is particularly
    useful for continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub
        Actions provides this automatically, but it needs to be set manually for
        local usage. On a developer machine, this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, the token must have the ``repo`` scope in the
        case of a classic token, or ``actions:read`` in the case of a fine-grained
        token.

    In most cases, this will be used through the
    :class:`~hypothesis.database.MultiplexedDatabase`, by combining a local
    directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default
    expiration period of one day. You can adjust this through the ``cache_timeout``
    argument.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
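
    A sketch of such a setup, with a per-project artifact name (names
    illustrative):

    .. code-block:: python

        shared = ReadOnlyDatabase(
            GitHubArtifactDatabase(
                "user", "repo", artifact_name="hypofuzz-example-db-frontend"
            )
        )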
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[StrPathT] = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in use:
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this in ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # testzip() turns out not to catch everything, so we do the cache
            # initialization here as well, which gives us more coverage of the
            # artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath. Use setdefault() because
                        # zip archives are not guaranteed to contain an entry
                        # for the parent directory itself.
                        self._access_cache.setdefault(keypath, set()).add(
                            PurePath(filename)
                        )
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning that we suppressed earlier via intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
            # see https://github.com/python/cpython/issues/128734
            e.close()
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all files under this keypath from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
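
    A sketch of typical usage, wrapping the directory-based database (the path
    shown is illustrative):

    .. code-block:: python

        db = BackgroundWriteDatabase(
            DirectoryBasedExampleDatabase(".hypothesis/examples")
        )
        db.save(b"key", b"value")  # enqueued; returns immediately
        list(db.fetch(b"key"))  # waits for pending writes, then [b"value"]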
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Optional[Thread] = None

    def _ensure_thread(self) -> None:
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: Optional[float] = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)


def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the low 7
    bits carry (part of) the integer, while the high bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
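
    For example, 300 is ``0b10_0101100`` in binary, and packs to two bytes:
    ``0xAC`` (the low seven bits, with the continuation bit set), then
    ``0x02`` (the remaining bits, with the continuation bit clear).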
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128. Returns a tuple of (number of bytes consumed,
    decoded value).
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)


def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, a uleb128 size if the
    # payload is too large to inline in the metadata byte, and then the payload
    # bytes. For booleans, the payload is inlined into the metadata.
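    #
    # For example (worked from the rules below): True encodes to b"\x01", and
    # the string "abc" encodes to a metadata byte of 0b100_00011 (tag 4, size 3)
    # followed by the payload b"abc".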
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_sssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
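
    For example, round-tripping a few choices:

    .. code-block:: python

        buf = choices_to_bytes([True, 1, "a"])
        assert choices_from_bytes(buf) == (True, 1, "a")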
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, eg because our format changed or someone put junk
        # data in the db.
        return None