# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Literal,
    Optional,
    Union,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis._settings import note_deprecation
from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from typing import TypeAlias

    from watchdog.observers.api import BaseObserver

StrPathT: "TypeAlias" = Union[str, PathLike[str]]
SaveDataT: "TypeAlias" = tuple[bytes, bytes]  # key, value
DeleteDataT: "TypeAlias" = tuple[bytes, Optional[bytes]]  # key, value
ListenerEventT: "TypeAlias" = Union[
    tuple[Literal["save"], SaveDataT], tuple[Literal["delete"], DeleteDataT]
]
ListenerT: "TypeAlias" = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as a database path, because
    either the directory already exists and can be used, or the nearest
    existing ancestor can be used and we can create the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:
        return False


def _db_for_path(
    path: Optional[Union[StrPathT, UniqueIdentifier, Literal[":memory:"]]] = None,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        from sphinx.ext.autodoc import _METACLASS_CALL_BLACKLIST

        _METACLASS_CALL_BLACKLIST.append("hypothesis.database._EDMeta.__call__")
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in the |settings.database| setting.

    Using an |ExampleDatabase| allows Hypothesis to remember and replay failing
    examples, and ensure that your tests are never flaky. We provide several
    concrete subclasses, and you can write a custom database backed by your
    preferred way to store a ``dict[bytes, set[bytes]]``.

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support it, you can call |ExampleDatabase.add_listener| to add a function
    as a change listener, which will be called whenever a value is added, deleted,
    or moved. See |ExampleDatabase.add_listener| for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, unless they want to be compatible
    with features that require it. Currently, no Hypothesis features require change
    listening.

    .. note::

        Change listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Methods
    -------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
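
    For instance, a custom database might look like the following. This is a
    minimal sketch (the class name is illustrative, not part of Hypothesis);
    the three required methods are enough for a fully working database:

    .. code-block:: python

        class DictExampleDatabase(ExampleDatabase):
            def __init__(self):
                super().__init__()
                # values are stored per-key, as an unordered set
                self._data: dict[bytes, set[bytes]] = {}

            def save(self, key: bytes, value: bytes) -> None:
                self._data.setdefault(key, set()).add(value)

            def fetch(self, key: bytes) -> Iterable[bytes]:
                yield from self._data.get(key, ())

            def delete(self, key: bytes, value: bytes) -> None:
                self._data.get(key, set()).discard(value)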
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present under ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present under ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
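
        For example, a listener which simply logs every event might look like
        this (a sketch; ``db`` stands for any concrete database instance):

        .. code-block:: python

            def log_event(event):
                kind, (key, value) = event  # kind is "save" or "delete"
                print(f"{kind}: key={key!r} value={value!r}")

            db.add_listener(log_event)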
        """
        had_listeners = bool(self._listeners)
        self._listeners.append(f)
        if not had_listeners:
            self._start_listening()

    def remove_listener(self, f: ListenerT, /) -> None:
        """
        Remove ``f`` from the list of change listeners.

        If ``f`` is not in the list of change listeners, silently do nothing.
        """
        if f not in self._listeners:
            return
        self._listeners.remove(f)
        if not self._listeners:
            self._stop_listening()

    def clear_listeners(self) -> None:
        """Remove all change listeners."""
        had_listeners = bool(self._listeners)
        self._listeners.clear()
        if had_listeners:
            self._stop_listening()

    def _broadcast_change(self, event: ListenerEventT) -> None:
        """
        Called when a value has been either added to or deleted from a key in
        the underlying database store. The possible values for ``event`` are:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        ``value`` may be ``None`` for the ``delete`` event, indicating we know
        that some value was deleted under this key, but not its exact value.

        Note that you should not assume your instance is the only reference to
        the underlying database store. For example, if two instances of
        |DirectoryBasedExampleDatabase| reference the same directory,
        ``_broadcast_change`` should be called whenever a file is added or removed
        from the directory, even if that database was not responsible for
        changing the file.
        """
        for listener in self._listeners:
            listener(event)

    def _start_listening(self) -> None:
        """
        Called when the database adds a change listener, and did not previously
        have any change listeners. Intended to allow databases to wait to start
        expensive listening operations until necessary.

        ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_start_listening`` calls without an intermediate ``_stop_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )

    def _stop_listening(self) -> None:
        """
        Called whenever no change listeners remain on the database.

        ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate,
        so you do not need to handle the case of multiple consecutive
        ``_stop_listening`` calls without an intermediate ``_start_listening``
        call.
        """
        warnings.warn(
            f"{self.__class__} does not support stopping listening for changes",
            HypothesisWarning,
            stacklevel=4,
        )


class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
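
    For example (a sketch; the profile name is illustrative), to keep failing
    examples only for the current session:

    .. code-block:: python

        from hypothesis import settings

        settings.register_profile("session", database=InMemoryExampleDatabase())
        settings.load_profile("session")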
    """

    def __init__(self) -> None:
        super().__init__()
        self.data: dict[bytes, set[bytes]] = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, InMemoryExampleDatabase) and self.data is other.data

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self.data.get(key, ())

    def save(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.setdefault(key, set())
        changed = value not in values
        values.add(value)

        if changed:
            self._broadcast_change(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        value = bytes(value)
        values = self.data.get(key, set())
        changed = value in values
        values.discard(value)

        if changed:
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # declare compatibility with the listener api, but do the actual
        # implementation in .delete and .save, since we know we are the only
        # writer to .data.
        pass

    def _stop_listening(self) -> None:
        pass


def _hash(key: bytes) -> str:
    return sha384(key).hexdigest()[:16]


class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper.
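
    On disk, each key becomes a subdirectory of ``path``, named by a truncated
    sha384 hash of the key, and each value becomes a file in that subdirectory,
    named by a hash of the value and containing the raw value bytes.
    Schematically (the hash names below are illustrative placeholders)::

        <path>/
            <hash(key)>/
                <hash(value1)>
                <hash(value2)>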
    """

    # we keep a database entry of the full values of all the database keys.
    # currently only used for inverse mapping of hash -> key in change listening.
    _metakeys_name: ClassVar[bytes] = b".hypothesis-keys"
    _metakeys_hash: ClassVar[str] = _hash(_metakeys_name)

    def __init__(self, path: StrPathT) -> None:
        super().__init__()
        self.path = Path(path)
        self.keypaths: dict[bytes, Path] = {}
        self._observer: BaseObserver | None = None

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path
        )

    def _key_path(self, key: bytes) -> Path:
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        self.keypaths[key] = self.path / _hash(key)
        return self.keypaths[key]

    def _value_path(self, key: bytes, value: bytes) -> Path:
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return
        for path in os.listdir(kp):
            try:
                yield (kp / path).read_bytes()
            except OSError:
                pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write to a temporary
                # file, and only move it to the final path afterwards. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            pass

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(FileSystemEventHandler):
            def on_created(
                _self, event: Union[FileCreatedEvent, DirCreatedEvent]
            ) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    hash_to_key[value_path.name] = value_path.read_bytes()
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause it.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(
                self, event: Union[FileDeletedEvent, DirDeletedEvent]
            ) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(self, event: Union[FileMovedEvent, DirMovedEvent]) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove the type: ignore once this is released:
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read-only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally, and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, the token needs the ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used through the
    :class:`~hypothesis.database.MultiplexedDatabase`, by combining a local
    directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local
        # one, which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` argument.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[StrPathT] = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment.
        # It's unnecessary to use a token if the repo is public.
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in use:
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[dict[PurePath, set[PurePath]]] = None

        # Message to display if the user doesn't wrap this in a ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # It turns out that testzip() doesn't work all that well, so doing
            # the cache initialization here as well gives us more coverage of
            # the artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger the warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache.
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems.
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in this keypath from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
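
    For example (a sketch; the profile name is illustrative), this makes
    directory-based writes non-blocking for the test process:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        settings.register_profile("dev", database=BackgroundWriteDatabase(local))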
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        self._db = db
        self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue()
        self._thread = Thread(target=self._worker, daemon=True)
        self._thread.start()
        # avoid an unbounded timeout during gc. 0.1 should be plenty for most
        # use cases.
        weakref.finalize(self, self._join, 0.1)

    def __repr__(self) -> str:
        return f"BackgroundWriteDatabase({self._db!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, BackgroundWriteDatabase) and self._db == other._db

    def _worker(self) -> None:
        while True:
            method, args = self._queue.get()
            getattr(self._db, method)(*args)
            self._queue.task_done()

    def _join(self, timeout: Optional[float] = None) -> None:
        # copy of Queue.join with a timeout. https://bugs.python.org/issue9634
        with self._queue.all_tasks_done:
            while self._queue.unfinished_tasks:
                self._queue.all_tasks_done.wait(timeout)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        self._join()
        return self._db.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        self._queue.put(("save", (key, value)))

    def delete(self, key: bytes, value: bytes) -> None:
        self._queue.put(("delete", (key, value)))

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        self._queue.put(("move", (src, dest, value)))

    def _start_listening(self) -> None:
        self._db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        self._db.remove_listener(self._broadcast_change)


def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. For each byte, the first 7
    bits represent (part of) the integer, while the last bit indicates whether the
    integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
    """
    parts = bytearray()
    assert value >= 0
    while True:
        # chop off 7 bits
        byte = value & ((1 << 7) - 1)
        value >>= 7
        # set the continuation bit if we have more left
        if value:
            byte |= 1 << 7

        parts.append(byte)
        if not value:
            break
    return bytes(parts)


def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128, returning a tuple of
    (number of bytes read, value).
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)
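

# A quick worked example of the ULEB128 helpers above, for 300 == 0b1_0010_1100:
#
#   _pack_uleb128(300) == b"\xac\x02"
#     0xac == 0b1_0101100: the low seven bits of 300, with the continuation bit set;
#     0x02 == the remaining bits (300 >> 7 == 2), with no continuation bit.
#
#   _unpack_uleb128(b"\xac\x02") == (2, 300), i.e. 2 bytes read, value 300.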


def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, if needed a uleb128-encoded
    # size, and then the payload bytes. For booleans, the payload is inlined into the
    # metadata.
    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_ssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)
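

# A quick worked example of the format above:
#
#   choices_to_bytes([True, b"ab"]) == b"\x01" + bytes([0b011_00010]) + b"ab"
#
# True is a bool, so it becomes the single metadata byte 0x01. b"ab" gets tag 3
# (bytes) in the top three bits and its size (2) in the low five bits, giving
# the metadata byte 0b011_00010 == 0x62, followed by the two payload bytes.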


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            (offset, size) = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> Optional[tuple[ChoiceT, ...]]:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
    """
    try:
        return _choices_from_bytes(buffer)
    except Exception:
        # deserialization error, e.g. because our format changed or someone put
        # junk data in the db.
        return None