# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Literal,
    TypeAlias,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from watchdog.observers.api import BaseObserver

StrPathT: TypeAlias = str | PathLike[str]
SaveDataT: TypeAlias = tuple[bytes, bytes]  # key, value
DeleteDataT: TypeAlias = tuple[bytes, bytes | None]  # key, value
ListenerEventT: TypeAlias = (
    tuple[Literal["save"], SaveDataT] | tuple[Literal["delete"], DeleteDataT]
)
ListenerT: TypeAlias = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as database path because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: StrPathT | UniqueIdentifier | Literal[":memory:"] | None = not_set,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"
        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST, frozenset):
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST = (
                sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST | {signature}
            )
        else:
            sphinx.ext.autodoc._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

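    # A minimal custom database, as a sketch (the name `DictBackedExampleDatabase`
    # is illustrative, not part of Hypothesis). It implements the three required
    # methods over the bytes -> set-of-bytes mapping described above:
    #
    #     class DictBackedExampleDatabase(ExampleDatabase):
    #         def __init__(self) -> None:
    #             super().__init__()
    #             self._data: dict[bytes, set[bytes]] = {}
    #
    #         def save(self, key: bytes, value: bytes) -> None:
    #             self._data.setdefault(key, set()).add(value)
    #
    #         def fetch(self, key: bytes) -> Iterable[bytes]:
    #             yield from self._data.get(key, ())
    #
    #         def delete(self, key: bytes, value: bytes) -> None:
    #             self._data.get(key, set()).discard(value)
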
    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

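    # Sketch of listener usage (illustrative only, not part of the API surface):
    #
    #     def on_change(event):
    #         kind, (key, value) = event  # kind is "save" or "delete"
    #         print(kind, key, value)
    #
    #     db = InMemoryExampleDatabase()
    #     db.add_listener(on_change)
    #     db.save(b"key", b"value")  # calls on_change(("save", (b"key", b"value")))
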
    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Remove ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        _broadcast_change should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

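    # Typical usage is via |settings.database| (a sketch, assuming the standard
    # hypothesis decorators):
    #
    #     from hypothesis import given, settings, strategies as st
    #
    #     @settings(database=InMemoryExampleDatabase())
    #     @given(st.integers())
    #     def test_something(x):
    #         ...
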
    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
    """

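    # On-disk layout, as used by _key_path/_value_path below (hashes are
    # truncated sha384 hexdigests; file contents are the raw value bytes):
    #
    #     <path>/<_hash(key)>/<_hash(value)>
    #
    # e.g. a value saved under some key in ".hypothesis/examples" lands in
    # ".hypothesis/examples/<16 hex chars>/<16 hex chars>".
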
    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: BaseObserver | None = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(_self, event: FileCreatedEvent | DirCreatedEvent) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: FileMovedEvent | DirMovedEvent) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

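    # For example (an illustrative sketch):
    #
    #     db = ReadOnlyDatabase(DirectoryBasedExampleDatabase(".hypothesis/examples"))
    #     db.save(b"key", b"value")        # silently does nothing
    #     values = list(db.fetch(b"key"))  # reads pass through to the wrapped db
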
    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database;
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, the token needs the ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used
    through the :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate its local one,
        # which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` property.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: StrPathT | None = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: str | None = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in usage
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Path | None = None
        # This caches the artifact structure
        self._access_cache: dict[PurePath, set[PurePath]] | None = None

        # Message to display if user doesn't wrap around ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # testzip() alone doesn't catch every problem, so we also build the
            # access cache here, which exercises more of the artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> bytes | None:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: bytes | None = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
            # see https://github.com/python/cpython/issues/128734
            e.close()
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Path | None:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in the kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
    """

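    # For example (an illustrative sketch):
    #
    #     db = BackgroundWriteDatabase(DirectoryBasedExampleDatabase(".hypothesis/examples"))
    #     db.save(b"key", b"value")  # returns immediately; the write happens on a worker thread
    #     list(db.fetch(b"key"))     # waits for pending writes, then yields b"value"
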
    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread: Thread | None = None

    def _ensure_thread(self) -> None:
        if self._thread is None:
            self._thread = Thread(target=self._worker, daemon=True)
            self._thread.start()
            # avoid an unbounded timeout during gc. 0.1 should be plenty for most
            # use cases.
            weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: float | None = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._ensure_thread()
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)


def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the low 7
    bits represent (part of) the integer, while the high bit indicates whether
    the integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, returning both the index at which we stopped reading
    and the decoded integer, as ``(index, value)``.
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)

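# A worked example of the encoding (illustrative only): 300 == 0b10_0101100.
# The low seven bits (0b0101100 == 0x2C) are emitted first with the continuation
# bit set (-> 0xAC), and the remaining bits (0b10 == 0x02) form the final byte.
#
#     assert _pack_uleb128(300) == bytes([0xAC, 0x02])
#     assert _unpack_uleb128(bytes([0xAC, 0x02])) == (2, 300)

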
def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a ULEB128-encoded
    # size, and then the payload bytes. For booleans, the payload is inlined into the
    # metadata byte.
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `ttt_sssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...] | None:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, e.g. because our format changed or someone put
        # junk data in the db.
        return None

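# A round-trip sketch (illustrative only): every supported choice type survives
# serialization, including values that generic formats tend to handle poorly.
#
#     choices = (True, 2.5, -17, b"\x00\xff", "héllo")
#     assert choices_from_bytes(choices_to_bytes(choices)) == choices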