# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
72 """
73 Returns True if the desired path can be used as database path because
74 either the directory exists and can be used, or its root directory can
75 be used and we can make the directory as needed.
76 """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        from sphinx.ext.autodoc import _METACLASS_CALL_BLACKLIST

        _METACLASS_CALL_BLACKLIST.append("hypothesis.database._EDMeta.__call__")
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in the |settings.database| setting.

    Using an |ExampleDatabase| allows Hypothesis to
    remember and replay failing examples, and ensure that your tests are never
    flaky. We provide several concrete subclasses, and you can write a custom
    database backed by your preferred way to store a ``dict[bytes, set[bytes]]``.
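
    As an illustrative sketch (the class name here is hypothetical), a custom
    database backed by a plain dictionary might look like:

    .. code-block:: python

        class DictExampleDatabase(ExampleDatabase):
            def __init__(self):
                super().__init__()
                self.data: dict[bytes, set[bytes]] = {}

            def save(self, key: bytes, value: bytes) -> None:
                self.data.setdefault(key, set()).add(value)

            def fetch(self, key: bytes) -> Iterable[bytes]:
                yield from self.data.get(key, ())

            def delete(self, key: bytes, value: bytes) -> None:
                self.data.get(key, set()).discard(value)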

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support it, you can call |ExampleDatabase.add_listener| to add a function
    as a change listener, which will be called whenever a value is added, deleted,
    or moved. See |ExampleDatabase.add_listener| for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, unless they need to be
    compatible with features that require it. Currently, no Hypothesis features
    require change listening.

    .. note::

        Change listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Methods
    -------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
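
        For example, a minimal sketch of a listener which just logs events
        (``db`` here is any database supporting change listening):

        .. code-block:: python

            def log_event(event):
                kind, (key, value) = event
                print(f"{kind}: key={key!r} value={value!r}")

            db.add_listener(log_event)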
255 """
256 had_listeners = bool(self._listeners)
257 self._listeners.append(f)
258 if not had_listeners:
259 self._start_listening()
260
261 def remove_listener(self, f: ListenerT, /) -> None:
262 """
263 Removes ``f`` from the list of change listeners.
264
265 If ``f`` is not in the list of change listeners, silently do nothing.
266 """
267 if f not in self._listeners:
268 return
269 self._listeners.remove(f)
270 if not self._listeners:
271 self._stop_listening()
272
273 def clear_listeners(self) -> None:
274 """Remove all change listeners."""
275 had_listeners = bool(self._listeners)
276 self._listeners.clear()
277 if had_listeners:
278 self._stop_listening()
279
280 def _broadcast_change(self, event: ListenerEventT) -> None:
281 """
282 Called when a value has been either added to or deleted from a key in
283 the underlying database store. The possible values for ``event`` are:
284
285 * ``("save", (key, value))``
286 * ``("delete", (key, value))``
287
288 ``value`` may be ``None`` for the ``delete`` event, indicating we know
289 that some value was deleted under this key, but not its exact value.
290
291 Note that you should not assume your instance is the only reference to
292 the underlying database store. For example, if two instances of
293 |DirectoryBasedExampleDatabase| reference the same directory,
294 _broadcast_change should be called whenever a file is added or removed
295 from the directory, even if that database was not responsible for
296 changing the file.
297 """
298 for listener in self._listeners:
299 listener(event)
300
301 def _start_listening(self) -> None:
302 """
        Called when a change listener is added to a database which previously
        had none. This is intended to let databases defer expensive listening
        setup until it is actually needed.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
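
    For example, to keep examples for a single session only (the profile name
    is illustrative):

    .. code-block:: python

        from hypothesis import settings

        settings.register_profile("ephemeral", database=InMemoryExampleDatabase())
        settings.load_profile("ephemeral")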
341 """
342
343 def __init__(self) -> None:
344 super().__init__()
345 self.data: dict[bytes, set[bytes]] = {}
346
347 def __repr__(self) -> str:
348 return f"InMemoryExampleDatabase({self.data!r})"
349
350 def __eq__(self, other: object) -> bool:
351 return isinstance(other, InMemoryExampleDatabase) and self.data is other.data
352
353 def fetch(self, key: bytes) -> Iterable[bytes]:
354 yield from self.data.get(key, ())
355
356 def save(self, key: bytes, value: bytes) -> None:
357 value = bytes(value)
358 values = self.data.setdefault(key, set())
359 changed = value not in values
360 values.add(value)
361
362 if changed:
363 self._broadcast_change(("save", (key, value)))
364
365 def delete(self, key: bytes, value: bytes) -> None:
366 value = bytes(value)
367 values = self.data.get(key, set())
368 changed = value in values
369 values.discard(value)
370
371 if changed:
372 self._broadcast_change(("delete", (key, value)))
373
374 def _start_listening(self) -> None:
375 # declare compatibility with the listener api, but do the actual
376 # implementation in .delete and .save, since we know we are the only
377 # writer to .data.
378 pass
379
380 def _stop_listening(self) -> None:
381 pass
382
383
384def _hash(key: bytes) -> str:
385 return sha384(key).hexdigest()[:16]
386
387
388class DirectoryBasedExampleDatabase(ExampleDatabase):
389 """Use a directory to store Hypothesis examples as files.
390
391 Each test corresponds to a directory, and each example to a file within that
392 directory. While the contents are fairly opaque, a
393 |DirectoryBasedExampleDatabase| can be shared by checking the directory
394 into version control, for example with the following ``.gitignore``::
395
396 # Ignore files cached by Hypothesis...
397 .hypothesis/*
398 # except for the examples directory
399 !.hypothesis/examples/
400
401 Note however that this only makes sense if you also pin to an exact version of
402 Hypothesis, and we would usually recommend implementing a shared database with
403 a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
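
    For example, a sketch of a profile using the conventional local path:

    .. code-block:: python

        from hypothesis import settings

        settings.register_profile(
            "local", database=DirectoryBasedExampleDatabase(".hypothesis/examples")
        )
        settings.load_profile("local")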
404 """
405
406 # we keep a database entry of the full values of all the database keys.
407 # currently only used for inverse mapping of hash -> key in change listening.
408 _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
409 _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)
410
411 def __init__(self, path: StrPathT) -> None:
412 super().__init__()
413 self.path = Path(path)
414 self.keypaths: dict[bytes, Path] = {}
415 self._observer: BaseObserver | None = None
416
417 def __repr__(self) -> str:
418 return f"DirectoryBasedExampleDatabase({self.path!r})"
419
420 def __eq__(self, other: object) -> bool:
421 return (
422 isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
423 )
424
425 def _key_path(self, key: bytes) -> Path:
426 try:
427 return self.keypaths[key]
428 except KeyError:
429 pass
430 self.keypaths[key] = self.path / _hash(key)
431 return self.keypaths[key]
432
433 def _value_path(self, key: bytes, value: bytes) -> Path:
434 return self._key_path(key) / _hash(value)
435
436 def fetch(self, key: bytes) -> Iterable[bytes]:
437 kp = self._key_path(key)
438 if not kp.is_dir():
439 return
440 for path in os.listdir(kp):
441 try:
442 yield (kp / path).read_bytes()
443 except OSError:
444 pass
445
446 def save(self, key: bytes, value: bytes) -> None:
447 key_path = self._key_path(key)
448 if key_path.name != self._metakeys_hash:
449 # add this key to our meta entry of all keys - taking care to avoid
450 # infinite recursion.
451 self.save(self._metakeys_name, key)
452
453 # Note: we attempt to create the dir in question now. We
454 # already checked for permissions, but there can still be other issues,
455 # e.g. the disk is full, or permissions might have been changed.
456 try:
457 key_path.mkdir(exist_ok=True, parents=True)
458 path = self._value_path(key, value)
459 if not path.exists():
                # To mimic an atomic write, create and write the value in a
                # temporary file, and only rename it to the final path after.
                # This avoids any intermediate state where the file is created
                # (and empty) but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            pass

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    hash_to_key[value_path.name] = value_path.read_bytes()
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                _self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(_self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally, and cannot remove examples
    from the shared database.
685 """
686
687 def __init__(self, *dbs: ExampleDatabase) -> None:
688 super().__init__()
689 assert all(isinstance(db, ExampleDatabase) for db in dbs)
690 self._wrapped = dbs
691
692 def __repr__(self) -> str:
693 return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))
694
695 def __eq__(self, other: object) -> bool:
696 return (
697 isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
698 )
699
700 def fetch(self, key: bytes) -> Iterable[bytes]:
701 seen = set()
702 for db in self._wrapped:
703 for value in db.fetch(key):
704 if value not in seen:
705 yield value
706 seen.add(value)
707
708 def save(self, key: bytes, value: bytes) -> None:
709 for db in self._wrapped:
710 db.save(key, value)
711
712 def delete(self, key: bytes, value: bytes) -> None:
713 for db in self._wrapped:
714 db.delete(key, value)
715
716 def move(self, src: bytes, dest: bytes, value: bytes) -> None:
717 for db in self._wrapped:
718 db.move(src, dest, value)
719
720 def _start_listening(self) -> None:
721 for db in self._wrapped:
722 db.add_listener(self._broadcast_change)
723
724 def _stop_listening(self) -> None:
725 for db in self._wrapped:
726 db.remove_listener(self._broadcast_change)
727
728
729class GitHubArtifactDatabase(ExampleDatabase):
730 """
731 A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.
732
733 You can use this for sharing example databases between CI runs and developers, allowing
734 the latter to get read-only access to the former. This is particularly useful for
735 continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
736 where the CI system can help find new failing examples through fuzzing,
737 and developers can reproduce them locally without any manual effort.
738
739 .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI,
        GitHub Actions provides this automatically, but it needs to be set manually
        for local usage. On a developer machine, this would usually be a
        `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, the token needs ``repo`` scope for a classic
        token, or ``actions:read`` for a fine-grained token.

    In most cases, this will be used through the
    :class:`~hypothesis.database.MultiplexedDatabase`, by combining a local
    directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default
    expiration period of one day. You can adjust this through the ``cache_timeout``
    argument.

    For monorepo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
797 """
798
799 def __init__(
800 self,
801 owner: str,
802 repo: str,
803 artifact_name: str = "hypothesis-example-db",
804 cache_timeout: timedelta = timedelta(days=1),
805 path: Optional[StrPathT] = None,
806 ):
807 super().__init__()
808 self.owner = owner
809 self.repo = repo
810 self.artifact_name = artifact_name
811 self.cache_timeout = cache_timeout
812
813 # Get the GitHub token from the environment
814 # It's unnecessary to use a token if the repo is public
815 self.token: Optional[str] = getenv("GITHUB_TOKEN")
816
817 if path is None:
818 self.path: Path = Path(
819 storage_directory(f"github-artifacts/{self.artifact_name}/")
820 )
821 else:
822 self.path = Path(path)
823
824 # We don't want to initialize the cache until we need to
825 self._initialized: bool = False
826 self._disabled: bool = False
827
        # Path to the artifact currently in use, e.g.
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this in ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

                # testzip() alone is not a thorough check, so we also build the
                # access cache here; reading the file listing up front gives us
                # better coverage of the artifact's validity.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
958 "X-GitHub-Api-Version": "2022-11-28 ",
959 "Authorization": f"Bearer {self.token}",
960 },
961 )
962 warning_message = None
963 response_bytes: Optional[bytes] = None
964 try:
965 with urlopen(request) as response:
966 response_bytes = response.read()
967 except HTTPError as e:
968 if e.code == 401:
969 warning_message = (
970 "Authorization failed when trying to download artifact from GitHub. "
971 "Check that you have a valid GITHUB_TOKEN set in your environment."
972 )
973 else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all files under this keypath from the access cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
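
    For example, a sketch wrapping a directory-based database so that test runs
    do not block on disk writes (the path is illustrative):

    .. code-block:: python

        db = BackgroundWriteDatabase(
            DirectoryBasedExampleDatabase(".hypothesis/examples")
        )
        db.save(b"key", b"value")  # enqueued on the background thread
        assert b"value" in db.fetch(b"key")  # fetch() waits for pending writes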
1074 """
1075
1076 def __init__(self, db: ExampleDatabase) -> None:
1077 super().__init__()
1078 self._db = db
1079 self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
1080 self._thread = Thread(target=self._worker, daemon=True)
1081 self._thread.start()
1082 # avoid an unbounded timeout during gc. 0.1 should be plenty for most
1083 # use cases.
1084 weakref.finalize(self, self._join, 0.1)
1085
1086 def __repr__(self) -> str:
1087 return f"BackgroundWriteDatabase({self._db!r})"
1088
1089 def __eq__(self, other: object) -> bool:
1090 return isinstance(other, BackgroundWriteDatabase) and self._db == other._db
1091
1092 def _worker(self) -> None:
1093 while True:
1094 method, args = self._queue.get()
1095 getattr(self._db, method)(*args)
1096 self._queue.task_done()
1097
1098 def _join(self, timeout: Optional[float] = None) -> None:
1099 # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
1100 with self._queue.all_tasks_done:
1101 while self._queue.unfinished_tasks:
1102 self._queue.all_tasks_done.wait(timeout)
1103
1104 def fetch(self, key: bytes) -> Iterable[bytes]:
1105 self._join()
1106 return self._db.fetch(key)
1107
1108 def save(self, key: bytes, value: bytes) -> None:
1109 self._queue.put(("save", (key, value)))
1110
1111 def delete(self, key: bytes, value: bytes) -> None:
1112 self._queue.put(("delete", (key, value)))
1113
1114 def move(self, src: bytes, dest: bytes, value: bytes) -> None:
1115 self._queue.put(("move", (src, dest, value)))
1116
1117 def _start_listening(self) -> None:
1118 self._db.add_listener(self._broadcast_change)
1119
1120 def _stop_listening(self) -> None:
1121 self._db.remove_listener(self._broadcast_change)
1122
1123
1124def _pack_uleb128(value: int) -> bytes:
1125 """
    Serialize an integer into variable-length bytes. For each byte, the low
    seven bits carry (part of) the integer, and the high bit indicates whether
    the integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
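
    For example, ``300 == 0b10_0101100`` packs to two bytes: ``0xAC`` (the low
    seven bits ``0101100`` with the continuation bit set), then ``0x02`` (the
    remaining bits ``0b10``).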
1131 """
1132 parts = bytearray()
1133 assert value >= 0
1134 while True:
1135 # chop off 7 bits
1136 byte = value & ((1 << 7) - 1)
1137 value >>= 7
1138 # set the continuation bit if we have more left
1139 if value:
1140 byte |= 1 << 7
1141
1142 parts.append(byte)
1143 if not value:
1144 break
1145 return bytes(parts)
1146
1147
1148def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
1149 """
    Invert _pack_uleb128. Returns ``(bytes_consumed, value)``: the number of
    bytes read from the buffer, and the decoded integer.
1152 """
1153 value = 0
1154 for i, byte in enumerate(buffer):
1155 n = byte & ((1 << 7) - 1)
1156 value |= n << (i * 7)
1157
1158 if not byte >> 7:
1159 break
1160 return (i + 1, value)
1161
1162
1163def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
1164 """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
1165 # We use a custom serialization format for this, which might seem crazy - but our
1166 # data is a flat sequence of elements, and standard tools like protobuf or msgpack
1167 # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
1168 #
    # We simply encode each element with a metadata byte, a ULEB128-encoded size
    # if needed, and then the payload bytes. For booleans, the payload is inlined
    # into the metadata.
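    #
    # Worked examples (illustrative, derived from the encoding below):
    #   choices_to_bytes([True])   == b"\x01"   # tag 0, payload bit inline
    #   choices_to_bytes([5])      == b"A\x05"  # metadata (2 << 5) | 1 == ord("A")
    #   choices_to_bytes([b"abc"]) == b"cabc"   # metadata (3 << 5) | 3 == ord("c")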
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_ssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
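
    Round-tripping preserves the values, e.g.
    ``choices_from_bytes(choices_to_bytes([1, "a"])) == (1, "a")``.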
1240 """
1241 try:
1242 return _choices_from_bytes(buffer)
1243 except Exception:
1244 # deserialization error, eg because our format changed or someone put junk
1245 # data in the db.
1246 return None