# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Literal,
    TypeAlias,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set
from hypothesis.utils.deprecation import note_deprecation

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from watchdog.observers.api import BaseObserver

StrPathT: TypeAlias = str | PathLike[str]
SaveDataT: TypeAlias = tuple[bytes, bytes]  # key, value
DeleteDataT: TypeAlias = tuple[bytes, bytes | None]  # key, value
ListenerEventT: TypeAlias = (
    tuple[Literal["save"], SaveDataT] | tuple[Literal["delete"], DeleteDataT]
)
ListenerT: TypeAlias = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as database path because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:  # pragma: no cover
        # path.exists() returns False on 3.14+ instead of raising. See
        # https://docs.python.org/3.14/library/pathlib.html#querying-file-type-and-status
        return False


def _db_for_path(
    path: StrPathT | UniqueIdentifier | Literal[":memory:"] | None = not_set,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
#
# We avoid type-checking this block due to this combination of facts:
# * our check-types-api CI job runs under 3.14
# * tools.txt therefore pins to a newer version of sphinx which uses 3.12+ `type`
#   syntax
# * in test_mypy.py, mypy sees this block, sees sphinx is installed, tries parsing
#   sphinx code, and errors
#
# Putting `and not TYPE_CHECKING` here is just a convenience for our testing setup
# (because we don't split mypy tests by running CI version, eg), not for runtime
# behavior.
if "sphinx" in sys.modules and not TYPE_CHECKING:  # pragma: no cover
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"

        # _METACLASS_CALL_BLACKLIST moved in newer sphinx versions
        try:
            import sphinx.ext.autodoc._dynamic._signatures as _module
        except ImportError:
            _module = sphinx.ext.autodoc

        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(_module._METACLASS_CALL_BLACKLIST, frozenset):
            _module._METACLASS_CALL_BLACKLIST = _module._METACLASS_CALL_BLACKLIST | {
                signature
            }
        else:
            _module._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Remove ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )
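

# A minimal sketch (not part of Hypothesis) of a custom ExampleDatabase subclass,
# showing the three required methods. The dict-of-sets layout mirrors the
# "mapping of bytes to sets of bytes" model described in the class docstring;
# change-listening support is optional and omitted here.
#
#     class DictExampleDatabase(ExampleDatabase):
#         def __init__(self) -> None:
#             super().__init__()
#             self._data: dict[bytes, set[bytes]] = {}
#
#         def save(self, key: bytes, value: bytes) -> None:
#             self._data.setdefault(key, set()).add(value)
#
#         def fetch(self, key: bytes) -> Iterable[bytes]:
#             yield from self._data.get(key, ())
#
#         def delete(self, key: bytes, value: bytes) -> None:
#             self._data.get(key, set()).discard(value)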


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass
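

# Example usage, as a sketch: keys map to sets of values, so saving the same
# value twice is a no-op and fetch() yields each stored value once.
#
#     db = InMemoryExampleDatabase()
#     db.save(b"key", b"value")
#     db.save(b"key", b"value")
#     assert list(db.fetch(b"key")) == [b"value"]
#     db.delete(b"key", b"value")
#     assert list(db.fetch(b"key")) == []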


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: BaseObserver | None = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(
            FileSystemEventHandler
        ):  # pragma: no cover  # skipped in test_database.py for now
            def on_created(_self, event: FileCreatedEvent | DirCreatedEvent) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: FileMovedEvent | DirMovedEvent) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None
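

# A sketch of the change-listener API against a directory-based database.
# This requires the optional watchdog dependency (`pip install
# hypothesis[watchdog]`); events arrive asynchronously as the
# ("save", (key, value)) / ("delete", (key, value)) tuples documented on
# ExampleDatabase.add_listener.
#
#     db = DirectoryBasedExampleDatabase(".hypothesis/examples")
#     db.add_listener(print)
#     db.save(b"key", b"value")  # eventually prints ("save", (b"key", b"value"))
#     db.clear_listeners()       # stops the watchdog observer again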


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (e.g. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, it's necessary for the token to have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used
    through the :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` property.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: StrPathT | None = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: str | None = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in usage
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Path | None = None
        # This caches the artifact structure
        self._access_cache: dict[PurePath, set[PurePath]] | None = None

        # Message to display if user doesn't wrap around ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # Turns out that testzip() doesn't work quite well, so we do the
            # cache initialization here instead; this will give us more
            # coverage of the artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> bytes | None:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: bytes | None = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
            # see https://github.com/python/cpython/issues/128734
            e.close()
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Path | None:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in the kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Thread | None = None

    def _ensure_thread(self) -> None:
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: float | None = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)
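

# A sketch of deferred writes: BackgroundWriteDatabase enqueues save/delete/move
# onto a daemon thread, while fetch() first drains the queue so that reads
# still observe earlier writes.
#
#     db = BackgroundWriteDatabase(InMemoryExampleDatabase())
#     db.save(b"key", b"value")                    # returns immediately
#     assert list(db.fetch(b"key")) == [b"value"]  # waits for the queued write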


def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the first 7
    bits represent (part of) the integer, while the last bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, and also returns the index at which we stopped
    reading.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)
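

# A worked example of the ULEB128 encoding above: 300 is 0b100101100, which
# splits into the 7-bit groups 0b0101100 (low) and 0b0000010 (high). The low
# group is emitted first, with its continuation bit set:
#
#     _pack_uleb128(300) == bytes([0b1_0101100, 0b0_0000010]) == b"\xac\x02"
#     _unpack_uleb128(b"\xac\x02") == (2, 300)  # (bytes consumed, value)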


def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a uleb128 size,
    # and then the payload bytes. For booleans, the payload is inlined into the
    # metadata.
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_ssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            offset, size = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...] | None:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, eg because our format changed or someone put junk
        # data in the db.
        return None
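

# A round-trip sketch of the serialization format: booleans inline their payload
# in the tag byte, while other choice types get a `tag_ssss` metadata byte
# followed by the payload. For example, the int 5 encodes as tag 0b010 with
# size 1 (giving the metadata byte 0x41), followed by the payload byte 0x05:
#
#     assert choices_to_bytes([True, 5]) == b"\x01\x41\x05"
#     buf = choices_to_bytes([True, 5, b"ab", "x", 0.5])
#     assert choices_from_bytes(buf) == (True, 5, b"ab", "x", 0.5)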