Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/hypothesis/database.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

238 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import abc 

12import binascii 

13import json 

14import os 

15import sys 

16import warnings 

17from datetime import datetime, timedelta, timezone 

18from functools import lru_cache 

19from hashlib import sha384 

20from os import getenv 

21from pathlib import Path, PurePath 

22from typing import Dict, Iterable, Optional, Set 

23from urllib.error import HTTPError, URLError 

24from urllib.request import Request, urlopen 

25from zipfile import BadZipFile, ZipFile 

26 

27from hypothesis.configuration import storage_directory 

28from hypothesis.errors import HypothesisException, HypothesisWarning 

29from hypothesis.utils.conventions import not_set 

30 

31__all__ = [ 

32 "DirectoryBasedExampleDatabase", 

33 "ExampleDatabase", 

34 "InMemoryExampleDatabase", 

35 "MultiplexedDatabase", 

36 "ReadOnlyDatabase", 

37 "GitHubArtifactDatabase", 

38] 

39 

40 

def _usable_dir(path: os.PathLike) -> bool:
    """
    Report whether ``path`` is usable as a database location.

    The answer is True when the nearest existing ancestor of ``path``
    (``path`` itself, if it exists) is a directory we can read, write and
    traverse - i.e. either the directory is already there and usable, or we
    would be able to create it on demand.
    """
    candidate = Path(path)
    needed_access = os.R_OK | os.W_OK | os.X_OK
    try:
        # Climb towards the root until we hit something that exists. This
        # terminates because the root dir ('/' on unix) always exists.
        while True:
            if candidate.exists():
                break
            candidate = candidate.parent
        if not candidate.is_dir():
            return False
        return os.access(candidate, needed_access)
    except PermissionError:
        # Not even allowed to stat the path, so it is certainly not usable.
        return False

55 

56 

def _db_for_path(path=None):
    """Build the example database appropriate for ``path``.

    * ``not_set``: use the default ``examples`` storage directory, falling
      back (with a warning) to an in-memory database if it is unusable.
    * ``None`` or ``":memory:"``: an :class:`InMemoryExampleDatabase`.
    * anything else: a :class:`DirectoryBasedExampleDatabase` at that path.
    """
    if path is not_set:
        legacy_setting = os.getenv("HYPOTHESIS_DATABASE_FILE")
        if legacy_setting is not None:  # pragma: no cover
            # The old environment variable is rejected loudly rather than
            # being silently ignored.
            raise HypothesisException(
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect.  Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )

        path = storage_directory("examples", intent_to_write=False)
        if not _usable_dir(path):  # pragma: no cover
            message = (
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                f"database for this session.  {path=}"
            )
            warnings.warn(message, HypothesisWarning, stacklevel=3)
            return InMemoryExampleDatabase()

    if path in (None, ":memory:"):
        return InMemoryExampleDatabase()

    return DirectoryBasedExampleDatabase(path)

79 

80 

class _EDMeta(abc.ABCMeta):
    """Metaclass which makes calling ``ExampleDatabase(...)`` act as a factory.

    Calling ``ExampleDatabase`` itself is routed to ``_db_for_path``; calling
    any other class using this metaclass instantiates it as usual.
    """

    def __call__(self, *args, **kwargs):
        if self is not ExampleDatabase:
            # A concrete subclass: ordinary instantiation.
            return super().__call__(*args, **kwargs)
        return _db_for_path(*args, **kwargs)

86 

87 

# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase
# subclasses, which is accurate, reasonable, and unhelpful.  Fortunately Sphinx
# maintains a list of metaclass-call-methods to ignore, and while they would prefer
# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we
# can insert ourselves here.
#
# This code only runs if Sphinx has already been imported; and it would live in our
# docs/conf.py except that we would also like it to work for anyone documenting
# downstream ExampleDatabase subclasses too.
if "sphinx" in sys.modules:
    try:
        from sphinx.ext.autodoc import _METACLASS_CALL_BLACKLIST

        _METACLASS_CALL_BLACKLIST.append("hypothesis.database._EDMeta.__call__")
    except Exception:
        # Deliberately best-effort: this relies on a private Sphinx name, so
        # any failure to import or update it is ignored.
        pass

104 

105 

class ExampleDatabase(metaclass=_EDMeta):
    """An abstract base class for storing examples in Hypothesis' internal format.

    An ExampleDatabase maps each ``bytes`` key to many distinct ``bytes``
    values, like a ``Mapping[bytes, AbstractSet[bytes]]``.
    """

    @abc.abstractmethod
    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key``.

        If this value is already present for this key, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.save")

    @abc.abstractmethod
    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Return an iterable over all values matching this key."""
        raise NotImplementedError(f"{type(self).__name__}.fetch")

    @abc.abstractmethod
    def delete(self, key: bytes, value: bytes) -> None:
        """Remove this value from this key.

        If this value is not present, silently do nothing.
        """
        raise NotImplementedError(f"{type(self).__name__}.delete")

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """Move ``value`` from key ``src`` to key ``dest``. Equivalent to
        ``delete(src, value)`` followed by ``save(dest, value)``, but may
        have a more efficient implementation.

        Note that ``value`` will be inserted at ``dest`` regardless of whether
        it is currently present at ``src``.
        """
        if src == dest:
            # Moving a value onto its own key must not delete it; just make
            # sure it is present.
            self.save(src, value)
            return
        self.delete(src, value)
        self.save(dest, value)

147 

148 

class InMemoryExampleDatabase(ExampleDatabase):
    """A non-persistent example database, implemented in terms of a dict of sets.

    This can be useful if you call a test function several times in a single
    session, or for testing other database implementations, but because it
    does not persist between runs we do not recommend it for general use.
    """

    def __init__(self):
        # Maps each key to the set of values saved under it.
        self.data = {}

    def __repr__(self) -> str:
        return f"InMemoryExampleDatabase({self.data!r})"

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield every value stored under ``key`` (nothing if the key is unknown)."""
        for stored in self.data.get(key, ()):
            yield stored

    def save(self, key: bytes, value: bytes) -> None:
        """Add ``value`` to the set for ``key``, creating the set if needed."""
        try:
            bucket = self.data[key]
        except KeyError:
            bucket = self.data[key] = set()
        # Values are stored as immutable ``bytes`` copies.
        bucket.add(bytes(value))

    def delete(self, key: bytes, value: bytes) -> None:
        """Remove ``value`` from the set for ``key``; a no-op if it is absent."""
        bucket = self.data.get(key, set())
        bucket.discard(bytes(value))

171 

172 

def _hash(key):
    """Return the first 16 hex characters of the SHA-384 digest of ``key``."""
    full_digest = sha384(key).hexdigest()
    return full_digest[:16]

175 

176 

class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Use a directory to store Hypothesis examples as files.

    Each test corresponds to a directory, and each example to a file within that
    directory.  While the contents are fairly opaque, a
    ``DirectoryBasedExampleDatabase`` can be shared by checking the directory
    into version control, for example with the following ``.gitignore``::

        # Ignore files cached by Hypothesis...
        .hypothesis/*
        # except for the examples directory
        !.hypothesis/examples/

    Note however that this only makes sense if you also pin to an exact version of
    Hypothesis, and we would usually recommend implementing a shared database with
    a network datastore - see :class:`~hypothesis.database.ExampleDatabase`, and
    the :class:`~hypothesis.database.MultiplexedDatabase` helper.
    """

    def __init__(self, path: os.PathLike) -> None:
        self.path = Path(path)
        # Memoised mapping from key to the directory holding its values.
        self.keypaths: Dict[bytes, Path] = {}

    def __repr__(self) -> str:
        return f"DirectoryBasedExampleDatabase({self.path!r})"

    def _key_path(self, key: bytes) -> Path:
        """Return (and memoise) the directory used for ``key``."""
        directory = self.keypaths.get(key)
        if directory is None:
            directory = self.keypaths[key] = self.path / _hash(key)
        return directory

    def _value_path(self, key, value):
        """Return the file path at which ``value`` is stored under ``key``."""
        return self._key_path(key) / _hash(value)

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield the contents of every readable file in the directory for ``key``."""
        directory = self._key_path(key)
        if directory.is_dir():
            for entry in os.listdir(directory):
                try:
                    yield (directory / entry).read_bytes()
                except OSError:
                    # Unreadable entries are skipped.
                    pass

    def save(self, key: bytes, value: bytes) -> None:
        """Write ``value`` to its file under ``key`` unless it already exists."""
        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full, or permissions might have been changed.
        self._key_path(key).mkdir(exist_ok=True, parents=True)
        target = self._value_path(key, value)
        if target.exists():
            return

        # Write to a randomly-suffixed sibling first, then rename into place,
        # so that a partially written file never appears under the final name.
        token = binascii.hexlify(os.urandom(16)).decode("ascii")
        scratch = target.with_suffix(f"{target.suffix}.{token}")
        scratch.write_bytes(value)
        try:
            scratch.rename(target)
        except OSError:  # pragma: no cover
            scratch.unlink()
        assert not scratch.exists()

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """Move ``value`` from ``src`` to ``dest``, preferring a filesystem rename."""
        if src == dest:
            self.save(src, value)
            return
        try:
            origin = self._value_path(src, value)
            destination = self._value_path(dest, value)
            os.renames(origin, destination)
        except OSError:
            # The rename failed; fall back to the generic delete-then-save.
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key: bytes, value: bytes) -> None:
        """Remove the file for ``value`` under ``key``, ignoring any OS error."""
        victim = self._value_path(key, value)
        try:
            victim.unlink()
        except OSError:
            return

258 

259 

class ReadOnlyDatabase(ExampleDatabase):
    """A wrapper to make the given database read-only.

    The implementation passes through ``fetch``, and turns ``save``, ``delete``, and
    ``move`` into silent no-ops.

    Note that this disables Hypothesis' automatic discarding of stale examples.
    It is designed to allow local machines to access a shared database (e.g. from CI
    servers), without propagating changes back from a local or in-development branch.
    """

    def __init__(self, db: ExampleDatabase) -> None:
        assert isinstance(db, ExampleDatabase)
        self._wrapped = db

    def __repr__(self) -> str:
        inner = repr(self._wrapped)
        return f"ReadOnlyDatabase({inner})"

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield whatever the wrapped database holds for ``key``."""
        for value in self._wrapped.fetch(key):
            yield value

    def save(self, key: bytes, value: bytes) -> None:
        """Do nothing: writes are never forwarded to the wrapped database."""
        return None

    def delete(self, key: bytes, value: bytes) -> None:
        """Do nothing: deletions are never forwarded to the wrapped database."""
        return None

286 

287 

class MultiplexedDatabase(ExampleDatabase):
    """A wrapper around multiple databases.

    Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against
    all of the wrapped databases.  ``fetch`` does not yield duplicate values, even
    if the same value is present in two or more of the wrapped databases.

    This combines well with a :class:`ReadOnlyDatabase`, as follows:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/")
        shared = CustomNetworkDatabase()

        settings.register_profile("ci", database=shared)
        settings.register_profile(
            "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared))
        )
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    So your CI system or fuzzing runs can populate a central shared database;
    while local runs on development machines can reproduce any failures from CI
    but will only cache their own failures locally and cannot remove examples
    from the shared database.
    """

    def __init__(self, *dbs: ExampleDatabase) -> None:
        assert all(isinstance(db, ExampleDatabase) for db in dbs)
        self._wrapped = dbs

    def __repr__(self) -> str:
        members = ", ".join(repr(database) for database in self._wrapped)
        return f"MultiplexedDatabase({members})"

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield each distinct value for ``key`` across the wrapped databases."""
        already_yielded = set()
        for database in self._wrapped:
            for value in database.fetch(key):
                if value in already_yielded:
                    continue
                yield value
                already_yielded.add(value)

    def save(self, key: bytes, value: bytes) -> None:
        """Save ``value`` under ``key`` in every wrapped database, in order."""
        for database in self._wrapped:
            database.save(key, value)

    def delete(self, key: bytes, value: bytes) -> None:
        """Delete ``value`` from ``key`` in every wrapped database, in order."""
        for database in self._wrapped:
            database.delete(key, value)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """Ask every wrapped database, in order, to move ``value`` from ``src`` to ``dest``."""
        for database in self._wrapped:
            database.move(src, dest, value)

340 

341 

class GitHubArtifactDatabase(ExampleDatabase):
    """
    A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact.

    You can use this for sharing example databases between CI runs and developers, allowing
    the latter to get read-only access to the former. This is particularly useful for
    continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_),
    where the CI system can help find new failing examples through fuzzing,
    and developers can reproduce them locally without any manual effort.

    .. note::
        You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, Github Actions provides
        this automatically, but it needs to be set manually for local usage. In a developer machine,
        this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token>`_.
        If the repository is private, it's necessary for the token to have ``repo`` scope
        in the case of a classic token, or ``actions:read`` in the case of a fine-grained token.


    In most cases, this will be used
    through the :class:`~hypothesis.database.MultiplexedDatabase`,
    by combining a local directory-based database with this one. For example:

    .. code-block:: python

        local = DirectoryBasedExampleDatabase(".hypothesis/examples")
        shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo"))

        settings.register_profile("ci", database=local)
        settings.register_profile("dev", database=MultiplexedDatabase(local, shared))
        # We don't want to use the shared database in CI, only to populate its local one.
        # which the workflow should then upload as an artifact.
        settings.load_profile("ci" if os.environ.get("CI") else "dev")

    .. note::
        Because this database is read-only, you always need to wrap it with the
        :class:`ReadOnlyDatabase`.

    A setup like this can be paired with a GitHub Actions workflow including
    something like the following:

    .. code-block:: yaml

        - name: Download example database
          uses: dawidd6/action-download-artifact@v2.24.3
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples
            if_no_artifact_found: warn
            workflow_conclusion: completed

        - name: Run tests
          run: pytest

        - name: Upload example database
          uses: actions/upload-artifact@v3
          if: always()
          with:
            name: hypothesis-example-db
            path: .hypothesis/examples

    In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_
    to download the latest artifact given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_
    does not support downloading artifacts from previous workflow runs.

    The database automatically implements a simple file-based cache with a default expiration period
    of 1 day. You can adjust this through the ``cache_timeout`` property.

    For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``).
    """

    def __init__(
        self,
        owner: str,
        repo: str,
        artifact_name: str = "hypothesis-example-db",
        cache_timeout: timedelta = timedelta(days=1),
        path: Optional[os.PathLike] = None,
    ):
        self.owner = owner
        self.repo = repo
        self.artifact_name = artifact_name
        self.cache_timeout = cache_timeout

        # Get the GitHub token from the environment
        # It's unnecessary to use a token if the repo is public
        self.token: Optional[str] = getenv("GITHUB_TOKEN")

        if path is None:
            self.path: Path = Path(
                storage_directory(f"github-artifacts/{self.artifact_name}/")
            )
        else:
            self.path = Path(path)

        # We don't want to initialize the cache until we need to
        self._initialized: bool = False
        self._disabled: bool = False

        # This is the path to the artifact in usage
        # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip
        self._artifact: Optional[Path] = None
        # This caches the artifact structure
        self._access_cache: Optional[Dict[PurePath, Set[PurePath]]] = None

        # Message to display if user doesn't wrap around ReadOnlyDatabase
        self._read_only_message = (
            "This database is read-only. "
            "Please wrap this class with ReadOnlyDatabase, "
            "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))."
        )

    def __repr__(self) -> str:
        return (
            f"GitHubArtifactDatabase(owner={self.owner!r}, "
            f"repo={self.repo!r}, artifact_name={self.artifact_name!r})"
        )

    def _prepare_for_io(self) -> None:
        """Validate the selected artifact and index its contents.

        On success ``self._access_cache`` maps each directory inside the zip
        to the set of file paths it contains.  If the archive is not a valid
        zip file, a warning is issued and the database is disabled instead.
        """
        assert self._artifact is not None, "Artifact not loaded."

        if self._initialized:  # pragma: no cover
            return

        # Test that the artifact is valid
        try:
            access_cache: Dict[PurePath, Set[PurePath]] = {}
            with ZipFile(self._artifact) as zf:
                if zf.testzip():  # pragma: no cover
                    raise BadZipFile

                # Turns out that testzip() doesn't work quite well
                # doing the cache initialization here instead
                # will give us more coverage of the artifact.

                # Cache the files inside each keypath
                for fileinfo in zf.infolist():
                    entry = PurePath(fileinfo.filename)
                    if fileinfo.is_dir():
                        # setdefault, so that a directory entry listed after
                        # its files does not discard them.
                        access_cache.setdefault(entry, set())
                    else:
                        # Zip archives need not contain explicit directory
                        # entries, so create the keypath on first use rather
                        # than assuming it was already seen.
                        access_cache.setdefault(entry.parent, set()).add(entry)
            self._access_cache = access_cache
        except BadZipFile:
            warnings.warn(
                "The downloaded artifact from GitHub is invalid. "
                "This could be because the artifact was corrupted, "
                "or because the artifact was not created by Hypothesis. ",
                HypothesisWarning,
                stacklevel=3,
            )
            self._disabled = True

        self._initialized = True

    def _initialize_db(self) -> None:
        """Pick the artifact to read from, downloading a fresh one if needed.

        The newest cached ``*.zip`` is reused while it is younger than
        ``cache_timeout``; otherwise a new artifact is fetched.  If fetching
        fails, an expired cached artifact is used as a fallback (with a
        warning), and if there is none the database is disabled.
        """
        # Trigger warning that we suppressed earlier by intent_to_write=False
        storage_directory(self.path.name)
        # Create the cache directory if it doesn't exist
        self.path.mkdir(exist_ok=True, parents=True)

        # Get all artifacts, oldest first.  File stems are ISO timestamps
        # with ":" replaced by "_" (see _fetch_artifact).
        cached_artifacts = sorted(
            self.path.glob("*.zip"),
            key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")),
        )

        # Remove all but the latest artifact
        for artifact in cached_artifacts[:-1]:
            artifact.unlink()

        try:
            found_artifact = cached_artifacts[-1]
        except IndexError:
            found_artifact = None

        # Check if the latest artifact is a cache hit
        if found_artifact is not None and (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(found_artifact.stem.replace("_", ":"))
            < self.cache_timeout
        ):
            self._artifact = found_artifact
        else:
            # Download the latest artifact from GitHub
            new_artifact = self._fetch_artifact()

            if new_artifact:
                if found_artifact is not None:
                    found_artifact.unlink()
                self._artifact = new_artifact
            elif found_artifact is not None:
                warnings.warn(
                    "Using an expired artifact as a fallback for the database: "
                    f"{found_artifact}",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._artifact = found_artifact
            else:
                warnings.warn(
                    "Couldn't acquire a new or existing artifact. Disabling database.",
                    HypothesisWarning,
                    stacklevel=2,
                )
                self._disabled = True
                return

        self._prepare_for_io()

    def _get_bytes(self, url: str) -> Optional[bytes]:  # pragma: no cover
        """GET ``url`` from the GitHub API and return the response body.

        Returns ``None`` (after emitting a ``HypothesisWarning``) if the
        request fails with an HTTP error, a connection error or a timeout.
        """
        headers = {
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        if self.token:
            # Only authenticate when a token is configured; otherwise the
            # header would carry the literal string "Bearer None".
            headers["Authorization"] = f"Bearer {self.token}"
        request = Request(url, headers=headers)

        warning_message = None
        response_bytes: Optional[bytes] = None
        try:
            with urlopen(request) as response:
                response_bytes = response.read()
        except HTTPError as e:
            if e.code == 401:
                warning_message = (
                    "Authorization failed when trying to download artifact from GitHub. "
                    "Check that you have a valid GITHUB_TOKEN set in your environment."
                )
            else:
                warning_message = (
                    "Could not get the latest artifact from GitHub. "
                    "This could be because the repository "
                    "or artifact does not exist. "
                )
        except URLError:
            warning_message = "Could not connect to GitHub to get the latest artifact. "
        except TimeoutError:
            warning_message = (
                "Could not connect to GitHub to get the latest artifact "
                "(connection timed out)."
            )

        if warning_message is not None:
            warnings.warn(warning_message, HypothesisWarning, stacklevel=4)
            return None

        return response_bytes

    def _fetch_artifact(self) -> Optional[Path]:  # pragma: no cover
        """Download the most recently created matching artifact into the cache.

        Returns the path of the saved zip file, or ``None`` if the artifact
        list or archive could not be retrieved, no artifact has the
        configured name, or the file could not be written.
        """
        # Get the list of artifacts from GitHub
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts"
        response_bytes = self._get_bytes(url)
        if response_bytes is None:
            return None

        artifacts = json.loads(response_bytes)["artifacts"]
        artifacts = [a for a in artifacts if a["name"] == self.artifact_name]

        if not artifacts:
            return None

        # Get the latest artifact from the list
        artifact = max(artifacts, key=lambda a: a["created_at"])
        url = artifact["archive_download_url"]

        # Download the artifact
        artifact_bytes = self._get_bytes(url)
        if artifact_bytes is None:
            return None

        # Save the artifact to the cache
        # We replace ":" with "_" to ensure the filenames are compatible
        # with Windows filesystems
        timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_")
        artifact_path = self.path / f"{timestamp}.zip"
        try:
            artifact_path.write_bytes(artifact_bytes)
        except OSError:
            warnings.warn(
                "Could not save the latest artifact from GitHub. ",
                HypothesisWarning,
                stacklevel=3,
            )
            return None

        return artifact_path

    @staticmethod
    @lru_cache
    def _key_path(key: bytes) -> PurePath:
        """Return the directory inside the artifact that holds values for ``key``."""
        return PurePath(_hash(key) + "/")

    def fetch(self, key: bytes) -> Iterable[bytes]:
        """Yield every value stored for ``key`` in the artifact.

        The database is initialized lazily on first use; nothing is yielded
        if it is (or becomes) disabled.
        """
        if self._disabled:
            return

        if not self._initialized:
            self._initialize_db()
            if self._disabled:
                return

        assert self._artifact is not None
        assert self._access_cache is not None

        kp = self._key_path(key)

        with ZipFile(self._artifact) as zf:
            # Get all the files in the kp from the cache
            filenames = self._access_cache.get(kp, ())
            for filename in filenames:
                with zf.open(filename.as_posix()) as f:
                    yield f.read()

    # Read-only interface
    def save(self, key: bytes, value: bytes) -> None:
        """Always raises ``RuntimeError``: this database is read-only."""
        raise RuntimeError(self._read_only_message)

    def move(self, src: bytes, dest: bytes, value: bytes) -> None:
        """Always raises ``RuntimeError``: this database is read-only."""
        raise RuntimeError(self._read_only_message)

    def delete(self, key: bytes, value: bytes) -> None:
        """Always raises ``RuntimeError``: this database is read-only."""
        raise RuntimeError(self._read_only_message)