Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/database.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
11import abc
12import errno
13import json
14import os
15import struct
16import sys
17import tempfile
18import warnings
19import weakref
20from collections.abc import Callable, Iterable
21from datetime import datetime, timedelta, timezone
22from functools import lru_cache
23from hashlib import sha384
24from os import PathLike, getenv
25from pathlib import Path, PurePath
26from queue import Queue
27from threading import Thread
28from typing import (
29 TYPE_CHECKING,
30 Any,
31 ClassVar,
32 Literal,
33 TypeAlias,
34 cast,
35)
36from urllib.error import HTTPError, URLError
37from urllib.request import Request, urlopen
38from zipfile import BadZipFile, ZipFile
40from hypothesis.configuration import StorageDirectory, storage_directory
41from hypothesis.errors import HypothesisException, HypothesisWarning
42from hypothesis.internal.conjecture.choice import ChoiceT
43from hypothesis.utils.conventions import UniqueIdentifier, not_set
44from hypothesis.utils.deprecation import note_deprecation
# Public API of this module, in alphabetical order.
__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from watchdog.observers.api import BaseObserver

# Aliases shared across this module's databases and their change-listener API.
StrPathT: TypeAlias = str | PathLike[str]
SaveDataT: TypeAlias = tuple[bytes, bytes]  # key, value
DeleteDataT: TypeAlias = tuple[bytes, bytes | None]  # key, value
# A listener event is either a save or a delete, tagged with its kind.
ListenerEventT: TypeAlias = (
    tuple[Literal["save"], SaveDataT] | tuple[Literal["delete"], DeleteDataT]
)
ListenerT: TypeAlias = Callable[[ListenerEventT], Any]
def _usable_dir(path: StrPathT) -> bool:
    """
    Return True if ``path`` can serve as the database directory: either it
    already exists and is usable, or its nearest existing ancestor is a
    directory we can read, write, and traverse (so we could create it later).
    """
    candidate = Path(path)
    try:
        # Walk up to the closest ancestor that exists. The filesystem root
        # always exists, so this loop is guaranteed to terminate.
        while not candidate.exists():
            candidate = candidate.parent
        if not candidate.is_dir():
            return False
        return os.access(candidate, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:  # pragma: no cover
        # On Python 3.14+, path.exists() returns False instead of raising. See
        # https://docs.python.org/3.14/library/pathlib.html#querying-file-type-and-status
        return False
def _db_for_path(
    path: StrPathT | UniqueIdentifier | Literal[":memory:"] | None = None,
) -> "ExampleDatabase":
    """
    Resolve the user-facing ``database`` setting into a concrete database.

    ``None`` and ``":memory:"`` select an in-memory database; the ``not_set``
    sentinel selects the default on-disk location (falling back to memory if
    that location is unusable); anything else is treated as a directory path.
    """
    # An explicit in-memory request takes no further processing.
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    if path is not_set:
        # The old environment variable is no longer honored; fail loudly
        # rather than silently ignoring the user's intent.
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        storage_dir = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(storage_dir.path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. path={storage_dir.path!r}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
        return _StorageDirectoryDatabase(storage_dir)
    return DirectoryBasedExampleDatabase(cast(StrPathT, path))
class _EDMeta(abc.ABCMeta):
    """
    Metaclass that intercepts direct ``ExampleDatabase(...)`` instantiation,
    redirecting it (with a deprecation note) to a concrete database chosen by
    ``_db_for_path``. Subclasses instantiate normally.
    """

    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        # Concrete subclasses go through the ordinary construction path.
        if self is not ExampleDatabase:
            return super().__call__(*args, **kwargs)
        note_deprecation(
            "Creating a database using the abstract ExampleDatabase() class "
            "is deprecated. Prefer using a concrete subclass, like "
            "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
            'In particular, the special string ExampleDatabase(":memory:") '
            "should be replaced by InMemoryExampleDatabase().",
            since="2025-04-07",
            has_codemod=False,
        )
        return _db_for_path(*args, **kwargs)
129# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
130# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
131# maintains a list of metaclass-call-methods to ignore, and while they would prefer
132# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
133# can insert ourselves here.
134#
135# This code only runs if Sphinx has already been imported; and it would live in our
136# docs/conf.py except that we would also like it to work for anyone documenting
137# downstream ExampleDatabase subclasses too.
138#
139# We avoid type-checking this block due to this combination facts:
140# * our check-types-api CI job runs under 3.14
141# * tools.txt therefore pins to a newer version of sphinx which uses 3.12+ `type`
142# syntax
143# * in test_mypy.py, mypy sees this block, sees sphinx is installed, tries parsing
144# sphinx code, and errors
145#
146# Putting `and not TYPE_CHECKING` here is just a convenience for our testing setup
147# (because we don't split mypy tests by running CI version, eg), not for runtime
148# behavior.
if "sphinx" in sys.modules and not TYPE_CHECKING:  # pragma: no cover
    try:
        import sphinx.ext.autodoc

        # The fully-qualified name Sphinx resolves for our metaclass __call__.
        signature = "hypothesis.database._EDMeta.__call__"

        # _METACLASS_CALL_BLACKLIST moved in newer sphinx versions
        try:
            import sphinx.ext.autodoc._dynamic._signatures as _module
        except ImportError:
            _module = sphinx.ext.autodoc

        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(_module._METACLASS_CALL_BLACKLIST, frozenset):
            _module._METACLASS_CALL_BLACKLIST = _module._METACLASS_CALL_BLACKLIST | {
                signature
            }
        else:
            _module._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        # Best-effort only: if sphinx internals change again, the worst outcome
        # is a noisier documented signature, not a runtime failure.
        pass
class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        # Currently-registered change listeners; see add_listener. Listener
        # lifecycle hooks (_start_listening / _stop_listening) fire on the
        # transitions between "no listeners" and "some listeners".
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        # A same-key move degenerates to a save; deleting first would lose it.
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcasted as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        # Only fire the (possibly expensive) start hook on the 0 -> 1 transition.
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Removes ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        # Fire the stop hook on the 1 -> 0 transition.
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        # Base implementation: warn rather than raise, so listener-unaware
        # databases keep working for everything except change listening.
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )
class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self) -> None:
        super().__init__()
        # Backing store: key -> set of saved values.
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        # Two instances compare equal only when they share the same backing dict.
        if not isinstance(other, InMemoryExampleDatabase):
            return False
        return self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield every value currently stored under ``key``."""
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        """Add ``value`` under ``key``, broadcasting a change only if it is new."""
        value = bytes(value)
        bucket = self.data.setdefault(key, set())
        if value not in bucket:
            bucket.add(value)
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``, broadcasting a change if it was present."""
        value = bytes(value)
        bucket = self.data.get(key, set())
        if value in bucket:
            bucket.remove(value)
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass
418def _hash(key: bytes) -> str:
419 return sha384(key).hexdigest()[:16]
class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        # Memoized key -> directory path mapping, to avoid re-hashing keys.
        self.keypaths: dict[bytes, Path] = {}
        # watchdog observer; non-None only while change listeners are attached.
        self._observer: BaseObserver | None = None
        self._ensure_directory_exists_called = False

    def _ensure_directory_exists(self) -> None:
        # disk hits are expensive: early-return for performance
        if self._ensure_directory_exists_called:
            return

        self.path.mkdir(exist_ok=True, parents=True)
        self._ensure_directory_exists_called = True

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        # Return the directory for ``key``, caching the hash computation.
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        # A value's file lives inside its key's directory, named by its hash.
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield the contents of every value file under ``key``'s directory."""
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    # the individual file may have been deleted concurrently
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        """Write ``value`` to a file under ``key``'s directory, atomically."""
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            self._ensure_directory_exists()
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """Move ``value``'s file between key directories via os.renames."""
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            # fall back to copy semantics if the rename failed (e.g. the source
            # file is missing, or a cross-filesystem rename was rejected).
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value``'s file, cleaning up its key directory if emptied."""
        try:
            self._value_path(key, value).unlink()
        except OSError:
            # the value wasn't present: silently do nothing, per the contract.
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        # Change listening is implemented by watching the database directory
        # with the (optional) watchdog library; warn and bail if it's missing.
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        # Build the inverse mapping (hash -> key) from the metakeys entry, so
        # filesystem events (which only show hashed names) can be mapped back
        # to the original byte keys for broadcasting.
        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(
            FileSystemEventHandler
        ):  # pragma: no cover # skipped in test_database.py for now
            def on_created(_self, event: FileCreatedEvent | DirCreatedEvent) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here`).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                # the file is already gone, so we can't recover the deleted
                # value; broadcast None per the listener contract.
                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: FileMovedEvent | DirMovedEvent) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                # a move is broadcast as a delete followed by a save.
                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self._ensure_directory_exists()
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        # _start_listening / _stop_listening are guaranteed to alternate, so
        # the observer must exist here.
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None
class _StorageDirectoryDatabase(DirectoryBasedExampleDatabase):
    # A DirectoryBasedExampleDatabase located at the hypothesis storage directory
    # itself. Routing directory creation through StorageDirectory lets our
    # .gitignore-writing logic run even when the very first interaction with
    # .hypothesis is DirectoryBasedExampleDatabase.save creating
    # .hypothesis/examples - a plain mkdir there would skip that logic.

    def __init__(self, storage_dir: StorageDirectory) -> None:
        super().__init__(storage_dir.path)
        self._storage_dir = storage_dir

    def _ensure_directory_exists(self) -> None:
        # Same early-return caching as the base class, but delegate actual
        # creation to StorageDirectory so its extra setup runs too.
        if not self._ensure_directory_exists_called:
            self._storage_dir.create_if_missing()
            self._ensure_directory_exists_called = True
class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ReadOnlyDatabase):
            return False
        return self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Pass reads straight through to the wrapped database."""
        for value in self._wrapped.fetch(key):
            yield value

    def save(self, key: bytes, value: bytes) -> None:
        """Do nothing: this database is read-only."""

    def delete(self, key: bytes, value: bytes) -> None:
        """Do nothing: this database is read-only."""

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass
class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database;
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        for db in dbs:
            assert isinstance(db, ExampleDatabase)
        self._wrapped = dbs

    def __repr__(self) -> str:
        inner = ", ".join(map(repr, self._wrapped))
        return f"MultiplexedDatabase({inner})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, MultiplexedDatabase):
            return False
        return self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield values for ``key`` from every wrapped database, deduplicated."""
        seen: set[bytes] = set()
        for inner in self._wrapped:
            for value in inner.fetch(key):
                if value in seen:
                    continue
                yield value
                seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        """Save to every wrapped database."""
        for inner in self._wrapped:
            inner.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        """Delete from every wrapped database."""
        for inner in self._wrapped:
            inner.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """Move within every wrapped database."""
        for inner in self._wrapped:
            inner.move(src, dest, value)

    def _start_listening(self) -> None:
        # re-broadcast any change from any wrapped database as our own.
        for inner in self._wrapped:
            inner.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for inner in self._wrapped:
            inner.remove_listener(self._broadcast_change)
816class GitHubArtifactDatabase(ExampleDatabase):
817 """
818 A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.
820 You can use this for sharing example databases between CI runs and developers, allowing
821 the latter to get read-only access to the former. This is particularly useful for
822 continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
823 where the CI system can help find new failing examples through fuzzing,
824 and developers can reproduce them locally without any manual effort.
826 .. note::
827 You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, Github Actions provides
828 this automatically, but it needs to be set manually for local usage. In a developer machine,
829 this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
830 If the repository is private, it's necessary for the token to have ``repo`` scope
831 in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.
834 In most cases, this will be used
835 through the :class:`~hypothesis.database.MultiplexedDatabase`,
836 by combining a local directory-based database with this one. For example:
838 .. code-block:: python
840 local = DirectoryBasedExampleDatabase(".hypothesis/examples")
841 shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))
843 settings.register_profile("ci", database=local)
844 settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
845 # We don't want to use the shared database in CI, only to populate its local one.
846 # which the workflow should then upload as an artifact.
847 settings.load_profile("ci" if os.environ.get("CI") else "dev")
849 .. note::
850 Because this database is read-only, you always need to wrap it with the
851 :class:`ReadOnlyDatabase`.
853 A setup like this can be paired with a GitHub Actions workflow including
854 something like the following:
856 .. code-block:: yaml
858 - name: Download example database
859 uses: dawidd6/action-download-artifact@v9
860 with:
861 name: hypothesis-example-db
862 path: .hypothesis/examples
863 if_no_artifact_found: warn
864 workflow_conclusion: completed
866 - name: Run tests
867 run: pytest
869 - name: Upload example database
870 uses: actions/upload-artifact@v3
871 if: always()
872 with:
873 name: hypothesis-example-db
874 path: .hypothesis/examples
876 In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
877 to download the latest artifact given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
878 does not support downloading artifacts from previous workflow runs.
880 The database automatically implements a simple file-based cache with a default expiration period
881 of 1 day. You can adjust this through the ``cache_timeout`` property.
883 For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
884 """
886 def __init__(
887 self,
888 owner: str,
889 repo: str,
890 artifact_name: str = "hypothesis-example-db",
891 cache_timeout: timedelta = timedelta(days=1),
892 path: StrPathT | None = None,
893 ):
894 super().__init__()
895 self.owner = owner
896 self.repo = repo
897 self.artifact_name = artifact_name
898 self.cache_timeout = cache_timeout
900 # Get the GitHub token from the environment
901 # It's unnecessary to use a token if the repo is public
902 self.token: str | None = getenv("GITHUB_TOKEN")
904 self._storage_dir: StorageDirectory | None = None
905 if path is None:
906 self._storage_dir = storage_directory(
907 f"github-artifacts/{self.artifact_name}/"
908 )
909 self.path = self._storage_dir.path
910 else:
911 self.path = Path(path)
913 # We don't want to initialize the cache until we need to
914 self._initialized: bool = False
915 self._disabled: bool = False
917 # This is the path to the artifact in usage
918 # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
919 self._artifact: Path | None = None
920 # This caches the artifact structure
921 self._access_cache: dict[PurePath, set[PurePath]] | None = None
923 # Message to display if user doesn't wrap around ReadOnlyDatabase
924 self._read_only_message = (
925 "This database is read-only. "
926 "Please wrap this class with ReadOnlyDatabase"
927 "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
928 )
930 def __repr__(self) -> str:
931 return (
932 f"GitHubArtifactDatabase(owner={self.owner!r}, "
933 f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
934 )
936 def __eq__(self, other: object) -> bool:
937 return (
938 isinstance(other, GitHubArtifactDatabase)
939 and self.owner == other.owner
940 and self.repo == other.repo
941 and self.artifact_name == other.artifact_name
942 and self.path == other.path
943 )
945 def _prepare_for_io(self) -> None:
946 assert self._artifact is not None, "Artifact not loaded."
948 if self._initialized: # pragma: no cover
949 return
951 # Test that the artifact is valid
952 try:
953 with ZipFile(self._artifact) as f:
954 if f.testzip(): # pragma: no cover
955 raise BadZipFile
957 # Turns out that testzip() doesn't work quite well
958 # doing the cache initialization here instead
959 # will give us more coverage of the artifact.
961 # Cache the files inside each keypath
962 self._access_cache = {}
963 with ZipFile(self._artifact) as zf:
964 namelist = zf.namelist()
965 # Iterate over files in the artifact
966 for filename in namelist:
967 fileinfo = zf.getinfo(filename)
968 if fileinfo.is_dir():
969 self._access_cache[PurePath(filename)] = set()
970 else:
971 # Get the keypath from the filename
972 keypath = PurePath(filename).parent
973 # Add the file to the keypath
974 self._access_cache[keypath].add(PurePath(filename))
975 except BadZipFile:
976 warnings.warn(
977 "The downloaded artifact from GitHub is invalid. "
978 "This could be because the artifact was corrupted, "
979 "or because the artifact was not created by Hypothesis. ",
980 HypothesisWarning,
981 stacklevel=3,
982 )
983 self._disabled = True
985 self._initialized = True
987 def _initialize_db(self) -> None:
988 # Trigger warning that we suppressed earlier by intent_to_write=False
989 storage_directory(self.path.name)
990 # Create the cache directory if it doesn't exist
991 if self._storage_dir is not None: # pragma: no cover
992 self._storage_dir.create_if_missing()
993 else:
994 self.path.mkdir(exist_ok=True, parents=True)
996 # Get all artifacts
997 cached_artifacts = sorted(
998 self.path.glob("*.zip"),
999 key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
1000 )
1002 # Remove all but the latest artifact
1003 for artifact in cached_artifacts[:-1]:
1004 artifact.unlink()
1006 try:
1007 found_artifact = cached_artifacts[-1]
1008 except IndexError:
1009 found_artifact = None
1011 # Check if the latest artifact is a cache hit
1012 if found_artifact is not None and (
1013 datetime.now(timezone.utc)
1014 - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
1015 < self.cache_timeout
1016 ):
1017 self._artifact = found_artifact
1018 else:
1019 # Download the latest artifact from GitHub
1020 new_artifact = self._fetch_artifact()
1022 if new_artifact:
1023 if found_artifact is not None:
1024 found_artifact.unlink()
1025 self._artifact = new_artifact
1026 elif found_artifact is not None:
1027 warnings.warn(
1028 "Using an expired artifact as a fallback for the database: "
1029 f"{found_artifact}",
1030 HypothesisWarning,
1031 stacklevel=2,
1032 )
1033 self._artifact = found_artifact
1034 else:
1035 warnings.warn(
1036 "Couldn't acquire a new or existing artifact. Disabling database.",
1037 HypothesisWarning,
1038 stacklevel=2,
1039 )
1040 self._disabled = True
1041 return
1043 self._prepare_for_io()
1045 def _get_bytes(self, url: str) -> bytes | None: # pragma: no cover
1046 request = Request(
1047 url,
1048 headers={
1049 "Accept": "application/vnd.github+json",
1050 "X-GitHub-Api-Version": "2022-11-28 ",
1051 "Authorization": f"Bearer {self.token}",
1052 },
1053 )
1054 warning_message = None
1055 response_bytes: bytes | None = None
1056 try:
1057 with urlopen(request) as response:
1058 response_bytes = response.read()
1059 except HTTPError as e:
1060 if e.code == 401:
1061 warning_message = (
1062 "Authorization failed when trying to download artifact from GitHub. "
1063 "Check that you have a valid GITHUB_TOKEN set in your environment."
1064 )
1065 else:
1066 warning_message = (
1067 "Could not get the latest artifact from GitHub. "
1068 "This could be because the repository "
1069 "or artifact does not exist. "
1070 )
1071 # see https://github.com/python/cpython/issues/128734
1072 e.close()
1073 except URLError:
1074 warning_message = "Could not connect to GitHub to get the latest artifact. "
1075 except TimeoutError:
1076 warning_message = (
1077 "Could not connect to GitHub to get the latest artifact "
1078 "(connection timed out)."
1079 )
1081 if warning_message is not None:
1082 warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
1083 return None
1085 return response_bytes
1087 def _fetch_artifact(self) -> Path | None: # pragma: no cover
1088 # Get the list of artifacts from GitHub
1089 url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
1090 response_bytes = self._get_bytes(url)
1091 if response_bytes is None:
1092 return None
1094 artifacts = json.loads(response_bytes)["artifacts"]
1095 artifacts = [a for a in artifacts if a["name"] == self.artifact_name]
1097 if not artifacts:
1098 return None
1100 # Get the latest artifact from the list
1101 artifact = max(artifacts, key=lambda a: a["created_at"])
1102 url = artifact["archive_download_url"]
1104 # Download the artifact
1105 artifact_bytes = self._get_bytes(url)
1106 if artifact_bytes is None:
1107 return None
1109 # Save the artifact to the cache
1110 # We replace ":" with "_" to ensure the filenames are compatible
1111 # with Windows filesystems
1112 timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
1113 artifact_path = self.path / f"{timestamp}.zip"
1114 try:
1115 artifact_path.write_bytes(artifact_bytes)
1116 except OSError:
1117 warnings.warn(
1118 "Could not save the latest artifact from GitHub. ",
1119 HypothesisWarning,
1120 stacklevel=3,
1121 )
1122 return None
1124 return artifact_path
1126 @staticmethod
1127 @lru_cache
1128 def _key_path(key: bytes) -> PurePath:
1129 return PurePath(_hash(key) + "/")
1131 def fetch(self, key: bytes) -> Iterable[bytes]:
1132 if self._disabled:
1133 return
1135 if not self._initialized:
1136 self._initialize_db()
1137 if self._disabled:
1138 return
1140 assert self._artifact is not None
1141 assert self._access_cache is not None
1143 kp = self._key_path(key)
1145 with ZipFile(self._artifact) as zf:
1146 # Get all the files in the kp from the cache
1147 filenames = self._access_cache.get(kp, ())
1148 for filename in filenames:
1149 with zf.open(filename.as_posix()) as f:
1150 yield f.read()
1152 # Read-only interface
1153 def save(self, key: bytes, value: bytes) -> None:
1154 raise RuntimeError(self._read_only_message)
1156 def move(self, src: bytes, dest: bytes, value: bytes) -> None:
1157 raise RuntimeError(self._read_only_message)
1159 def delete(self, key: bytes, value: bytes) -> None:
1160 raise RuntimeError(self._read_only_message)
class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        # The wrapped database that actually performs the writes.
        self._db = db
        # Pending operations as (method_name, args) tuples, consumed by _worker.
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        # Started lazily by _ensure_thread on the first write.
        self._thread: Thread | None = None

    def _ensure_thread(self) -> None:
        # Start the background writer exactly once, on the first write.
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            # NOTE(review): self._join is a bound method, so this finalizer
            # keeps a strong reference to self; in practice it fires at
            # interpreter shutdown rather than on garbage collection —
            # confirm this is intended.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        # Wrappers compare equal iff their wrapped databases do.
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        # Daemon loop: apply queued operations to the wrapped database in
        # FIFO order. Never exits; the thread dies with the interpreter.
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: float | None = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        # Note: `timeout` bounds each individual wait() call, not the total —
        # the loop re-checks unfinished_tasks after every wakeup.
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        # Flush all pending writes first so the fetch observes them.
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        # Enqueue rather than write synchronously; _worker applies it later.
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        # Subscribe to the wrapped database; _broadcast_change is presumably
        # inherited from ExampleDatabase — verify against the base class.
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)
1225def _pack_uleb128(value: int) -> bytes:
1226 """
1227 Serialize an integer into variable-length bytes. For each byte, the first 7
1228 bits represent (part of) the integer, while the last bit indicates whether the
1229 integer continues into the next byte.
1231 https://en.wikipedia.org/wiki/LEB128
1232 """
1233 parts = bytearray()
1234 assert value >= 0
1235 while True:
1236 # chop off 7 bits
1237 byte = value & ((1 << 7) - 1)
1238 value >>= 7
1239 # set the continuation bit if we have more left
1240 if value:
1241 byte |= 1 << 7
1243 parts.append(byte)
1244 if not value:
1245 break
1246 return bytes(parts)
1249def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
1250 """
1251 Inverts _pack_uleb128, and also returns the index at which at which we stopped
1252 reading.
1253 """
1254 value = 0
1255 for i, byte in enumerate(buffer):
1256 n = byte & ((1 << 7) - 1)
1257 value |= n << (i * 7)
1259 if not byte >> 7:
1260 break
1261 return (i + 1, value)
def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes.

    We use a custom serialization format because our data is a flat sequence
    of elements, and standard tools like protobuf or msgpack don't deal well
    with e.g. nonstandard bit-pattern-NaNs or invalid-utf8 unicode. Each
    element is a metadata byte, an optional uleb128 size, and the payload;
    boolean payloads are inlined into the metadata byte.
    """
    out = bytearray()
    for choice in choices:
        # `000_0000v` — tag zero, payload in the low bit. (bool must be
        # tested before int, since bool is an int subclass.)
        if isinstance(choice, bool):
            out += b"\1" if choice else b"\0"
            continue

        # `tag_ssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            payload = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            payload = choice.to_bytes(
                1 + choice.bit_length() // 8, "big", signed=True
            )
        elif isinstance(choice, bytes):
            tag = 3 << 5
            payload = choice
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            payload = choice.encode(errors="surrogatepass")

        size = len(payload)
        if size < 0b11111:
            # Small payloads inline their size into the metadata byte.
            out.append(tag | size)
        else:
            # Sentinel size, followed by the real length as uleb128.
            out.append(tag | 0b11111)
            out += _pack_uleb128(size)
        out += payload

    return bytes(out)
def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # Decode the format produced by choices_to_bytes; see there for details.
    decoded: list[ChoiceT] = []
    pos = 0
    end = len(buffer)
    while pos < end:
        header = buffer[pos]
        pos += 1
        tag, size = header >> 5, header & 0b11111

        if tag == 0:
            # Boolean: payload is the low bit of the header byte.
            decoded.append(bool(size))
            continue
        if size == 0b11111:
            # Extended size: actual length follows as uleb128.
            consumed, size = _unpack_uleb128(buffer[pos:])
            pos += consumed

        payload = buffer[pos : pos + size]
        pos += size

        if tag == 1:
            assert size == 8, "expected float64"
            decoded.extend(struct.unpack("!d", payload))
        elif tag == 2:
            decoded.append(int.from_bytes(payload, "big", signed=True))
        elif tag == 3:
            decoded.append(payload)
        else:
            assert tag == 4
            decoded.append(payload.decode(errors="surrogatepass"))
    return tuple(decoded)
def choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...] | None:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        result = _choices_from_bytes(buffer)
    except Exception:
        # Deserialization error — e.g. the format changed, or someone put
        # junk data in the db.
        return None
    return result