Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/database.py: 34%


482 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import abc 

12import errno 

13import json 

14import os 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import weakref 

20from collections.abc import Callable, Iterable 

21from datetime import datetime, timedelta, timezone 

22from functools import lru_cache 

23from hashlib import sha384 

24from os import PathLike, getenv 

25from pathlib import Path, PurePath 

26from queue import Queue 

27from threading import Thread 

28from typing import ( 

29 TYPE_CHECKING, 

30 Any, 

31 ClassVar, 

32 Literal, 

33 TypeAlias, 

34 cast, 

35) 

36from urllib.error import HTTPError, URLError 

37from urllib.request import Request, urlopen 

38from zipfile import BadZipFile, ZipFile 

39 

40from hypothesis.configuration import StorageDirectory, storage_directory 

41from hypothesis.errors import HypothesisException, HypothesisWarning 

42from hypothesis.internal.conjecture.choice import ChoiceT 

43from hypothesis.utils.conventions import UniqueIdentifier, not_set 

44from hypothesis.utils.deprecation import note_deprecation 

45 

46__all__ = [ 

47 "DirectoryBasedExampleDatabase", 

48 "ExampleDatabase", 

49 "GitHubArtifactDatabase", 

50 "InMemoryExampleDatabase", 

51 "MultiplexedDatabase", 

52 "ReadOnlyDatabase", 

53] 

54 

55if TYPE_CHECKING: 

56 from watchdog.observers.api import BaseObserver 

57 

58StrPathT: TypeAlias = str | PathLike[str] 

59SaveDataT: TypeAlias = tuple[bytes, bytes] # key, value 

60DeleteDataT: TypeAlias = tuple[bytes, bytes | None] # key, value 

61ListenerEventT: TypeAlias = ( 

62 tuple[Literal["save"], SaveDataT] | tuple[Literal["delete"], DeleteDataT] 

63) 

64ListenerT: TypeAlias = Callable[[ListenerEventT], Any] 

65 

66 

67def _usable_dir(path: StrPathT) -> bool: 

68 """ 

69 Returns True if the desired path can be used as a database path, because

70 either the directory exists and can be used, or its closest existing parent

71 can be used and we can make the directory as needed.

72 """ 

73 path = Path(path) 

74 try: 

75 while not path.exists(): 

76 # Loop terminates because the root dir ('/' on unix) always exists. 

77 path = path.parent 

78 return path.is_dir() and os.access(path, os.R_OK | os.W_OK | os.X_OK) 

79 except PermissionError: # pragma: no cover 

80 # path.exists() returns False on 3.14+ instead of raising. See 

81 # https://docs.python.org/3.14/library/pathlib.html#querying-file-type-and-status 

82 return False 

83 
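
# Quick illustration of the check above (a sketch, not part of the module; the
# helper name is made up): a directory that does not exist yet still counts as
# usable as long as its closest existing ancestor is a writable directory.
def _demo_usable_dir() -> None:
    with tempfile.TemporaryDirectory() as d:
        # neither "examples" nor "nested" exists, but `d` does and is writable
        assert _usable_dir(Path(d) / "examples" / "nested")
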

84 

85def _db_for_path( 

86 path: StrPathT | UniqueIdentifier | Literal[":memory:"] | None = None, 

87) -> "ExampleDatabase": 

88 if path is not_set: 

89 if os.getenv("HYPOTHESIS_DATABASE_FILE") is not None: # pragma: no cover 

90 raise HypothesisException( 

91 "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any " 

92 "effect. Configure your database location via a settings profile instead.\n" 

93 "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles" 

94 ) 

95 

96 storage_dir = storage_directory("examples", intent_to_write=False) 

97 if not _usable_dir(storage_dir.path): # pragma: no cover 

98 warnings.warn( 

99 "The database setting is not configured, and the default " 

100 "location is unusable - falling back to an in-memory " 

101 f"database for this session. path={storage_dir.path!r}", 

102 HypothesisWarning, 

103 stacklevel=3, 

104 ) 

105 return InMemoryExampleDatabase() 

106 return _StorageDirectoryDatabase(storage_dir) 

107 if path in (None, ":memory:"): 

108 return InMemoryExampleDatabase() 

109 path = cast(StrPathT, path) 

110 return DirectoryBasedExampleDatabase(path) 

111 
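
# A sketch of how the resolution above maps `settings.database` values to
# concrete databases (illustrative only; `_demo_db_for_path` is not part of
# this module). None and ":memory:" give an in-memory database; any other
# path-like value gives a directory-based one.
def _demo_db_for_path() -> None:
    assert isinstance(_db_for_path(None), InMemoryExampleDatabase)
    assert isinstance(_db_for_path(":memory:"), InMemoryExampleDatabase)
    assert isinstance(
        _db_for_path(".hypothesis/examples"), DirectoryBasedExampleDatabase
    )
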

112 

113class _EDMeta(abc.ABCMeta): 

114 def __call__(self, *args: Any, **kwargs: Any) -> "ExampleDatabase": 

115 if self is ExampleDatabase: 

116 note_deprecation( 

117 "Creating a database using the abstract ExampleDatabase() class " 

118 "is deprecated. Prefer using a concrete subclass, like " 

119 "InMemoryExampleDatabase() or DirectoryBasedExampleDatabase(path). " 

120 'In particular, the special string ExampleDatabase(":memory:") ' 

121 "should be replaced by InMemoryExampleDatabase().", 

122 since="2025-04-07", 

123 has_codemod=False, 

124 ) 

125 return _db_for_path(*args, **kwargs) 

126 return super().__call__(*args, **kwargs) 

127 

128 

129# This __call__ method is picked up by Sphinx as the signature of all ExampleDatabase 

130# subclasses, which is accurate, reasonable, and unhelpful. Fortunately Sphinx 

131# maintains a list of metaclass-call-methods to ignore, and while they would prefer 

132# not to maintain it upstream (https://github.com/sphinx-doc/sphinx/pull/8262) we 

133# can insert ourselves here. 

134# 

135# This code only runs if Sphinx has already been imported; and it would live in our 

136# docs/conf.py except that we would also like it to work for anyone documenting 

137# downstream ExampleDatabase subclasses too. 

138# 

139# We avoid type-checking this block due to this combination of facts: 

140# * our check-types-api CI job runs under 3.14 

141# * tools.txt therefore pins to a newer version of sphinx which uses 3.12+ `type` 

142# syntax 

143# * in test_mypy.py, mypy sees this block, sees sphinx is installed, tries parsing 

144# sphinx code, and errors 

145# 

146# Putting `and not TYPE_CHECKING` here is just a convenience for our testing setup 

147# (because we don't split mypy tests by CI version, e.g.), not for runtime 

148# behavior. 

149if "sphinx" in sys.modules and not TYPE_CHECKING: # pragma: no cover 

150 try: 

151 import sphinx.ext.autodoc 

152 

153 signature = "hypothesis.database._EDMeta.__call__" 

154 

155 # _METACLASS_CALL_BLACKLIST moved in newer sphinx versions 

156 try: 

157 import sphinx.ext.autodoc._dynamic._signatures as _module 

158 except ImportError: 

159 _module = sphinx.ext.autodoc 

160 

161 # _METACLASS_CALL_BLACKLIST is a frozenset in later sphinx versions 

162 if isinstance(_module._METACLASS_CALL_BLACKLIST, frozenset): 

163 _module._METACLASS_CALL_BLACKLIST = _module._METACLASS_CALL_BLACKLIST | { 

164 signature 

165 } 

166 else: 

167 _module._METACLASS_CALL_BLACKLIST.append(signature) 

168 except Exception: 

169 pass 

170 

171 

172class ExampleDatabase(metaclass=_EDMeta): 

173 """ 

174 A Hypothesis database, for use in |settings.database|. 

175 

176 Hypothesis automatically saves failures to the database set in 

177 |settings.database|. The next time the test is run, Hypothesis will replay 

178 any failures from the database in |settings.database| for that test (in 

179 |Phase.reuse|). 

180 

181 The database is best thought of as a cache that you never need to invalidate. 

182 Entries may be transparently dropped when upgrading your Hypothesis version 

183 or changing your test. Do not rely on the database for correctness; to ensure 

184 Hypothesis always tries an input, use |@example|. 

185 

186 A Hypothesis database is a simple mapping of bytes to sets of bytes. Hypothesis 

187 provides several concrete database subclasses. To write your own database class, 

188 see :doc:`/how-to/custom-database`. 

189 

190 Change listening 

191 ---------------- 

192 

193 An optional extension to |ExampleDatabase| is change listening. On databases 

194 which support change listening, calling |ExampleDatabase.add_listener| adds 

195 a function as a change listener, which will be called whenever a value is 

196 added, deleted, or moved inside the database. See |ExampleDatabase.add_listener| 

197 for details. 

198 

199 All databases in Hypothesis support change listening. Custom database classes 

200 are not required to support change listening, though they will not be compatible 

201 with features that require change listening until they do so. 

202 

203 .. note:: 

204 

205 While no Hypothesis features currently require change listening, change 

206 listening is required by `HypoFuzz <https://hypofuzz.com/>`_. 

207 

208 Database methods 

209 ---------------- 

210 

211 Required methods: 

212 

213 * |ExampleDatabase.save| 

214 * |ExampleDatabase.fetch| 

215 * |ExampleDatabase.delete| 

216 

217 Optional methods: 

218 

219 * |ExampleDatabase.move| 

220 

221 Change listening methods: 

222 

223 * |ExampleDatabase.add_listener| 

224 * |ExampleDatabase.remove_listener| 

225 * |ExampleDatabase.clear_listeners| 

226 * |ExampleDatabase._start_listening| 

227 * |ExampleDatabase._stop_listening| 

228 * |ExampleDatabase._broadcast_change| 

229 """ 

230 

231 def __init__(self) -> None: 

232 self._listeners: list[ListenerT] = [] 

233 

234 @abc.abstractmethod 

235 def save(self, key: bytes, value: bytes) -> None: 

236 """Save ``value`` under ``key``. 

237 

238 If ``value`` is already present in ``key``, silently do nothing. 

239 """ 

240 raise NotImplementedError(f"{type(self).__name__}.save") 

241 

242 @abc.abstractmethod 

243 def fetch(self, key: bytes) -> Iterable[bytes]: 

244 """Return an iterable over all values matching this key.""" 

245 raise NotImplementedError(f"{type(self).__name__}.fetch") 

246 

247 @abc.abstractmethod 

248 def delete(self, key: bytes, value: bytes) -> None: 

249 """Remove ``value`` from ``key``. 

250 

251 If ``value`` is not present in ``key``, silently do nothing. 

252 """ 

253 raise NotImplementedError(f"{type(self).__name__}.delete") 

254 

255 def move(self, src: bytes, dest: bytes, value: bytes) -> None: 

256 """ 

257 Move ``value`` from key ``src`` to key ``dest``. 

258 

259 Equivalent to ``delete(src, value)`` followed by ``save(dest, value)``, 

260 but may have a more efficient implementation. 

261 

262 Note that ``value`` will be inserted at ``dest`` regardless of whether 

263 it is currently present at ``src``. 

264 """ 

265 if src == dest: 

266 self.save(src, value) 

267 return 

268 self.delete(src, value) 

269 self.save(dest, value) 

270 

271 def add_listener(self, f: ListenerT, /) -> None: 

272 """ 

273 Add a change listener. ``f`` will be called whenever a value is saved, 

274 deleted, or moved in the database. 

275 

276 ``f`` can be called with two different event values: 

277 

278 * ``("save", (key, value))`` 

279 * ``("delete", (key, value))`` 

280 

281 where ``key`` and ``value`` are both ``bytes``. 

282 

283 There is no ``move`` event. Instead, a move is broadcasted as a 

284 ``delete`` event followed by a ``save`` event. 

285 

286 For the ``delete`` event, ``value`` may be ``None``. This might occur if 

287 the database knows that a deletion has occurred in ``key``, but does not 

288 know what value was deleted. 

289 """ 

290 had_listeners = bool(self._listeners) 

291 self._listeners.append(f) 

292 if not had_listeners: 

293 self._start_listening() 

294 

295 def remove_listener(self, f: ListenerT, /) -> None: 

296 """ 

297 Removes ``f`` from the list of change listeners. 

298 

299 If ``f`` is not in the list of change listeners, silently do nothing. 

300 """ 

301 if f not in self._listeners: 

302 return 

303 self._listeners.remove(f) 

304 if not self._listeners: 

305 self._stop_listening() 

306 

307 def clear_listeners(self) -> None: 

308 """Remove all change listeners.""" 

309 had_listeners = bool(self._listeners) 

310 self._listeners.clear() 

311 if had_listeners: 

312 self._stop_listening() 

313 

314 def _broadcast_change(self, event: ListenerEventT) -> None: 

315 """ 

316 Called when a value has been either added to or deleted from a key in 

317 the underlying database store. The possible values for ``event`` are: 

318 

319 * ``("save", (key, value))`` 

320 * ``("delete", (key, value))`` 

321 

322 ``value`` may be ``None`` for the ``delete`` event, indicating we know 

323 that some value was deleted under this key, but not its exact value. 

324 

325 Note that you should not assume your instance is the only reference to 

326 the underlying database store. For example, if two instances of 

327 |DirectoryBasedExampleDatabase| reference the same directory, 

328 _broadcast_change should be called whenever a file is added or removed 

329 from the directory, even if that database was not responsible for 

330 changing the file. 

331 """ 

332 for listener in self._listeners: 

333 listener(event) 

334 

335 def _start_listening(self) -> None: 

336 """ 

337 Called when the database adds a change listener, and did not previously 

338 have any change listeners. Intended to allow databases to wait to start 

339 expensive listening operations until necessary. 

340 

341 ``_start_listening`` and ``_stop_listening`` are guaranteed to alternate, 

342 so you do not need to handle the case of multiple consecutive 

343 ``_start_listening`` calls without an intermediate ``_stop_listening`` 

344 call. 

345 """ 

346 warnings.warn( 

347 f"{self.__class__} does not support listening for changes", 

348 HypothesisWarning, 

349 stacklevel=4, 

350 ) 

351 

352 def _stop_listening(self) -> None: 

353 """ 

354 Called whenever no change listeners remain on the database. 

355 

356 ``_stop_listening`` and ``_start_listening`` are guaranteed to alternate, 

357 so you do not need to handle the case of multiple consecutive 

358 ``_stop_listening`` calls without an intermediate ``_start_listening`` 

359 call. 

360 """ 

361 warnings.warn( 

362 f"{self.__class__} does not support stopping listening for changes", 

363 HypothesisWarning, 

364 stacklevel=4, 

365 ) 

366 
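
# A minimal sketch of a custom ExampleDatabase subclass, following the
# docstring above: only save/fetch/delete are required, and change listening
# works once _start_listening/_stop_listening are overridden. The class and
# function names below are illustrative, not part of Hypothesis.
class _SketchDictDatabase(ExampleDatabase):
    def __init__(self) -> None:
        super().__init__()
        self._data: dict[bytes, set[bytes]] = {}

    def save(self, key: bytes, value: bytes) -> None:
        if value not in self._data.setdefault(key, set()):
            self._data[key].add(value)
            self._broadcast_change(("save", (key, value)))

    def fetch(self, key: bytes) -> Iterable[bytes]:
        yield from self._data.get(key, ())

    def delete(self, key: bytes, value: bytes) -> None:
        if value in self._data.get(key, set()):
            self._data[key].discard(value)
            self._broadcast_change(("delete", (key, value)))

    def _start_listening(self) -> None:
        # changes are broadcast directly from save/delete above
        pass

    def _stop_listening(self) -> None:
        pass


def _demo_change_listening() -> None:
    # a move is broadcast as a delete event followed by a save event
    events: list[ListenerEventT] = []
    db = _SketchDictDatabase()
    db.add_listener(events.append)
    db.save(b"key", b"value")
    db.move(b"key", b"key2", b"value")
    assert events == [
        ("save", (b"key", b"value")),
        ("delete", (b"key", b"value")),
        ("save", (b"key2", b"value")),
    ]
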

367 

368class InMemoryExampleDatabase(ExampleDatabase): 

369 """A non-persistent example database, implemented in terms of an in-memory 

370 dictionary. 

371 

372 This can be useful if you call a test function several times in a single 

373 session, or for testing other database implementations, but because it 

374 does not persist between runs we do not recommend it for general use. 

375 """ 

376 

377 def __init__(self) -> None: 

378 super().__init__() 

379 self.data: dict[bytes, set[bytes]] = {} 

380 

381 def __repr__(self) -> str: 

382 return f"InMemoryExampleDatabase({self.data!r})" 

383 

384 def __eq__(self, other: object) -> bool: 

385 return isinstance(other, InMemoryExampleDatabase) and self.data is other.data 

386 

387 def fetch(self, key: bytes) -> Iterable[bytes]: 

388 yield from self.data.get(key, ()) 

389 

390 def save(self, key: bytes, value: bytes) -> None: 

391 value = bytes(value) 

392 values = self.data.setdefault(key, set()) 

393 changed = value not in values 

394 values.add(value) 

395 

396 if changed: 

397 self._broadcast_change(("save", (key, value))) 

398 

399 def delete(self, key: bytes, value: bytes) -> None: 

400 value = bytes(value) 

401 values = self.data.get(key, set()) 

402 changed = value in values 

403 values.discard(value) 

404 

405 if changed: 

406 self._broadcast_change(("delete", (key, value))) 

407 

408 def _start_listening(self) -> None: 

409 # declare compatibility with the listener api, but do the actual 

410 # implementation in .delete and .save, since we know we are the only 

411 # writer to .data. 

412 pass 

413 

414 def _stop_listening(self) -> None: 

415 pass 

416 
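
# Typical use of the in-memory database per the docstring above: convenient
# within a single session or when testing other database code, but nothing
# persists between runs. (A sketch; the profile name is made up.)
def _demo_in_memory_database() -> None:
    from hypothesis import settings

    settings.register_profile("ephemeral", database=InMemoryExampleDatabase())
    settings.load_profile("ephemeral")
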

417 

418def _hash(key: bytes) -> str: 

419 return sha384(key).hexdigest()[:16] 

420 

421 

422class DirectoryBasedExampleDatabase(ExampleDatabase): 

423 """Use a directory to store Hypothesis examples as files. 

424 

425 Each test corresponds to a directory, and each example to a file within that 

426 directory. While the contents are fairly opaque, a 

427 |DirectoryBasedExampleDatabase| can be shared by checking the directory 

428 into version control, for example with the following ``.gitignore``:: 

429 

430 # Ignore files cached by Hypothesis... 

431 .hypothesis/* 

432 # except for the examples directory 

433 !.hypothesis/examples/ 

434 

435 Note however that this only makes sense if you also pin to an exact version of 

436 Hypothesis, and we would usually recommend implementing a shared database with 

437 a network datastore - see |ExampleDatabase|, and the |MultiplexedDatabase| helper. 

438 """ 

439 

440 # we keep a database entry of the full values of all the database keys. 

441 # currently only used for inverse mapping of hash -> key in change listening. 

442 _metakeys_name: ClassVar[bytes] = b".hypothesis-keys" 

443 _metakeys_hash: ClassVar[str] = _hash(_metakeys_name) 

444 

445 def __init__(self, path: StrPathT) -> None: 

446 super().__init__() 

447 self.path = Path(path) 

448 self.keypaths: dict[bytes, Path] = {} 

449 self._observer: BaseObserver | None = None 

450 self._ensure_directory_exists_called = False 

451 

452 def _ensure_directory_exists(self) -> None: 

453 # disk hits are expensive: early-return for performance 

454 if self._ensure_directory_exists_called: 

455 return 

456 

457 self.path.mkdir(exist_ok=True, parents=True) 

458 self._ensure_directory_exists_called = True 

459 

460 def __repr__(self) -> str: 

461 return f"DirectoryBasedExampleDatabase({self.path!r})" 

462 

463 def __eq__(self, other: object) -> bool: 

464 return ( 

465 isinstance(other, DirectoryBasedExampleDatabase) and self.path == other.path 

466 ) 

467 

468 def _key_path(self, key: bytes) -> Path: 

469 try: 

470 return self.keypaths[key] 

471 except KeyError: 

472 pass 

473 self.keypaths[key] = self.path / _hash(key) 

474 return self.keypaths[key] 

475 

476 def _value_path(self, key: bytes, value: bytes) -> Path: 

477 return self._key_path(key) / _hash(value) 

478 

479 def fetch(self, key: bytes) -> Iterable[bytes]: 

480 kp = self._key_path(key) 

481 if not kp.is_dir(): 

482 return 

483 

484 try: 

485 for path in os.listdir(kp): 

486 try: 

487 yield (kp / path).read_bytes() 

488 except OSError: 

489 pass 

490 except OSError: # pragma: no cover 

491 # the `kp` directory might have been deleted in the meantime 

492 pass 

493 

494 def save(self, key: bytes, value: bytes) -> None: 

495 key_path = self._key_path(key) 

496 if key_path.name != self._metakeys_hash: 

497 # add this key to our meta entry of all keys - taking care to avoid 

498 # infinite recursion. 

499 self.save(self._metakeys_name, key) 

500 

501 # Note: we attempt to create the dir in question now. We 

502 # already checked for permissions, but there can still be other issues, 

503 # e.g. the disk is full, or permissions might have been changed. 

504 try: 

505 self._ensure_directory_exists() 

506 key_path.mkdir(exist_ok=True, parents=True) 

507 path = self._value_path(key, value) 

508 if not path.exists(): 

509 # to mimic an atomic write, create and write in a temporary 

510 # directory, and only move to the final path after. This avoids 

511 # any intermediate state where the file is created (and empty) 

512 # but not yet written to. 

513 fd, tmpname = tempfile.mkstemp() 

514 tmppath = Path(tmpname) 

515 os.write(fd, value) 

516 os.close(fd) 

517 try: 

518 tmppath.rename(path) 

519 except OSError as err: # pragma: no cover 

520 if err.errno == errno.EXDEV: 

521 # Can't rename across filesystem boundaries, see e.g. 

522 # https://github.com/HypothesisWorks/hypothesis/issues/4335 

523 try: 

524 path.write_bytes(tmppath.read_bytes()) 

525 except OSError: 

526 pass 

527 tmppath.unlink() 

528 assert not tmppath.exists() 

529 except OSError: # pragma: no cover 

530 pass 

531 

532 def move(self, src: bytes, dest: bytes, value: bytes) -> None: 

533 if src == dest: 

534 self.save(src, value) 

535 return 

536 

537 src_path = self._value_path(src, value) 

538 dest_path = self._value_path(dest, value) 

539 # if the dest key path does not exist, os.renames will create it for us, 

540 # and we will never track its creation in the meta keys entry. Do so now. 

541 if not self._key_path(dest).exists(): 

542 self.save(self._metakeys_name, dest) 

543 

544 try: 

545 os.renames(src_path, dest_path) 

546 except OSError: 

547 self.delete(src, value) 

548 self.save(dest, value) 

549 

550 def delete(self, key: bytes, value: bytes) -> None: 

551 try: 

552 self._value_path(key, value).unlink() 

553 except OSError: 

554 return 

555 

556 # try deleting the key dir, which will only succeed if the dir is empty 

557 # (i.e. ``value`` was the last value in this key). 

558 try: 

559 self._key_path(key).rmdir() 

560 except OSError: 

561 pass 

562 else: 

563 # if the deletion succeeded, also delete this key entry from metakeys. 

564 # (if this key happens to be the metakey itself, this deletion will 

565 # fail; that's ok and faster than checking for this rare case.) 

566 self.delete(self._metakeys_name, key) 

567 

568 def _start_listening(self) -> None: 

569 try: 

570 from watchdog.events import ( 

571 DirCreatedEvent, 

572 DirDeletedEvent, 

573 DirMovedEvent, 

574 FileCreatedEvent, 

575 FileDeletedEvent, 

576 FileMovedEvent, 

577 FileSystemEventHandler, 

578 ) 

579 from watchdog.observers import Observer 

580 except ImportError: 

581 warnings.warn( 

582 f"listening for changes in a {self.__class__.__name__} " 

583 "requires the watchdog library. To install, run " 

584 "`pip install hypothesis[watchdog]`", 

585 HypothesisWarning, 

586 stacklevel=4, 

587 ) 

588 return 

589 

590 hash_to_key = {_hash(key): key for key in self.fetch(self._metakeys_name)} 

591 _metakeys_hash = self._metakeys_hash 

592 _broadcast_change = self._broadcast_change 

593 

594 class Handler( 

595 FileSystemEventHandler 

596 ): # pragma: no cover # skipped in test_database.py for now 

597 def on_created(_self, event: FileCreatedEvent | DirCreatedEvent) -> None: 

598 # we only registered for the file creation event 

599 assert not isinstance(event, DirCreatedEvent) 

600 # watchdog events are only bytes if we passed a byte path to 

601 # .schedule 

602 assert isinstance(event.src_path, str) 

603 

604 value_path = Path(event.src_path) 

605 # the parent dir represents the key, and its name is the key hash 

606 key_hash = value_path.parent.name 

607 

608 if key_hash == _metakeys_hash: 

609 try: 

610 hash_to_key[value_path.name] = value_path.read_bytes() 

611 except OSError: # pragma: no cover 

612 # this might occur if all the values in a key have been 

613 # deleted and DirectoryBasedExampleDatabase removes its 

614 # metakeys entry (which is `value_path` here). 

615 pass 

616 return 

617 

618 key = hash_to_key.get(key_hash) 

619 if key is None: # pragma: no cover 

620 # we didn't recognize this key. This shouldn't ever happen, 

621 # but some race condition trickery might cause this. 

622 return 

623 

624 try: 

625 value = value_path.read_bytes() 

626 except OSError: # pragma: no cover 

627 return 

628 

629 _broadcast_change(("save", (key, value))) 

630 

631 def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None: 

632 assert not isinstance(event, DirDeletedEvent) 

633 assert isinstance(event.src_path, str) 

634 

635 value_path = Path(event.src_path) 

636 key = hash_to_key.get(value_path.parent.name) 

637 if key is None: # pragma: no cover 

638 return 

639 

640 _broadcast_change(("delete", (key, None))) 

641 

642 def on_moved(self, event: FileMovedEvent | DirMovedEvent) -> None: 

643 assert not isinstance(event, DirMovedEvent) 

644 assert isinstance(event.src_path, str) 

645 assert isinstance(event.dest_path, str) 

646 

647 src_path = Path(event.src_path) 

648 dest_path = Path(event.dest_path) 

649 k1 = hash_to_key.get(src_path.parent.name) 

650 k2 = hash_to_key.get(dest_path.parent.name) 

651 

652 if k1 is None or k2 is None: # pragma: no cover 

653 return 

654 

655 try: 

656 value = dest_path.read_bytes() 

657 except OSError: # pragma: no cover 

658 return 

659 

660 _broadcast_change(("delete", (k1, value))) 

661 _broadcast_change(("save", (k2, value))) 

662 

663 # If we add a listener to a DirectoryBasedExampleDatabase whose database 

664 # directory doesn't yet exist, the watchdog observer will not fire any 

665 # events, even after the directory gets created. 

666 # 

667 # Ensure the directory exists before starting the observer. 

668 self._ensure_directory_exists() 

669 self._observer = Observer() 

670 self._observer.schedule( 

671 Handler(), 

672 # remove type: ignore when released 

673 # https://github.com/gorakhargosh/watchdog/pull/1096 

674 self.path, # type: ignore 

675 recursive=True, 

676 event_filter=[FileCreatedEvent, FileDeletedEvent, FileMovedEvent], 

677 ) 

678 self._observer.start() 

679 

680 def _stop_listening(self) -> None: 

681 assert self._observer is not None 

682 self._observer.stop() 

683 self._observer.join() 

684 self._observer = None 

685 
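
# Sketch of the on-disk layout used by the class above: each key becomes a
# directory named by its hash, and each value a file named by its hash inside
# that directory. (`_demo_directory_layout` is illustrative, not part of
# Hypothesis.)
def _demo_directory_layout() -> None:
    with tempfile.TemporaryDirectory() as d:
        db = DirectoryBasedExampleDatabase(d)
        db.save(b"my-test-key", b"some value")
        value_file = Path(d) / _hash(b"my-test-key") / _hash(b"some value")
        assert value_file.read_bytes() == b"some value"
        assert list(db.fetch(b"my-test-key")) == [b"some value"]
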

686 

687class _StorageDirectoryDatabase(DirectoryBasedExampleDatabase): 

688 # A DirectoryBasedExampleDatabase which is located at the same directory as the storage 

689 # directory. This lets our database logic interact with our logic for writing .gitignore 

690 # files to the storage directory. 

691 # 

692 # The reason why we need this class is because the first interaction we have 

693 # with .hypothesis might be writing a file to .hypothesis/examples, and 

694 # DirectoryBasedExampleDatabase.save would otherwise create .hypothesis without 

695 # performing our .gitignore logic. 

696 

697 def __init__(self, storage_dir: StorageDirectory) -> None: 

698 super().__init__(storage_dir.path) 

699 self._storage_dir = storage_dir 

700 

701 def _ensure_directory_exists(self) -> None: 

702 if self._ensure_directory_exists_called: 

703 return 

704 

705 self._storage_dir.create_if_missing() 

706 self._ensure_directory_exists_called = True 

707 

708 

709class ReadOnlyDatabase(ExampleDatabase): 

710 """A wrapper to make the given database read-only. 

711 

712 The implementation passes through ``fetch``, and turns ``save``, ``delete``, and 

713 ``move`` into silent no-ops. 

714 

715 Note that this disables Hypothesis' automatic discarding of stale examples. 

716 It is designed to allow local machines to access a shared database (e.g. from CI 

717 servers), without propagating changes back from a local or in-development branch. 

718 """ 

719 

720 def __init__(self, db: ExampleDatabase) -> None: 

721 super().__init__() 

722 assert isinstance(db, ExampleDatabase) 

723 self._wrapped = db 

724 

725 def __repr__(self) -> str: 

726 return f"ReadOnlyDatabase({self._wrapped!r})" 

727 

728 def __eq__(self, other: object) -> bool: 

729 return isinstance(other, ReadOnlyDatabase) and self._wrapped == other._wrapped 

730 

731 def fetch(self, key: bytes) -> Iterable[bytes]: 

732 yield from self._wrapped.fetch(key) 

733 

734 def save(self, key: bytes, value: bytes) -> None: 

735 pass 

736 

737 def delete(self, key: bytes, value: bytes) -> None: 

738 pass 

739 

740 def _start_listening(self) -> None: 

741 # we're read only, so there are no changes to broadcast. 

742 pass 

743 

744 def _stop_listening(self) -> None: 

745 pass 

746 

747 

748class MultiplexedDatabase(ExampleDatabase): 

749 """A wrapper around multiple databases. 

750 

751 Each ``save``, ``fetch``, ``move``, or ``delete`` operation will be run against 

752 all of the wrapped databases. ``fetch`` does not yield duplicate values, even 

753 if the same value is present in two or more of the wrapped databases. 

754 

755 This combines well with a :class:`ReadOnlyDatabase`, as follows: 

756 

757 .. code-block:: python 

758 

759 local = DirectoryBasedExampleDatabase("/tmp/hypothesis/examples/") 

760 shared = CustomNetworkDatabase() 

761 

762 settings.register_profile("ci", database=shared) 

763 settings.register_profile( 

764 "dev", database=MultiplexedDatabase(local, ReadOnlyDatabase(shared)) 

765 ) 

766 settings.load_profile("ci" if os.environ.get("CI") else "dev") 

767 

768 So your CI system or fuzzing runs can populate a central shared database; 

769 while local runs on development machines can reproduce any failures from CI 

770 but will only cache their own failures locally and cannot remove examples 

771 from the shared database. 

772 """ 

773 

774 def __init__(self, *dbs: ExampleDatabase) -> None: 

775 super().__init__() 

776 assert all(isinstance(db, ExampleDatabase) for db in dbs) 

777 self._wrapped = dbs 

778 

779 def __repr__(self) -> str: 

780 return "MultiplexedDatabase({})".format(", ".join(map(repr, self._wrapped))) 

781 

782 def __eq__(self, other: object) -> bool: 

783 return ( 

784 isinstance(other, MultiplexedDatabase) and self._wrapped == other._wrapped 

785 ) 

786 

787 def fetch(self, key: bytes) -> Iterable[bytes]: 

788 seen = set() 

789 for db in self._wrapped: 

790 for value in db.fetch(key): 

791 if value not in seen: 

792 yield value 

793 seen.add(value) 

794 

795 def save(self, key: bytes, value: bytes) -> None: 

796 for db in self._wrapped: 

797 db.save(key, value) 

798 

799 def delete(self, key: bytes, value: bytes) -> None: 

800 for db in self._wrapped: 

801 db.delete(key, value) 

802 

803 def move(self, src: bytes, dest: bytes, value: bytes) -> None: 

804 for db in self._wrapped: 

805 db.move(src, dest, value) 

806 

807 def _start_listening(self) -> None: 

808 for db in self._wrapped: 

809 db.add_listener(self._broadcast_change) 

810 

811 def _stop_listening(self) -> None: 

812 for db in self._wrapped: 

813 db.remove_listener(self._broadcast_change) 

814 

815 

816class GitHubArtifactDatabase(ExampleDatabase): 

817 """ 

818 A file-based database loaded from a `GitHub Actions <https://docs.github.com/en/actions>`_ artifact. 

819 

820 You can use this for sharing example databases between CI runs and developers, allowing 

821 the latter to get read-only access to the former. This is particularly useful for 

822 continuous fuzzing (i.e. with `HypoFuzz <https://hypofuzz.com/>`_), 

823 where the CI system can help find new failing examples through fuzzing, 

824 and developers can reproduce them locally without any manual effort. 

825 

826 .. note:: 

827 You must provide ``GITHUB_TOKEN`` as an environment variable. In CI, GitHub Actions provides 

828 this automatically, but it needs to be set manually for local usage. On a developer machine, 

829 this would usually be a `Personal Access Token <https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens>`_. 

830 If the repository is private, it's necessary for the token to have ``repo`` scope 

831 in the case of a classic token, or ``actions:read`` in the case of a fine-grained token. 

832 

833 

834 In most cases, this will be used 

835 through the :class:`~hypothesis.database.MultiplexedDatabase`, 

836 by combining a local directory-based database with this one. For example: 

837 

838 .. code-block:: python 

839 

840 local = DirectoryBasedExampleDatabase(".hypothesis/examples") 

841 shared = ReadOnlyDatabase(GitHubArtifactDatabase("user", "repo")) 

842 

843 settings.register_profile("ci", database=local) 

844 settings.register_profile("dev", database=MultiplexedDatabase(local, shared)) 

845 # We don't want to use the shared database in CI, only to populate its local one, 

846 # which the workflow should then upload as an artifact. 

847 settings.load_profile("ci" if os.environ.get("CI") else "dev") 

848 

849 .. note:: 

850 Because this database is read-only, you always need to wrap it with the 

851 :class:`ReadOnlyDatabase`. 

852 

853 A setup like this can be paired with a GitHub Actions workflow including 

854 something like the following: 

855 

856 .. code-block:: yaml 

857 

858 - name: Download example database 

859 uses: dawidd6/action-download-artifact@v9 

860 with: 

861 name: hypothesis-example-db 

862 path: .hypothesis/examples 

863 if_no_artifact_found: warn 

864 workflow_conclusion: completed 

865 

866 - name: Run tests 

867 run: pytest 

868 

869 - name: Upload example database 

870 uses: actions/upload-artifact@v3 

871 if: always() 

872 with: 

873 name: hypothesis-example-db 

874 path: .hypothesis/examples 

875 

876 In this workflow, we use `dawidd6/action-download-artifact <https://github.com/dawidd6/action-download-artifact>`_ 

877 to download the latest artifact given that the official `actions/download-artifact <https://github.com/actions/download-artifact>`_ 

878 does not support downloading artifacts from previous workflow runs. 

879 

880 The database automatically implements a simple file-based cache with a default expiration period 

881 of 1 day. You can adjust this through the ``cache_timeout`` argument. 

882 

883 For mono-repo support, you can provide a unique ``artifact_name`` (e.g. ``hypofuzz-example-db-frontend``). 

884 """ 

885 

886 def __init__( 

887 self, 

888 owner: str, 

889 repo: str, 

890 artifact_name: str = "hypothesis-example-db", 

891 cache_timeout: timedelta = timedelta(days=1), 

892 path: StrPathT | None = None, 

893 ): 

894 super().__init__() 

895 self.owner = owner 

896 self.repo = repo 

897 self.artifact_name = artifact_name 

898 self.cache_timeout = cache_timeout 

899 

900 # Get the GitHub token from the environment 

901 # It's unnecessary to use a token if the repo is public 

902 self.token: str | None = getenv("GITHUB_TOKEN") 

903 

904 self._storage_dir: StorageDirectory | None = None 

905 if path is None: 

906 self._storage_dir = storage_directory( 

907 f"github-artifacts/{self.artifact_name}/" 

908 ) 

909 self.path = self._storage_dir.path 

910 else: 

911 self.path = Path(path) 

912 

913 # We don't want to initialize the cache until we need to 

914 self._initialized: bool = False 

915 self._disabled: bool = False 

916 

917 # This is the path to the artifact in use 

918 # .hypothesis/github-artifacts/<artifact-name>/<modified_isoformat>.zip 

919 self._artifact: Path | None = None 

920 # This caches the artifact structure 

921 self._access_cache: dict[PurePath, set[PurePath]] | None = None 

922 

923 # Message to display if user doesn't wrap around ReadOnlyDatabase 

924 self._read_only_message = ( 

925 "This database is read-only. " 

926 "Please wrap this class with ReadOnlyDatabase" 

927 "i.e. ReadOnlyDatabase(GitHubArtifactDatabase(...))." 

928 ) 

929 

930 def __repr__(self) -> str: 

931 return ( 

932 f"GitHubArtifactDatabase(owner={self.owner!r}, " 

933 f"repo={self.repo!r}, artifact_name={self.artifact_name!r})" 

934 ) 

935 

936 def __eq__(self, other: object) -> bool: 

937 return ( 

938 isinstance(other, GitHubArtifactDatabase) 

939 and self.owner == other.owner 

940 and self.repo == other.repo 

941 and self.artifact_name == other.artifact_name 

942 and self.path == other.path 

943 ) 

944 

945 def _prepare_for_io(self) -> None: 

946 assert self._artifact is not None, "Artifact not loaded." 

947 

948 if self._initialized: # pragma: no cover 

949 return 

950 

951 # Test that the artifact is valid 

952 try: 

953 with ZipFile(self._artifact) as f: 

954 if f.testzip(): # pragma: no cover 

955 raise BadZipFile 

956 

957 # Turns out that testzip() doesn't work quite well, so 

958 # doing the cache initialization here instead 

959 # will give us more coverage of the artifact. 

960 

961 # Cache the files inside each keypath 

962 self._access_cache = {} 

963 with ZipFile(self._artifact) as zf: 

964 namelist = zf.namelist() 

965 # Iterate over files in the artifact 

966 for filename in namelist: 

967 fileinfo = zf.getinfo(filename) 

968 if fileinfo.is_dir(): 

969 self._access_cache[PurePath(filename)] = set() 

970 else: 

971 # Get the keypath from the filename 

972 keypath = PurePath(filename).parent 

973 # Add the file to the keypath 

974 self._access_cache[keypath].add(PurePath(filename)) 

975 except BadZipFile: 

976 warnings.warn( 

977 "The downloaded artifact from GitHub is invalid. " 

978 "This could be because the artifact was corrupted, " 

979 "or because the artifact was not created by Hypothesis. ", 

980 HypothesisWarning, 

981 stacklevel=3, 

982 ) 

983 self._disabled = True 

984 

985 self._initialized = True 

986 

987 def _initialize_db(self) -> None: 

988 # Trigger warning that we suppressed earlier by intent_to_write=False 

989 storage_directory(self.path.name) 

990 # Create the cache directory if it doesn't exist 

991 if self._storage_dir is not None: # pragma: no cover 

992 self._storage_dir.create_if_missing() 

993 else: 

994 self.path.mkdir(exist_ok=True, parents=True) 

995 

996 # Get all artifacts 

997 cached_artifacts = sorted( 

998 self.path.glob("*.zip"), 

999 key=lambda a: datetime.fromisoformat(a.stem.replace("_", ":")), 

1000 ) 

1001 

1002 # Remove all but the latest artifact 

1003 for artifact in cached_artifacts[:-1]: 

1004 artifact.unlink() 

1005 

1006 try: 

1007 found_artifact = cached_artifacts[-1] 

1008 except IndexError: 

1009 found_artifact = None 

1010 

1011 # Check if the latest artifact is a cache hit 

1012 if found_artifact is not None and ( 

1013 datetime.now(timezone.utc) 

1014 - datetime.fromisoformat(found_artifact.stem.replace("_", ":")) 

1015 < self.cache_timeout 

1016 ): 

1017 self._artifact = found_artifact 

1018 else: 

1019 # Download the latest artifact from GitHub 

1020 new_artifact = self._fetch_artifact() 

1021 

1022 if new_artifact: 

1023 if found_artifact is not None: 

1024 found_artifact.unlink() 

1025 self._artifact = new_artifact 

1026 elif found_artifact is not None: 

1027 warnings.warn( 

1028 "Using an expired artifact as a fallback for the database: " 

1029 f"{found_artifact}", 

1030 HypothesisWarning, 

1031 stacklevel=2, 

1032 ) 

1033 self._artifact = found_artifact 

1034 else: 

1035 warnings.warn( 

1036 "Couldn't acquire a new or existing artifact. Disabling database.", 

1037 HypothesisWarning, 

1038 stacklevel=2, 

1039 ) 

1040 self._disabled = True 

1041 return 

1042 

1043 self._prepare_for_io() 

1044 

1045 def _get_bytes(self, url: str) -> bytes | None: # pragma: no cover 

1046 request = Request( 

1047 url, 

1048 headers={ 

1049 "Accept": "application/vnd.github+json", 

1050 "X-GitHub-Api-Version": "2022-11-28 ", 

1051 "Authorization": f"Bearer {self.token}", 

1052 }, 

1053 ) 

1054 warning_message = None 

1055 response_bytes: bytes | None = None 

1056 try: 

1057 with urlopen(request) as response: 

1058 response_bytes = response.read() 

1059 except HTTPError as e: 

1060 if e.code == 401: 

1061 warning_message = ( 

1062 "Authorization failed when trying to download artifact from GitHub. " 

1063 "Check that you have a valid GITHUB_TOKEN set in your environment." 

1064 ) 

1065 else: 

1066 warning_message = ( 

1067 "Could not get the latest artifact from GitHub. " 

1068 "This could be because the repository " 

1069 "or artifact does not exist. " 

1070 ) 

1071 # see https://github.com/python/cpython/issues/128734 

1072 e.close() 

1073 except URLError: 

1074 warning_message = "Could not connect to GitHub to get the latest artifact. " 

1075 except TimeoutError: 

1076 warning_message = ( 

1077 "Could not connect to GitHub to get the latest artifact " 

1078 "(connection timed out)." 

1079 ) 

1080 

1081 if warning_message is not None: 

1082 warnings.warn(warning_message, HypothesisWarning, stacklevel=4) 

1083 return None 

1084 

1085 return response_bytes 

1086 

1087 def _fetch_artifact(self) -> Path | None: # pragma: no cover 

1088 # Get the list of artifacts from GitHub 

1089 url = f"https://api.github.com/repos/{self.owner}/{self.repo}/actions/artifacts" 

1090 response_bytes = self._get_bytes(url) 

1091 if response_bytes is None: 

1092 return None 

1093 

1094 artifacts = json.loads(response_bytes)["artifacts"] 

1095 artifacts = [a for a in artifacts if a["name"] == self.artifact_name] 

1096 

1097 if not artifacts: 

1098 return None 

1099 

1100 # Get the latest artifact from the list 

1101 artifact = max(artifacts, key=lambda a: a["created_at"]) 

1102 url = artifact["archive_download_url"] 

1103 

1104 # Download the artifact 

1105 artifact_bytes = self._get_bytes(url) 

1106 if artifact_bytes is None: 

1107 return None 

1108 

1109 # Save the artifact to the cache 

1110 # We replace ":" with "_" to ensure the filenames are compatible 

1111 # with Windows filesystems 

1112 timestamp = datetime.now(timezone.utc).isoformat().replace(":", "_") 

1113 artifact_path = self.path / f"{timestamp}.zip" 

1114 try: 

1115 artifact_path.write_bytes(artifact_bytes) 

1116 except OSError: 

1117 warnings.warn( 

1118 "Could not save the latest artifact from GitHub. ", 

1119 HypothesisWarning, 

1120 stacklevel=3, 

1121 ) 

1122 return None 

1123 

1124 return artifact_path 

1125 

1126 @staticmethod 

1127 @lru_cache 

1128 def _key_path(key: bytes) -> PurePath: 

1129 return PurePath(_hash(key) + "/") 

1130 

1131 def fetch(self, key: bytes) -> Iterable[bytes]: 

1132 if self._disabled: 

1133 return 

1134 

1135 if not self._initialized: 

1136 self._initialize_db() 

1137 if self._disabled: 

1138 return 

1139 

1140 assert self._artifact is not None 

1141 assert self._access_cache is not None 

1142 

1143 kp = self._key_path(key) 

1144 

1145 with ZipFile(self._artifact) as zf: 

1146 # Get all the files in the kp from the cache 

1147 filenames = self._access_cache.get(kp, ()) 

1148 for filename in filenames: 

1149 with zf.open(filename.as_posix()) as f: 

1150 yield f.read() 

1151 

1152 # Read-only interface 

1153 def save(self, key: bytes, value: bytes) -> None: 

1154 raise RuntimeError(self._read_only_message) 

1155 

1156 def move(self, src: bytes, dest: bytes, value: bytes) -> None: 

1157 raise RuntimeError(self._read_only_message) 

1158 

1159 def delete(self, key: bytes, value: bytes) -> None: 

1160 raise RuntimeError(self._read_only_message) 

1161 

1162 

1163class BackgroundWriteDatabase(ExampleDatabase): 

1164 """A wrapper which defers writes on the given database to a background thread. 

1165 

1166 Calls to :meth:`~hypothesis.database.ExampleDatabase.fetch` wait for any 

1167 enqueued writes to finish before fetching from the database. 

1168 """ 

1169 

1170 def __init__(self, db: ExampleDatabase) -> None: 

1171 super().__init__() 

1172 self._db = db 

1173 self._queue: Queue[tuple[str, tuple[bytes, ...]]] = Queue() 

1174 self._thread: Thread | None = None 

1175 

1176 def _ensure_thread(self): 

1177 if self._thread is None: 

1178 self._thread = Thread(target=self._worker, daemon=True) 

1179 self._thread.start() 

1180 # avoid an unbounded timeout during gc. 0.1 should be plenty for most 

1181 # use cases. 

1182 weakref.finalize(self, self._join, 0.1) 

1183 

1184 def __repr__(self) -> str: 

1185 return f"BackgroundWriteDatabase({self._db!r})" 

1186 

1187 def __eq__(self, other: object) -> bool: 

1188 return isinstance(other, BackgroundWriteDatabase) and self._db == other._db 

1189 

1190 def _worker(self) -> None: 

1191 while True: 

1192 method, args = self._queue.get() 

1193 getattr(self._db, method)(*args) 

1194 self._queue.task_done() 

1195 

1196 def _join(self, timeout: float | None = None) -> None: 

1197 # copy of Queue.join with a timeout. https://bugs.python.org/issue9634 

1198 with self._queue.all_tasks_done: 

1199 while self._queue.unfinished_tasks: 

1200 self._queue.all_tasks_done.wait(timeout) 

1201 

1202 def fetch(self, key: bytes) -> Iterable[bytes]: 

1203 self._join() 

1204 return self._db.fetch(key) 

1205 

1206 def save(self, key: bytes, value: bytes) -> None: 

1207 self._ensure_thread() 

1208 self._queue.put(("save", (key, value))) 

1209 

1210 def delete(self, key: bytes, value: bytes) -> None: 

1211 self._ensure_thread() 

1212 self._queue.put(("delete", (key, value))) 

1213 

1214 def move(self, src: bytes, dest: bytes, value: bytes) -> None: 

1215 self._ensure_thread() 

1216 self._queue.put(("move", (src, dest, value))) 

1217 

1218 def _start_listening(self) -> None: 

1219 self._db.add_listener(self._broadcast_change) 

1220 

1221 def _stop_listening(self) -> None: 

1222 self._db.remove_listener(self._broadcast_change) 

1223 
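
# Sketch of deferring writes to a background thread, per the docstring above:
# fetch() waits for the queue to drain first, so a fetch issued right after a
# save still sees the value. (`_demo_background_writes` is illustrative only.)
def _demo_background_writes() -> None:
    db = BackgroundWriteDatabase(InMemoryExampleDatabase())
    db.save(b"key", b"value")  # enqueued; written by the worker thread
    assert b"value" in set(db.fetch(b"key"))  # fetch joins the queue first
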

1224 

1225def _pack_uleb128(value: int) -> bytes: 

1226 """ 

1227 Serialize an integer into variable-length bytes. For each byte, the low 7 

1228 bits carry (part of) the integer, while the high bit indicates whether the 

1229 integer continues into the next byte. 

1230 

1231 https://en.wikipedia.org/wiki/LEB128 

1232 """ 

1233 parts = bytearray() 

1234 assert value >= 0 

1235 while True: 

1236 # chop off 7 bits 

1237 byte = value & ((1 << 7) - 1) 

1238 value >>= 7 

1239 # set the continuation bit if we have more left 

1240 if value: 

1241 byte |= 1 << 7 

1242 

1243 parts.append(byte) 

1244 if not value: 

1245 break 

1246 return bytes(parts) 

1247 

1248 

1249def _unpack_uleb128(buffer: bytes) -> tuple[int, int]: 

1250 """ 

1251 Inverts _pack_uleb128, and also returns the index at which we stopped 

1252 reading. 

1253 """ 

1254 value = 0 

1255 for i, byte in enumerate(buffer): 

1256 n = byte & ((1 << 7) - 1) 

1257 value |= n << (i * 7) 

1258 

1259 if not byte >> 7: 

1260 break 

1261 return (i + 1, value) 

1262 
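
# Worked example of the varint encoding above (illustrative): 300 is
# 0b1_0010_1100, so the low seven bits (0b010_1100 = 0x2C) go in the first byte
# with the continuation bit set (0xAC), and the remaining bits (0b10) go in the
# second byte with the continuation bit clear.
def _demo_uleb128() -> None:
    assert _pack_uleb128(300) == b"\xac\x02"
    assert _unpack_uleb128(b"\xac\x02") == (2, 300)  # (bytes consumed, value)
    assert _unpack_uleb128(_pack_uleb128(0)) == (1, 0)
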

1263 

1264def choices_to_bytes(choices: Iterable[ChoiceT], /) -> bytes: 

1265 """Serialize a list of choices to a bytestring. Inverts choices_from_bytes.""" 

1266 # We use a custom serialization format for this, which might seem crazy - but our 

1267 # data is a flat sequence of elements, and standard tools like protobuf or msgpack 

1268 # don't deal well with e.g. nonstandard bit-pattern-NaNs, or invalid-utf8 unicode. 

1269 # 

1270 # We simply encode each element with a metadata byte, if needed a uleb128 size, and 

1271 # then the payload bytes. For booleans, the payload is inlined into the metadata. 

1272 parts = [] 

1273 for choice in choices: 

1274 if isinstance(choice, bool): 

1275 # `000_0000v` - tag zero, low bit payload. 

1276 parts.append(b"\1" if choice else b"\0") 

1277 continue 

1278 

1279 # `tag_sssss [uleb128 size?] [payload]` 

1280 if isinstance(choice, float): 

1281 tag = 1 << 5 

1282 choice = struct.pack("!d", choice) 

1283 elif isinstance(choice, int): 

1284 tag = 2 << 5 

1285 choice = choice.to_bytes(1 + choice.bit_length() // 8, "big", signed=True) 

1286 elif isinstance(choice, bytes): 

1287 tag = 3 << 5 

1288 else: 

1289 assert isinstance(choice, str) 

1290 tag = 4 << 5 

1291 choice = choice.encode(errors="surrogatepass") 

1292 

1293 size = len(choice) 

1294 if size < 0b11111: 

1295 parts.append((tag | size).to_bytes(1, "big")) 

1296 else: 

1297 parts.append((tag | 0b11111).to_bytes(1, "big")) 

1298 parts.append(_pack_uleb128(size)) 

1299 parts.append(choice) 

1300 

1301 return b"".join(parts) 

1302 

1303 

1304def _choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...]: 

1305 # See above for an explanation of the format. 

1306 parts: list[ChoiceT] = [] 

1307 idx = 0 

1308 while idx < len(buffer): 

1309 tag = buffer[idx] >> 5 

1310 size = buffer[idx] & 0b11111 

1311 idx += 1 

1312 

1313 if tag == 0: 

1314 parts.append(bool(size)) 

1315 continue 

1316 if size == 0b11111: 

1317 offset, size = _unpack_uleb128(buffer[idx:]) 

1318 idx += offset 

1319 chunk = buffer[idx : idx + size] 

1320 idx += size 

1321 

1322 if tag == 1: 

1323 assert size == 8, "expected float64" 

1324 parts.extend(struct.unpack("!d", chunk)) 

1325 elif tag == 2: 

1326 parts.append(int.from_bytes(chunk, "big", signed=True)) 

1327 elif tag == 3: 

1328 parts.append(chunk) 

1329 else: 

1330 assert tag == 4 

1331 parts.append(chunk.decode(errors="surrogatepass")) 

1332 return tuple(parts) 

1333 

1334 

1335def choices_from_bytes(buffer: bytes, /) -> tuple[ChoiceT, ...] | None: 

1336 """ 

1337 Deserialize a bytestring to a tuple of choices. Inverts choices_to_bytes. 

1338 

1339 Returns None if the given bytestring is not a valid serialization of choice 

1340 sequences. 

1341 """ 

1342 try: 

1343 return _choices_from_bytes(buffer) 

1344 except Exception: 

1345 # deserialization error, eg because our format changed or someone put junk 

1346 # data in the db. 

1347 return None
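

# Round-trip sketch of the serialization format above (illustrative; the demo
# function is not part of Hypothesis): booleans are a single metadata byte,
# every other choice is a tag byte, an optional uleb128 length, and a payload.
def _demo_choice_serialization() -> None:
    choices: tuple[ChoiceT, ...] = (True, -17, 2.5, b"\x00\xff", "snowman \u2603")
    buffer = choices_to_bytes(choices)
    assert choices_from_bytes(buffer) == choices
    # junk data fails to parse and is reported as None rather than raising
    assert choices_from_bytes(b"\xff junk data") is None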