# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as database path because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        from sphinx.ext.autodoc import _METACLASS_CALL_BLACKLIST

        _METACLASS_CALL_BLACKLIST.append("hypothesis.database._EDMeta.__call__")
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
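
    As a minimal sketch, a custom subclass implementing just the required
    methods might look like the following (this is essentially what
    |InMemoryExampleDatabase| does; see :doc:`/how-to/custom-database` for
    fuller guidance):

    .. code-block:: python

        class DictExampleDatabase(ExampleDatabase):
            # Illustrative only: values live in a plain dict and are not
            # persisted between runs.
            def __init__(self):
                super().__init__()
                self.data = {}

            def save(self, key: bytes, value: bytes) -> None:
                self.data.setdefault(key, set()).add(value)

            def fetch(self, key: bytes) -> Iterable[bytes]:
                yield from self.data.get(key, ())

            def delete(self, key: bytes, value: bytes) -> None:
                self.data.get(key, set()).discard(value)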
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
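
        For example, a minimal listener might just print each event as it
        arrives (``db`` stands for any database instance here):

        .. code-block:: python

            def print_event(event):
                # event is ("save", (key, value)) or ("delete", (key, value))
                kind, (key, value) = event
                print(f"{kind}: key={key!r}, value={value!r}")

            db.add_listener(print_event)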
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Removes ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: Optional["BaseObserver"] = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                _self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(_self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
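
    For example, to consume a database directory synced from CI without writing
    local results back into it (the path here is illustrative):

    .. code-block:: python

        db = ReadOnlyDatabase(DirectoryBasedExampleDatabase("ci-examples/"))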
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, the token must have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used
    through the :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` argument.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
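
    For example, to expire cached artifacts after one hour rather than one day
    (``"user"`` and ``"repo"`` are placeholders):

    .. code-block:: python

        from datetime import timedelta

        shared = GitHubArtifactDatabase("user", "repo", cache_timeout=timedelta(hours=1))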
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[StrPathT] = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in use:
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this in a ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # testzip() alone isn't a thorough check, so we also build the
            # access cache here, which exercises more of the artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning we suppressed earlier with intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in this keypath from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
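
    For example, wrapping a directory-based database so that saves do not block
    the calling thread:

    .. code-block:: python

        db = BackgroundWriteDatabase(DirectoryBasedExampleDatabase(".hypothesis/examples"))
        db.save(b"key", b"value")  # enqueued; handled by the background thread
        list(db.fetch(b"key"))  # waits for pending writes, then fetches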
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Optional[Thread] = None

    def _ensure_thread(self) -> None:
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: Optional[float] = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)


def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. In each byte, the low seven
    bits carry (part of) the integer, while the high bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
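
    For example, ``300`` is ``0b10_0101100``: the low seven bits ``0101100`` are
    emitted first with the continuation bit set, giving ``0xAC``, and the
    remaining ``0b10`` follows as ``0x02``, so ``_pack_uleb128(300)`` returns the
    two bytes ``0xAC 0x02``.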
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, and also returns the index at which we stopped
    reading.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)


def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a uleb128 size, and
    # then the payload bytes. For booleans, the payload is inlined into the metadata.
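    #
    # As a worked example, [True, 255] encodes to b"\x01\x42\x00\xff": 0x01 is the
    # inlined boolean True, and 0x42 is tag 2 (int) with size 2, followed by 255 as
    # the two-byte signed big-endian payload 0x00 0xff.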
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_sssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, e.g. because our format changed or someone put
        # junk data in the db.
        return None