1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import abc
12import binascii
13import json
14import os
15import sys
16import warnings
17from datetime import datetime, timedelta, timezone
18from functools import lru_cache
19from hashlib import sha384
20from os import getenv
21from pathlib import Path, PurePath
22from typing import Dict, Iterable, Optional, Set
23from urllib.error import HTTPError, URLError
24from urllib.request import Request, urlopen
25from zipfile import BadZipFile, ZipFile
26
27from hypothesis.configuration import storage_directory
28from hypothesis.errors import HypothesisException, HypothesisWarning
29from hypothesis.utils.conventions import not_set
30
31__all__ = [
32 "DirectoryBasedExampleDatabase",
33 "ExampleDatabase",
34 "InMemoryExampleDatabase",
35 "MultiplexedDatabase",
36 "ReadOnlyDatabase",
37 "GitHubArtifactDatabase",
38]
39
40
41def _usable_dir(path: os.PathLike) -> bool:
42 """
43 Returns True if the desired path can be used as database path because
44 either the directory exists and can be used, or its root directory can
45 be used and we can make the directory as needed.
46 """
47 path = Path(path)
48 try:
49 while not path.exists():
50 # Loop terminates because the root dir ('/' on unix) always exists.
51 path = path.parent
52 return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
53 except PermissionError:
54 return False
55
56
57def _db_for_path(path=None):
58 if path is not_set:
59 if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None: # pragma: no cover
60 raise HypothesisException(
61 "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
62 "effect. Configure your database location via a settings profile instead.\n"
63 "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
64 )
65
66 path = storage_directory("examples", intent_to_write=False)
67 if not _usable_dir(path): # pragma: no cover
68 warnings.warn(
69 "The database setting is not configured, and the default "
70 "location is unusable - falling back to an in-memory "
71 f"database for this session. {path=}",
72 HypothesisWarning,
73 stacklevel=3,
74 )
75 return InMemoryExampleDatabase()
76 if path in (None, ":memory:"):
77 return InMemoryExampleDatabase()
78 return DirectoryBasedExampleDatabase(path)
79
80
81class _EDMeta(abc.ABCMeta):
82 def __call__(self, *args, **kwargs):
83 if self is ExampleDatabase:
84 return _db_for_path(*args, **kwargs)
85 return super().__call__(*args, **kwargs)
86
87
88# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
89# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
90# maintains a list of metaclass-call-methods to ignore, and while they would prefer
91# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
92# can insert ourselves here.
93#
94# This code only runs if Sphinx has already been imported; and it would live in our
95# docs/conf.py except that we would also like it to work for anyone documenting
96# downstream ExampleDatabase subclasses too.
97if "sphinx" in sys.modules:
98 try:
99 from sphinx.ext.autodoc import _METACLASS_CALL_BLACKLIST
100
101 _METACLASS_CALL_BLACKLIST.append("hypothesis.database._EDMeta.__call__")
102 except Exception:
103 pass
104
105
106class ExampleDatabase(metaclass=_EDMeta):
107 """An abstract base class for storing examples in Hypothesis' internal format.
108
109 An ExampleDatabase maps each ``bytes`` key to many distinct ``bytes``
110 values, like a ``Mapping[bytes, AbstractSet[bytes]]``.
111 """
112
113 @abc.abstractmethod
114 def save(self, key: bytes, value: bytes) -> None:
115 """Save ``value`` under ``key``.
116
117 If this value is already present for this key, silently do nothing.
118 """
119 raise NotImplementedError(f"{type(self).__name__}.save")
120
121 @abc.abstractmethod
122 def fetch(self, key: bytes) -> Iterable[bytes]:
123 """Return an iterable over all values matching this key."""
124 raise NotImplementedError(f"{type(self).__name__}.fetch")
125
126 @abc.abstractmethod
127 def delete(self, key: bytes, value: bytes) -> None:
128 """Remove this value from this key.
129
130 If this value is not present, silently do nothing.
131 """
132 raise NotImplementedError(f"{type(self).__name__}.delete")
133
134 def move(self, src: bytes, dest: bytes, value: bytes) -> None:
135 """Move ``value`` from key ``src`` to key ``dest``. Equivalent to
136 ``delete(src, value)`` followed by ``save(src, value)``, but may
137 have a more efficient implementation.
138
139 Note that ``value`` will be inserted at ``dest`` regardless of whether
140 it is currently present at ``src``.
141 """
142 if src == dest:
143 self.save(src, value)
144 return
145 self.delete(src, value)
146 self.save(dest, value)
147
148
149class InMemoryExampleDatabase(ExampleDatabase):
150 """A non-persistent example database, implemented in terms of a dict of sets.
151
152 This can be useful if you call a test function several times in a single
153 session, or for testing other database implementations, but because it
154 does not persist between runs we do not recommend it for general use.
155 """
156
157 def __init__(self):
158 self.data = {}
159
160 def __repr__(self) -> str:
161 return f"InMemoryExampleDatabase({self.data!r})"
162
163 def fetch(self, key: bytes) -> Iterable[bytes]:
164 yield from self.data.get(key, ())
165
166 def save(self, key: bytes, value: bytes) -> None:
167 self.data.setdefault(key, set()).add(bytes(value))
168
169 def delete(self, key: bytes, value: bytes) -> None:
170 self.data.get(key, set()).discard(bytes(value))
171
172
173def _hash(key):
174 return sha384(key).hexdigest()[:16]
175
176
177class DirectoryBasedExampleDatabase(ExampleDatabase):
178 """Use a directory to store Hypothesis examples as files.
179
180 Each test corresponds to a directory, and each example to a file within that
181 directory. While the contents are fairly opaque, a
182 ``DirectoryBasedExampleDatabase`` can be shared by checking the directory
183 into version control, for example with the following ``.gitignore``::
184
185 # Ignore files cached by Hypothesis...
186 .hypothesis/*
187 # except for the examples directory
188 !.hypothesis/examples/
189
190 Note however that this only makes sense if you also pin to an exact version of
191 Hypothesis, and we would usually recommend implementing a shared database with
192 a network datastore - see :class:`~hypothesis.database.ExampleDatabase`, and
193 the :class:`~hypothesis.database.MultiplexedDatabase` helper.
194 """
195
196 def __init__(self, path: os.PathLike) -> None:
197 self.path = Path(path)
198 self.keypaths: Dict[bytes, Path] = {}
199
200 def __repr__(self) -> str:
201 return f"DirectoryBasedExampleDatabase({self.path!r})"
202
203 def _key_path(self, key: bytes) -> Path:
204 try:
205 return self.keypaths[key]
206 except KeyError:
207 pass
208 self.keypaths[key] = self.path / _hash(key)
209 return self.keypaths[key]
210
211 def _value_path(self, key, value):
212 return self._key_path(key) / _hash(value)
213
214 def fetch(self, key: bytes) -> Iterable[bytes]:
215 kp = self._key_path(key)
216 if not kp.is_dir():
217 return
218 for path in os.listdir(kp):
219 try:
220 yield (kp / path).read_bytes()
221 except OSError:
222 pass
223
224 def save(self, key: bytes, value: bytes) -> None:
225 # Note: we attempt to create the dir in question now. We
226 # already checked for permissions, but there can still be other issues,
227 # e.g. the disk is full, or permissions might have been changed.
228 self._key_path(key).mkdir(exist_ok=True, parents=True)
229 path = self._value_path(key, value)
230 if not path.exists():
231 suffix = binascii.hexlify(os.urandom(16)).decode("ascii")
232 tmpname = path.with_suffix(f"{path.suffix}.{suffix}")
233 tmpname.write_bytes(value)
234 try:
235 tmpname.rename(path)
236 except OSError: # pragma: no cover
237 tmpname.unlink()
238 assert not tmpname.exists()
239
240 def move(self, src: bytes, dest: bytes, value: bytes) -> None:
241 if src == dest:
242 self.save(src, value)
243 return
244 try:
245 os.renames(
246 self._value_path(src, value),
247 self._value_path(dest, value),
248 )
249 except OSError:
250 self.delete(src, value)
251 self.save(dest, value)
252
253 def delete(self, key: bytes, value: bytes) -> None:
254 try:
255 self._value_path(key, value).unlink()
256 except OSError:
257 pass
258
259
260class ReadOnlyDatabase(ExampleDatabase):
261 """A wrapper to make the given database read-only.
262
263 The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
264 ``move`` into silent no-ops.
265
266 Note that this disables Hypothesis' automatic discarding of stale examples.
267 It is designed to allow local machines to access a shared database (e.g. from CI
268 servers), without propagating changes back from a local or in-development branch.
269 """
270
271 def __init__(self, db: ExampleDatabase) -> None:
272 assert isinstance(db, ExampleDatabase)
273 self._wrapped = db
274
275 def __repr__(self) -> str:
276 return f"ReadOnlyDatabase({self._wrapped!r})"
277
278 def fetch(self, key: bytes) -> Iterable[bytes]:
279 yield from self._wrapped.fetch(key)
280
281 def save(self, key: bytes, value: bytes) -> None:
282 pass
283
284 def delete(self, key: bytes, value: bytes) -> None:
285 pass
286
287
288class MultiplexedDatabase(ExampleDatabase):
289 """A wrapper around multiple databases.
290
291 Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
292 all of the wrapped databases. ``fetch`` does not yield duplicate values, even
293 if the same value is present in two or more of the wrapped databases.
294
295 This combines well with a :class:`ReadOnlyDatabase`, as follows:
296
297 .. code-block:: python
298
299 local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
300 shared = CustomNetworkDatabase()
301
302 settings.register_profile("ci", database=shared)
303 settings.register_profile(
304 "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
305 )
306 settings.load_profile("ci" if os.environ.get("CI") else "dev")
307
308 So your CI system or fuzzing runs can populate a central shared database;
309 while local runs on development machines can reproduce any failures from CI
310 but will only cache their own failures locally and cannot remove examples
311 from the shared database.
312 """
313
314 def __init__(self, *dbs: ExampleDatabase) -> None:
315 assert all(isinstance(db, ExampleDatabase) for db in dbs)
316 self._wrapped = dbs
317
318 def __repr__(self) -> str:
319 return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))
320
321 def fetch(self, key: bytes) -> Iterable[bytes]:
322 seen = set()
323 for db in self._wrapped:
324 for value in db.fetch(key):
325 if value not in seen:
326 yield value
327 seen.add(value)
328
329 def save(self, key: bytes, value: bytes) -> None:
330 for db in self._wrapped:
331 db.save(key, value)
332
333 def delete(self, key: bytes, value: bytes) -> None:
334 for db in self._wrapped:
335 db.delete(key, value)
336
337 def move(self, src: bytes, dest: bytes, value: bytes) -> None:
338 for db in self._wrapped:
339 db.move(src, dest, value)
340
341
342class GitHubArtifactDatabase(ExampleDatabase):
343 """
344 A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.
345
346 You can use this for sharing example databases between CI runs and developers, allowing
347 the latter to get read-only access to the former. This is particularly useful for
348 continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
349 where the CI system can help find new failing examples through fuzzing,
350 and developers can reproduce them locally without any manual effort.
351
352 .. note::
353 You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, Github Actions provides
354 this automatically, but it needs to be set manually for local usage. In a developer machine,
355 this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token>`_.
356 If the repository is private, it's necessary for the token to have ``repo`` scope
357 in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.
358
359
360 In most cases, this will be used
361 through the :class:`~hypothesis.database.MultiplexedDatabase`,
362 by combining a local directory-based database with this one. For example:
363
364 .. code-block:: python
365
366 local = DirectoryBasedExampleDatabase(".hypothesis/examples")
367 shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))
368
369 settings.register_profile("ci", database=local)
370 settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
371 # We don't want to use the shared database in CI, only to populate its local one.
372 # which the workflow should then upload as an artifact.
373 settings.load_profile("ci" if os.environ.get("CI") else "dev")
374
375 .. note::
376 Because this database is read-only, you always need to wrap it with the
377 :class:`ReadOnlyDatabase`.
378
379 A setup like this can be paired with a GitHub Actions workflow including
380 something like the following:
381
382 .. code-block:: yaml
383
384 - name: Download example database
385 uses: dawidd6/action-download-artifact@v2.24.3
386 with:
387 name: hypothesis-example-db
388 path: .hypothesis/examples
389 if_no_artifact_found: warn
390 workflow_conclusion: completed
391
392 - name: Run tests
393 run: pytest
394
395 - name: Upload example database
396 uses: actions/upload-artifact@v3
397 if: always()
398 with:
399 name: hypothesis-example-db
400 path: .hypothesis/examples
401
402 In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
403 to download the latest artifact given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
404 does not support downloading artifacts from previous workflow runs.
405
406 The database automatically implements a simple file-based cache with a default expiration period
407 of 1 day. You can adjust this through the ``cache_timeout`` property.
408
409 For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
410 """
411
412 def __init__(
413 self,
414 owner: str,
415 repo: str,
416 artifact_name: str = "hypothesis-example-db",
417 cache_timeout: timedelta = timedelta(days=1),
418 path: Optional[os.PathLike] = None,
419 ):
420 self.owner = owner
421 self.repo = repo
422 self.artifact_name = artifact_name
423 self.cache_timeout = cache_timeout
424
425 # Get the GitHub token from the environment
426 # It's unnecessary to use a token if the repo is public
427 self.token: Optional[str] = getenv("GITHUB_TOKEN")
428
429 if path is None:
430 self.path: Path = Path(
431 storage_directory(f"github-artifacts/{self.artifact_name}/")
432 )
433 else:
434 self.path = Path(path)
435
436 # We don't want to initialize the cache until we need to
437 self._initialized: bool = False
438 self._disabled: bool = False
439
440 # This is the path to the artifact in usage
441 # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
442 self._artifact: Optional[Path] = None
443 # This caches the artifact structure
444 self._access_cache: Optional[Dict[PurePath, Set[PurePath]]] = None
445
446 # Message to display if user doesn't wrap around ReadOnlyDatabase
447 self._read_only_message = (
448 "This database is read-only. "
449 "Please wrap this class with ReadOnlyDatabase"
450 "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
451 )
452
453 def __repr__(self) -> str:
454 return (
455 f"GitHubArtifactDatabase(owner={self.owner!r}, "
456 f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
457 )
458
459 def _prepare_for_io(self) -> None:
460 assert self._artifact is not None, "Artifact not loaded."
461
462 if self._initialized: # pragma: no cover
463 return
464
465 # Test that the artifact is valid
466 try:
467 with ZipFile(self._artifact) as f:
468 if f.testzip(): # pragma: no cover
469 raise BadZipFile
470
471 # Turns out that testzip() doesn't work quite well
472 # doing the cache initialization here instead
473 # will give us more coverage of the artifact.
474
475 # Cache the files inside each keypath
476 self._access_cache = {}
477 with ZipFile(self._artifact) as zf:
478 namelist = zf.namelist()
479 # Iterate over files in the artifact
480 for filename in namelist:
481 fileinfo = zf.getinfo(filename)
482 if fileinfo.is_dir():
483 self._access_cache[PurePath(filename)] = set()
484 else:
485 # Get the keypath from the filename
486 keypath = PurePath(filename).parent
487 # Add the file to the keypath
488 self._access_cache[keypath].add(PurePath(filename))
489 except BadZipFile:
490 warnings.warn(
491 "The downloaded artifact from GitHub is invalid. "
492 "This could be because the artifact was corrupted, "
493 "or because the artifact was not created by Hypothesis. ",
494 HypothesisWarning,
495 stacklevel=3,
496 )
497 self._disabled = True
498
499 self._initialized = True
500
501 def _initialize_db(self) -> None:
502 # Trigger warning that we suppressed earlier by intent_to_write=False
503 storage_directory(self.path.name)
504 # Create the cache directory if it doesn't exist
505 self.path.mkdir(exist_ok=True, parents=True)
506
507 # Get all artifacts
508 cached_artifacts = sorted(
509 self.path.glob("*.zip"),
510 key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
511 )
512
513 # Remove all but the latest artifact
514 for artifact in cached_artifacts[:-1]:
515 artifact.unlink()
516
517 try:
518 found_artifact = cached_artifacts[-1]
519 except IndexError:
520 found_artifact = None
521
522 # Check if the latest artifact is a cache hit
523 if found_artifact is not None and (
524 datetime.now(timezone.utc)
525 - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
526 < self.cache_timeout
527 ):
528 self._artifact = found_artifact
529 else:
530 # Download the latest artifact from GitHub
531 new_artifact = self._fetch_artifact()
532
533 if new_artifact:
534 if found_artifact is not None:
535 found_artifact.unlink()
536 self._artifact = new_artifact
537 elif found_artifact is not None:
538 warnings.warn(
539 "Using an expired artifact as a fallback for the database: "
540 f"{found_artifact}",
541 HypothesisWarning,
542 stacklevel=2,
543 )
544 self._artifact = found_artifact
545 else:
546 warnings.warn(
547 "Couldn't acquire a new or existing artifact. Disabling database.",
548 HypothesisWarning,
549 stacklevel=2,
550 )
551 self._disabled = True
552 return
553
554 self._prepare_for_io()
555
556 def _get_bytes(self, url: str) -> Optional[bytes]: # pragma: no cover
557 request = Request(
558 url,
559 headers={
560 "Accept": "application/vnd.github+json",
561 "X-GitHub-Api-Version": "2022-11-28 ",
562 "Authorization": f"Bearer {self.token}",
563 },
564 )
565 warning_message = None
566 response_bytes: Optional[bytes] = None
567 try:
568 with urlopen(request) as response:
569 response_bytes = response.read()
570 except HTTPError as e:
571 if e.code == 401:
572 warning_message = (
573 "Authorization failed when trying to download artifact from GitHub. "
574 "Check that you have a valid GITHUB_TOKEN set in your environment."
575 )
576 else:
577 warning_message = (
578 "Could not get the latest artifact from GitHub. "
579 "This could be because because the repository "
580 "or artifact does not exist. "
581 )
582 except URLError:
583 warning_message = "Could not connect to GitHub to get the latest artifact. "
584 except TimeoutError:
585 warning_message = (
586 "Could not connect to GitHub to get the latest artifact "
587 "(connection timed out)."
588 )
589
590 if warning_message is not None:
591 warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
592 return None
593
594 return response_bytes
595
596 def _fetch_artifact(self) -> Optional[Path]: # pragma: no cover
597 # Get the list of artifacts from GitHub
598 url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
599 response_bytes = self._get_bytes(url)
600 if response_bytes is None:
601 return None
602
603 artifacts = json.loads(response_bytes)["artifacts"]
604 artifacts = [a for a in artifacts if a["name"] == self.artifact_name]
605
606 if not artifacts:
607 return None
608
609 # Get the latest artifact from the list
610 artifact = max(artifacts, key=lambda a: a["created_at"])
611 url = artifact["archive_download_url"]
612
613 # Download the artifact
614 artifact_bytes = self._get_bytes(url)
615 if artifact_bytes is None:
616 return None
617
618 # Save the artifact to the cache
619 # We replace ":" with "_" to ensure the filenames are compatible
620 # with Windows filesystems
621 timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
622 artifact_path = self.path / f"{timestamp}.zip"
623 try:
624 artifact_path.write_bytes(artifact_bytes)
625 except OSError:
626 warnings.warn(
627 "Could not save the latest artifact from GitHub. ",
628 HypothesisWarning,
629 stacklevel=3,
630 )
631 return None
632
633 return artifact_path
634
635 @staticmethod
636 @lru_cache
637 def _key_path(key: bytes) -> PurePath:
638 return PurePath(_hash(key) + "/")
639
640 def fetch(self, key: bytes) -> Iterable[bytes]:
641 if self._disabled:
642 return
643
644 if not self._initialized:
645 self._initialize_db()
646 if self._disabled:
647 return
648
649 assert self._artifact is not None
650 assert self._access_cache is not None
651
652 kp = self._key_path(key)
653
654 with ZipFile(self._artifact) as zf:
655 # Get the all files in the the kp from the cache
656 filenames = self._access_cache.get(kp, ())
657 for filename in filenames:
658 with zf.open(filename.as_posix()) as f:
659 yield f.read()
660
661 # Read-only interface
662 def save(self, key: bytes, value: bytes) -> None:
663 raise RuntimeError(self._read_only_message)
664
665 def move(self, src: bytes, dest: bytes, value: bytes) -> None:
666 raise RuntimeError(self._read_only_message)
667
668 def delete(self, key: bytes, value: bytes) -> None:
669 raise RuntimeError(self._read_only_message)