Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/database.py: 31%

# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import abc
import errno
import json
import os
import struct
import sys
import tempfile
import warnings
import weakref
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from hashlib import sha384
from os import PathLike, getenv
from pathlib import Path, PurePath
from queue import Queue
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Literal,
    TypeAlias,
    cast,
)
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from zipfile import BadZipFile, ZipFile

from hypothesis.configuration import storage_directory
from hypothesis.errors import HypothesisException, HypothesisWarning
from hypothesis.internal.conjecture.choice import ChoiceT
from hypothesis.utils.conventions import UniqueIdentifier, not_set
from hypothesis.utils.deprecation import note_deprecation

__all__ = [
    "DirectoryBasedExampleDatabase",
    "ExampleDatabase",
    "GitHubArtifactDatabase",
    "InMemoryExampleDatabase",
    "MultiplexedDatabase",
    "ReadOnlyDatabase",
]

if TYPE_CHECKING:
    from watchdog.observers.api import BaseObserver

StrPathT: TypeAlias = str | PathLike[str]
SaveDataT: TypeAlias = tuple[bytes, bytes]  # key, value
DeleteDataT: TypeAlias = tuple[bytes, bytes | None]  # key, value
ListenerEventT: TypeAlias = (
    tuple[Literal["save"], SaveDataT] | tuple[Literal["delete"], DeleteDataT]
)
ListenerT: TypeAlias = Callable[[ListenerEventT], Any]


def _usable_dir(path: StrPathT) -> bool:
    """
    Returns True if the desired path can be used as a database path, because
    either the directory exists and can be used, or its root directory can
    be used and we can make the directory as needed.
    """
    path = Path(path)
    try:
        while not path.exists():
            # Loop terminates because the root dir ('/' on unix) always exists.
            path = path.parent
        return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK)
    except PermissionError:  # pragma: no cover
        # path.exists() returns False on 3.14+ instead of raising. See
        # https://docs.python.org/3.14/library/pathlib.html#querying-file-type-and-status
        return False


def _db_for_path(
    path: StrPathT | UniqueIdentifier | Literal[":memory:"] | None = not_set,
) -> "ExampleDatabase":
    if path is not_set:
        if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None:  # pragma: no cover
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect. Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            warnings.warn(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session. {path=}",
                HypothesisWarning,
                stacklevel=3,
            )
            return InMemoryExampleDatabase()
    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()
    path = cast(StrPathT, path)
    return DirectoryBasedExampleDatabase(path)


class _EDMeta(abc.ABCMeta):
    def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase":
        if self is ExampleDatabase:
            note_deprecation(
                "Creating a database using the abstract ExampleDatabase() class "
                "is deprecated. Prefer using a concrete subclass, like "
                "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). "
                'In particular, the special string ExampleDatabase(":memory:") '
                "should be replaced by InMemoryExampleDatabase().",
                since="2025-04-07",
                has_codemod=False,
            )
            return _db_for_path(*args, **kwargs)
        return super().__call__(*args, **kwargs)


# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
#
# We avoid type-checking this block due to this combination of facts:
# * our check-types-api CI job runs under 3.14
# * tools.txt therefore pins to a newer version of sphinx which uses 3.12+ `type`
#   syntax
# * in test_mypy.py, mypy sees this block, sees sphinx is installed, tries parsing
#   sphinx code, and errors
#
# Putting `and not TYPE_CHECKING` here is just a convenience for our testing setup
# (because we don't split mypy tests by running CI version, for example), not for
# runtime behavior.
if "sphinx" in sys.modules and not TYPE_CHECKING:  # pragma: no cover
    try:
        import sphinx.ext.autodoc

        signature = "hypothesis.database._EDMeta.__call__"

        # _METACLASS_CALL_BLACKLIST moved in newer sphinx versions
        try:
            import sphinx.ext.autodoc._dynamic._signatures as _module
        except ImportError:
            _module = sphinx.ext.autodoc

        # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions
        if isinstance(_module._METACLASS_CALL_BLACKLIST, frozenset):
            _module._METACLASS_CALL_BLACKLIST = _module._METACLASS_CALL_BLACKLIST | {
                signature
            }
        else:
            _module._METACLASS_CALL_BLACKLIST.append(signature)
    except Exception:
        pass


class ExampleDatabase(metaclass=_EDMeta):
    """
    A Hypothesis database, for use in |settings.database|.

    Hypothesis automatically saves failures to the database set in
    |settings.database|. The next time the test is run, Hypothesis will replay
    any failures from the database in |settings.database| for that test (in
    |Phase.reuse|).

    The database is best thought of as a cache that you never need to invalidate.
    Entries may be transparently dropped when upgrading your Hypothesis version
    or changing your test. Do not rely on the database for correctness; to ensure
    Hypothesis always tries an input, use |@example|.

    A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis
    provides several concrete database subclasses. To write your own database class,
    see :doc:`/how-to/custom-database`.
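
    For instance, a minimal sketch of a custom class, assuming a plain dict of
    sets is an acceptable backing store (the class name here is illustrative):

    .. code-block:: python

        class DictExampleDatabase(ExampleDatabase):
            def __init__(self):
                super().__init__()
                self.data = {}

            def save(self, key, value):
                self.data.setdefault(key, set()).add(bytes(value))

            def fetch(self, key):
                yield from self.data.get(key, ())

            def delete(self, key, value):
                self.data.get(key, set()).discard(bytes(value))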

    Change listening
    ----------------

    An optional extension to |ExampleDatabase| is change listening. On databases
    which support change listening, calling |ExampleDatabase.add_listener| adds
    a function as a change listener, which will be called whenever a value is
    added, deleted, or moved inside the database. See |ExampleDatabase.add_listener|
    for details.

    All databases in Hypothesis support change listening. Custom database classes
    are not required to support change listening, though they will not be compatible
    with features that require change listening until they do so.

    .. note::

        While no Hypothesis features currently require change listening, change
        listening is required by `HypoFuzz <https://hypofuzz.com/>`_.

    Database methods
    ----------------

    Required methods:

    * |ExampleDatabase.save|
    * |ExampleDatabase.fetch|
    * |ExampleDatabase.delete|

    Optional methods:

    * |ExampleDatabase.move|

    Change listening methods:

    * |ExampleDatabase.add_listener|
    * |ExampleDatabase.remove_listener|
    * |ExampleDatabase.clear_listeners|
    * |ExampleDatabase._start_listening|
    * |ExampleDatabase._stop_listening|
    * |ExampleDatabase._broadcast_change|
    """

    def __init__(self) -> None:
        self._listeners: list[ListenerT] = []

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If ``value`` is already present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from ``key``.

        If ``value`` is not present in ``key``, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """
        Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``,
        but may have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

    def add_listener(self, f: ListenerT, /) -> None:
        """
        Add a change listener. ``f`` will be called whenever a value is saved,
        deleted, or moved in the database.

        ``f`` can be called with two different event values:

        * ``("save", (key, value))``
        * ``("delete", (key, value))``

        where ``key`` and ``value`` are both ``bytes``.

        There is no ``move`` event. Instead, a move is broadcast as a
        ``delete`` event followed by a ``save`` event.

        For the ``delete`` event, ``value`` may be ``None``. This might occur if
        the database knows that a deletion has occurred in ``key``, but does not
        know what value was deleted.
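
        For example, a listener which simply logs every event (a sketch; ``db``
        stands in for any concrete database instance):

        .. code-block:: python

            def log_event(event):
                kind, (key, value) = event
                print(f"{kind}: key={key!r} value={value!r}")

            db.add_listener(log_event)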

288 """ 

289 had_listeners = bool(self._listeners) 

290 self._listeners.append(f) 

291 if not had_listeners: 

292 self._start_listening() 

293 

294 def remove_listener(self, f: ListenerT, /) -> None: 

295 """ 

296 Removes ``f`` from the list of change listeners. 

297 

298 If ``f`` is not in the list of change listeners, silently do nothing. 

299 """ 

300 if f not in self._listeners: 

301 return 

302 self._listeners.remove(f) 

303 if not self._listeners: 

304 self._stop_listening() 

305 

306 def clear_listeners(self) -> None: 

307 """Remove all change listeners.""" 

308 had_listeners = bool(self._listeners) 

309 self._listeners.clear() 

310 if had_listeners: 

311 self._stop_listening() 

312 

313 def _broadcast_change(self, event: ListenerEventT) -> None: 

314 """ 

315 Called when a value has been either added to or deleted from a key in 

316 the underlying database store. The possible values for ``event`` are: 

317 

318 * ``("save", (key, value))`` 

319 * ``("delete", (key, value))`` 

320 

321 ``value`` may be ``None`` for the ``delete`` event, indicating we know 

322 that some value was deleted under this key, but not its exact value. 

323 

324 Note that you should not assume your instance is the only reference to 

325 the underlying database store. For example, if two instances of 

326 |DirectoryBasedExampleDatabase| reference the same directory, 

327 _broadcast_change should be called whenever a file is added or removed 

328 from the directory, even if that database was not responsible for 

329 changing the file. 

330 """ 

331 for listener in self._listeners: 

332 listener(event) 

333 

334 def _start_listening(self) -> None: 

335 """ 

336 Called when the database adds a change listener, and did not previously 

337 have any change listeners. Intended to allow databases to wait to start 

338 expensive listening operations until necessary. 

339 

340 ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate, 

341 so you do not need to handle the case of multiple consecutive 

342 ``_start_listening`` calls without an intermediate ``_stop_listening`` 

343 call. 

344 """ 

345 warnings.warn( 

346 f"{self.__class__} does not support listening for changes", 

347 HypothesisWarning, 

348 stacklevel=4, 

349 ) 

350 

351 def _stop_listening(self) -> None: 

352 """ 

353 Called whenever no change listeners remain on the database. 

354 

355 ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate, 

356 so you do not need to handle the case of multiple consecutive 

357 ``_stop_listening`` calls without an intermediate ``_start_listening`` 

358 call. 

359 """ 

360 warnings.warn( 

361 f"{self.__class__} does not support stopping listening for changes", 

362 HypothesisWarning, 

363 stacklevel=4, 

364 ) 

365 

366 

class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of an in-memory
    dictionary.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
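
    For example, to keep a throwaway session entirely off disk:

    .. code-block:: python

        from hypothesis import given, settings, strategies as st

        @settings(database=InMemoryExampleDatabase())
        @given(st.integers())
        def test_something(n): ...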

374 """ 

375 

376 def __init__(self) -> None: 

377 super().__init__() 

378 self.data: dict[bytes, set[bytes]] = {} 

379 

380 def __repr__(self) -> str: 

381 return f"InMemoryExampleDatabase({self.data!r})" 

382 

383 def __eq__(self, other: object) -> bool: 

384 return isinstance(other, InMemoryExampleDatabase) and self.data is other.data 

385 

386 def fetch(self, key: bytes) -> Iterable[bytes]: 

387 yield from self.data.get(key, ()) 

388 

389 def save(self, key: bytes, value: bytes) -> None: 

390 value = bytes(value) 

391 values = self.data.setdefault(key, set()) 

392 changed = value not in values 

393 values.add(value) 

394 

395 if changed: 

396 self._broadcast_change(("save", (key, value))) 

397 

398 def delete(self, key: bytes, value: bytes) -> None: 

399 value = bytes(value) 

400 values = self.data.get(key, set()) 

401 changed = value in values 

402 values.discard(value) 

403 

404 if changed: 

405 self._broadcast_change(("delete", (key, value))) 

406 

407 def _start_listening(self) -> None: 

408 # declare compatibility with the listener api, but do the actual 

409 # implementation in .delete and .save, since we know we are the only 

410 # writer to .data. 

411 pass 

412 

413 def _stop_listening(self) -> None: 

414 pass 

415 

416 

417def _hash(key: bytes) -> str: 

418 return sha384(key).hexdigest()[:16] 

419 

420 

class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory. While the contents are fairly opaque, a
    |DirectoryBasedExampleDatabase| can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note, however, that this only makes sense if you also pin to an exact version
    of Hypothesis, and we would usually recommend implementing a shared database
    with a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase|
    helper.
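
    Usage is a matter of pointing the class at a directory (the path here is
    illustrative):

    .. code-block:: python

        from hypothesis import settings

        settings.register_profile(
            "dev", database=DirectoryBasedExampleDatabase(".hypothesis/examples")
        )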

437 """ 

438 

439 # we keep a database entry of the full values of all the database keys. 

440 # currently only used for inverse mapping of hash -> key in change listening. 

441 _metakeys_name: ClassVar[bytes] = b".hypothesis-keys" 

442 _metakeys_hash: ClassVar[str] = _hash(_metakeys_name) 

443 

444 def __init__(self, path: StrPathT) -> None: 

445 super().__init__() 

446 self.path = Path(path) 

447 self.keypaths: dict[bytes, Path] = {} 

448 self._observer: BaseObserver | None = None 

449 

450 def __repr__(self) -> str: 

451 return f"DirectoryBasedExampleDatabase({self.path!r})" 

452 

453 def __eq__(self, other: object) -> bool: 

454 return ( 

455 isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path 

456 ) 

457 

458 def _key_path(self, key: bytes) -> Path: 

459 try: 

460 return self.keypaths[key] 

461 except KeyError: 

462 pass 

463 self.keypaths[key] = self.path / _hash(key) 

464 return self.keypaths[key] 

465 

466 def _value_path(self, key: bytes, value: bytes) -> Path: 

467 return self._key_path(key) / _hash(value) 

468 

    def fetch(self, key: bytes) -> Iterable[bytes]:
        kp = self._key_path(key)
        if not kp.is_dir():
            return

        try:
            for path in os.listdir(kp):
                try:
                    yield (kp / path).read_bytes()
                except OSError:
                    pass
        except OSError:  # pragma: no cover
            # the `kp` directory might have been deleted in the meantime
            pass

    def save(self, key: bytes, value: bytes) -> None:
        key_path = self._key_path(key)
        if key_path.name != self._metakeys_hash:
            # add this key to our meta entry of all keys - taking care to avoid
            # infinite recursion.
            self.save(self._metakeys_name, key)

        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        try:
            key_path.mkdir(exist_ok=True, parents=True)
            path = self._value_path(key, value)
            if not path.exists():
                # to mimic an atomic write, create and write in a temporary
                # directory, and only move to the final path after. This avoids
                # any intermediate state where the file is created (and empty)
                # but not yet written to.
                fd, tmpname = tempfile.mkstemp()
                tmppath = Path(tmpname)
                os.write(fd, value)
                os.close(fd)
                try:
                    tmppath.rename(path)
                except OSError as err:  # pragma: no cover
                    if err.errno == errno.EXDEV:
                        # Can't rename across filesystem boundaries, see e.g.
                        # https://github.com/HypothesisWorks/hypothesis/issues/4335
                        try:
                            path.write_bytes(tmppath.read_bytes())
                        except OSError:
                            pass
                    tmppath.unlink()
                assert not tmppath.exists()
        except OSError:  # pragma: no cover
            pass

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        if src == dest:
            self.save(src, value)
            return

        src_path = self._value_path(src, value)
        dest_path = self._value_path(dest, value)
        # if the dest key path does not exist, os.renames will create it for us,
        # and we will never track its creation in the meta keys entry. Do so now.
        if not self._key_path(dest).exists():
            self.save(self._metakeys_name, dest)

        try:
            os.renames(src_path, dest_path)
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        try:
            self._value_path(key, value).unlink()
        except OSError:
            return

        # try deleting the key dir, which will only succeed if the dir is empty
        # (i.e. ``value`` was the last value in this key).
        try:
            self._key_path(key).rmdir()
        except OSError:
            pass
        else:
            # if the deletion succeeded, also delete this key entry from metakeys.
            # (if this key happens to be the metakey itself, this deletion will
            # fail; that's ok and faster than checking for this rare case.)
            self.delete(self._metakeys_name, key)

    def _start_listening(self) -> None:
        try:
            from watchdog.events import (
                DirCreatedEvent,
                DirDeletedEvent,
                DirMovedEvent,
                FileCreatedEvent,
                FileDeletedEvent,
                FileMovedEvent,
                FileSystemEventHandler,
            )
            from watchdog.observers import Observer
        except ImportError:
            warnings.warn(
                f"listening for changes in a {self.__class__.__name__} "
                "requires the watchdog library. To install, run "
                "`pip install hypothesis[watchdog]`",
                HypothesisWarning,
                stacklevel=4,
            )
            return

        hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)}
        _metakeys_hash = self._metakeys_hash
        _broadcast_change = self._broadcast_change

        class Handler(
            FileSystemEventHandler
        ):  # pragma: no cover  # skipped in test_database.py for now
            def on_created(_self, event: FileCreatedEvent | DirCreatedEvent) -> None:
                # we only registered for the file creation event
                assert not isinstance(event, DirCreatedEvent)
                # watchdog events are only bytes if we passed a byte path to
                # .schedule
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                # the parent dir represents the key, and its name is the key hash
                key_hash = value_path.parent.name

                if key_hash == _metakeys_hash:
                    try:
                        hash_to_key[value_path.name] = value_path.read_bytes()
                    except OSError:  # pragma: no cover
                        # this might occur if all the values in a key have been
                        # deleted and DirectoryBasedExampleDatabase removes its
                        # metakeys entry (which is `value_path` here).
                        pass
                    return

                key = hash_to_key.get(key_hash)
                if key is None:  # pragma: no cover
                    # we didn't recognize this key. This shouldn't ever happen,
                    # but some race condition trickery might cause this.
                    return

                try:
                    value = value_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("save", (key, value)))

            def on_deleted(_self, event: FileDeletedEvent | DirDeletedEvent) -> None:
                assert not isinstance(event, DirDeletedEvent)
                assert isinstance(event.src_path, str)

                value_path = Path(event.src_path)
                key = hash_to_key.get(value_path.parent.name)
                if key is None:  # pragma: no cover
                    return

                _broadcast_change(("delete", (key, None)))

            def on_moved(_self, event: FileMovedEvent | DirMovedEvent) -> None:
                assert not isinstance(event, DirMovedEvent)
                assert isinstance(event.src_path, str)
                assert isinstance(event.dest_path, str)

                src_path = Path(event.src_path)
                dest_path = Path(event.dest_path)
                k1 = hash_to_key.get(src_path.parent.name)
                k2 = hash_to_key.get(dest_path.parent.name)

                if k1 is None or k2 is None:  # pragma: no cover
                    return

                try:
                    value = dest_path.read_bytes()
                except OSError:  # pragma: no cover
                    return

                _broadcast_change(("delete", (k1, value)))
                _broadcast_change(("save", (k2, value)))

        # If we add a listener to a DirectoryBasedExampleDatabase whose database
        # directory doesn't yet exist, the watchdog observer will not fire any
        # events, even after the directory gets created.
        #
        # Ensure the directory exists before starting the observer.
        self.path.mkdir(exist_ok=True, parents=True)
        self._observer = Observer()
        self._observer.schedule(
            Handler(),
            # remove type: ignore when released
            # https://github.com/gorakhargosh/watchdog/pull/1096
            self.path,  # type: ignore
            recursive=True,
            event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent],
        )
        self._observer.start()

    def _stop_listening(self) -> None:
        assert self._observer is not None
        self._observer.stop()
        self._observer.join()
        self._observer = None


class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        super().__init__()
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        return f"ReadOnlyDatabase({self._wrapped!r})"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._wrapped.fetch(key)

    def save(self, key: bytes, value: bytes) -> None:
        pass

    def delete(self, key: bytes, value: bytes) -> None:
        pass

    def _start_listening(self) -> None:
        # we're read only, so there are no changes to broadcast.
        pass

    def _stop_listening(self) -> None:
        pass


class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases. ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database,
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        super().__init__()
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped)))

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped
        )

    def fetch(self, key: bytes) -> Iterable[bytes]:
        seen = set()
        for db in self._wrapped:
            for value in db.fetch(key):
                if value not in seen:
                    yield value
                    seen.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        for db in self._wrapped:
            db.move(src, dest, value)

    def _start_listening(self) -> None:
        for db in self._wrapped:
            db.add_listener(self._broadcast_change)

    def _stop_listening(self) -> None:
        for db in self._wrapped:
            db.remove_listener(self._broadcast_change)


class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides
        this automatically, but it needs to be set manually for local usage. On a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_.
        If the repository is private, it's necessary for the token to have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.

    In most cases, this will be used
    through the :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate the local one,
        # which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v9
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact, given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` property.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: StrPathT | None = None,
    ):
        super().__init__()
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: str | None = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in usage
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Path | None = None
        # This caches the artifact structure
        self._access_cache: dict[PurePath, set[PurePath]] | None = None

        # Message to display if user doesn't wrap around ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, GitHubArtifactDatabase)
            and self.owner == other.owner
            and self.repo == other.repo
            and self.artifact_name == other.artifact_name
            and self.path == other.path
        )

    def _prepare_for_io(self) -> None:
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            with ZipFile(self._artifact) as f:
                if f.testzip():  # pragma: no cover
                    raise BadZipFile

            # Turns out that testzip() doesn't work quite well;
            # doing the cache initialization here instead
            # will give us more coverage of the artifact.

            # Cache the files inside each keypath
            self._access_cache = {}
            with ZipFile(self._artifact) as zf:
                namelist = zf.namelist()
                # Iterate over files in the artifact
                for filename in namelist:
                    fileinfo = zf.getinfo(filename)
                    if fileinfo.is_dir():
                        self._access_cache[PurePath(filename)] = set()
                    else:
                        # Get the keypath from the filename
                        keypath = PurePath(filename).parent
                        # Add the file to the keypath
                        self._access_cache[keypath].add(PurePath(filename))
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis.",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        # Trigger warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> bytes | None:  # pragma: no cover
        request = Request(
            url,
            headers={
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "Authorization": f"Bearer {self.token}",
            },
        )
        warning_message = None
        response_bytes: bytes | None = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist."
                )
            # see https://github.com/python/cpython/issues/128734
            e.close()
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact."
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Path | None:  # pragma: no cover
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub.",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in the kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        raise RuntimeError(self._read_only_message)


class BackgroundWriteDatabase(ExampleDatabase):
    """A wrapper which defers writes on the given database to a background thread.

    Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any
    enqueued writes to finish before fetching from the database.
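
    For example, to avoid blocking tests on disk writes (the path here is
    illustrative):

    .. code-block:: python

        db = BackgroundWriteDatabase(
            DirectoryBasedExampleDatabase(".hypothesis/examples")
        )
        settings.register_profile("dev", database=db)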

1130 """ 

1131 

1132 def __init__(self, db: ExampleDatabase) -> None: 

1133 super().__init__() 

1134 self._db = db 

1135 self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue() 

1136 self._thread: Thread | None = None 

1137 

1138 def _ensure_thread(self): 

1139 if self._thread is None: 

1140 self._thread = Thread(target=self._worker, daemon=True) 

1141 self._thread.start() 

1142 # avoid an unbounded timeout during gc. 0.1 should be plenty for most 

1143 # use cases. 

1144 weakref.finalize(self, self._join, 0.1) 

1145 

1146 def __repr__(self) -> str: 

1147 return f"BackgroundWriteDatabase({self._db!r})" 

1148 

1149 def __eq__(self, other: object) -> bool: 

1150 return isinstance(other, BackgroundWriteDatabase) and self._db == other._db 

1151 

1152 def _worker(self) -> None: 

1153 while True: 

1154 method, args = self._queue.get() 

1155 getattr(self._db, method)(*args) 

1156 self._queue.task_done() 

1157 

1158 def _join(self, timeout: float | None = None) -> None: 

1159 # copy of Queue.join with a timeout. https://bugs.python.org/issue9634 

1160 with self._queue.all_tasks_done: 

1161 while self._queue.unfinished_tasks: 

1162 self._queue.all_tasks_done.wait(timeout) 

1163 

1164 def fetch(self, key: bytes) -> Iterable[bytes]: 

1165 self._join() 

1166 return self._db.fetch(key) 

1167 

1168 def save(self, key: bytes, value: bytes) -> None: 

1169 self._ensure_thread() 

1170 self._queue.put(("save", (key, value))) 

1171 

1172 def delete(self, key: bytes, value: bytes) -> None: 

1173 self._ensure_thread() 

1174 self._queue.put(("delete", (key, value))) 

1175 

1176 def move(self, src: bytes, dest: bytes, value: bytes) -> None: 

1177 self._ensure_thread() 

1178 self._queue.put(("move", (src, dest, value))) 

1179 

1180 def _start_listening(self) -> None: 

1181 self._db.add_listener(self._broadcast_change) 

1182 

1183 def _stop_listening(self) -> None: 

1184 self._db.remove_listener(self._broadcast_change) 

1185 

1186 

def _pack_uleb128(value: int) -> bytes:
    """
    Serialize an integer into variable-length bytes. In each byte, the low 7
    bits represent (part of) the integer, while the high bit indicates whether
    the integer continues into the next byte.

    https://en.wikipedia.org/wiki/LEB128
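
    For example, 300 (binary 10_0101100) encodes to the two bytes 0xAC 0x02:
    the first byte carries the low seven bits (0101100) with the continuation
    bit set, and the second carries the remaining bits (10) with no
    continuation.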

1194 """ 

1195 parts = bytearray() 

1196 assert value >= 0 

1197 while True: 

1198 # chop off 7 bits 

1199 byte = value & ((1 << 7) - 1) 

1200 value >>= 7 

1201 # set the continuation bit if we have more left 

1202 if value: 

1203 byte |= 1 << 7 

1204 

1205 parts.append(byte) 

1206 if not value: 

1207 break 

1208 return bytes(parts) 

1209 

1210 

def _unpack_uleb128(buffer: bytes) -> tuple[int, int]:
    """
    Inverts _pack_uleb128. Returns a tuple of (number of bytes consumed,
    decoded value).
    """
    value = 0
    for i, byte in enumerate(buffer):
        n = byte & ((1 << 7) - 1)
        value |= n << (i * 7)

        if not byte >> 7:
            break
    return (i + 1, value)


def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes:
    """Serialize a list of choices to a bytestring. Inverts choices_from_bytes."""
    # We use a custom serialization format for this, which might seem crazy - but our
    # data is a flat sequence of elements, and standard tools like protobuf or msgpack
    # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode.
    #
    # We simply encode each element with a metadata byte, then (if needed) a uleb128
    # size, and then the payload bytes. For booleans, the payload is inlined into the
    # metadata.
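    #
    # A worked example: choices_to_bytes([True, 255]) == b"\x01\x42\x00\xff".
    # b"\x01" is True with its payload inlined, 0x42 is the int tag (2 << 5)
    # with inline size 2, and b"\x00\xff" is 255 as a two-byte signed
    # big-endian payload.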

    parts = []
    for choice in choices:
        if isinstance(choice, bool):
            # `000_0000v` - tag zero, low bit payload.
            parts.append(b"\1" if choice else b"\0")
            continue

        # `tag_sssss [uleb128 size?] [payload]`
        if isinstance(choice, float):
            tag = 1 << 5
            choice = struct.pack("!d", choice)
        elif isinstance(choice, int):
            tag = 2 << 5
            choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True)
        elif isinstance(choice, bytes):
            tag = 3 << 5
        else:
            assert isinstance(choice, str)
            tag = 4 << 5
            choice = choice.encode(errors="surrogatepass")

        size = len(choice)
        if size < 0b11111:
            parts.append((tag | size).to_bytes(1, "big"))
        else:
            parts.append((tag | 0b11111).to_bytes(1, "big"))
            parts.append(_pack_uleb128(size))
        parts.append(choice)

    return b"".join(parts)


def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]:
    # See above for an explanation of the format.
    parts: list[ChoiceT] = []
    idx = 0
    while idx < len(buffer):
        tag = buffer[idx] >> 5
        size = buffer[idx] & 0b11111
        idx += 1

        if tag == 0:
            parts.append(bool(size))
            continue
        if size == 0b11111:
            offset, size = _unpack_uleb128(buffer[idx:])
            idx += offset
        chunk = buffer[idx : idx + size]
        idx += size

        if tag == 1:
            assert size == 8, "expected float64"
            parts.extend(struct.unpack("!d", chunk))
        elif tag == 2:
            parts.append(int.from_bytes(chunk, "big", signed=True))
        elif tag == 3:
            parts.append(chunk)
        else:
            assert tag == 4
            parts.append(chunk.decode(errors="surrogatepass"))
    return tuple(parts)


def choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...] | None:
    """
    Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes.

    Returns None if the given bytestring is not a valid serialization of choice
    sequences.
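
    A successful round-trip looks like:

    .. code-block:: python

        choices = (True, 2.5, 255, b"abc", "hello")
        assert choices_from_bytes(choices_to_bytes(choices)) == choices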

1303 """ 

1304 try: 

1305 return _choices_from_bytes(buffer) 

1306 except Exception: 

1307 # deserialization error, eg because our format changed or someone put junk 

1308 # data in the db. 

1309 return None