# fsspec/caching.py

from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Generic,
    NamedTuple,
    Optional,
    OrderedDict,
    TypeVar,
)

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes

class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in number of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report, e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return " , %s: %d hits, %d misses, %d total requested bytes" % (
            self.name,
            self.hit_count,
            self.miss_count,
            self.total_requested_bytes,
        )

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  :   {self.blocksize}
            block count :   {self.nblocks}
            file size   :   {self.size}
            cache hits  :   {self.hit_count}
            cache misses:   {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """
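

# Example (editor's sketch, not part of fsspec's API): BaseCache is a
# pass-through, so every read goes straight to the fetcher. The in-memory
# ``data`` bytes object below is a stand-in for a remote file.
def _example_base_cache() -> None:
    data = b"0123456789" * 10

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BaseCache(blocksize=16, fetcher=fetcher, size=len(data))
    assert cache._fetch(3, 8) == b"34567"
    assert cache._fetch(None, None) == data  # None bounds default to the whole file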

class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disk space in the temporary location.

    This cache method might only work on POSIX
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
        hits = [i for i in range(start_block, end_block + 1) if i in self.blocks]
        self.miss_count += len(need)
        self.hit_count += len(hits)
        while need:
            # TODO: not a for loop so we can consolidate blocks later to
            # make fewer fetch calls; this could be parallel
            i = need.pop(0)

            sstart = i * self.blocksize
            send = min(sstart + self.blocksize, self.size)
            self.total_requested_bytes += send - sstart
            logger.debug(f"MMap get block #{i} ({sstart}-{send})")
            self.cache[sstart:send] = self.fetcher(sstart, send)
            self.blocks.add(i)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
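

# Example (editor's sketch, not part of fsspec's API): MMapCache writes each
# fetched block into a sparse temporary file, so repeat reads hit local disk.
def _example_mmap_cache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = MMapCache(blocksize=64, fetcher=fetcher, size=len(data))
    assert cache._fetch(0, 8) == data[:8]  # fetches block 0 into the mmap
    assert 0 in cache.blocks  # block 0 is now resident locally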

class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
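

# Example (editor's sketch, not part of fsspec's API): sequential reads are
# served from a single read-ahead buffer, so only the first read is a miss.
def _example_readahead_cache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = ReadAheadCache(blocksize=64, fetcher=fetcher, size=len(data))
    assert cache._fetch(0, 10) == data[:10]  # miss: fetches bytes 0-74
    assert cache._fetch(10, 20) == data[10:20]  # hit: served from the buffer
    assert cache.hit_count == 1 and cache.miss_count == 1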

class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # guard against end=None, which the signature allows
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
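

# Example (editor's sketch, not part of fsspec's API): only the first block is
# cached, which suits formats that keep metadata in a header.
def _example_first_chunk_cache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = FirstChunkCache(blocksize=32, fetcher=fetcher, size=len(data))
    assert cache._fetch(0, 10) == data[:10]  # fills the header cache
    assert cache._fetch(5, 15) == data[5:15]  # served from the cached header
    assert cache._fetch(200, 210) == data[200:210]  # past the header: a plain miss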

class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
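

# Example (editor's sketch, not part of fsspec's API): a read spanning several
# blocks triggers one block-sized request per block, then stitches the slices.
def _example_block_cache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BlockCache(blocksize=32, fetcher=fetcher, size=len(data), maxblocks=4)
    assert cache._fetch(10, 70) == data[10:70]  # spans blocks 0, 1 and 2
    assert cache.cache_info().misses == 3  # one lru_cache miss per block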

class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
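

# Example (editor's sketch, not part of fsspec's API): BytesCache reads ahead
# by one blocksize, so a follow-up read inside the buffer is a pure hit.
def _example_bytes_cache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BytesCache(blocksize=32, fetcher=fetcher, size=len(data))
    assert cache._fetch(0, 10) == data[:10]  # miss: buffers bytes 0-42
    assert cache._fetch(10, 20) == data[10:20]  # hit: entirely inside the buffer
    assert cache.hit_count == 1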

class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]
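

# Example (editor's sketch, not part of fsspec's API): the whole file is read
# once up front (or passed in), after which every read is a slice of memory.
def _example_all_bytes() -> None:
    data = b"entire file contents"

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = AllBytes(blocksize=None, fetcher=fetcher, size=len(data))
    assert cache._fetch(7, 11) == b"file"
    assert cache.miss_count == 1  # the single up-front read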

class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in number of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: Optional[dict[tuple[int, int], bytes]] = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    self.hit_count += 1
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        self.total_requested_bytes += stop - start
        self.miss_count += 1
        return out + super()._fetch(start, stop)
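

# Example (editor's sketch, not part of fsspec's API): contiguous parts are
# consolidated, and with strict=False reads past a known part are zero-padded.
# Passing fetcher=None relies on the explicit None check in _fetch above.
def _example_known_parts() -> None:
    parts = {(0, 4): b"abcd", (4, 8): b"efgh"}  # merged into one (0, 8) entry
    cache = KnownPartsOfAFile(
        blocksize=0, fetcher=None, size=8, data=parts, strict=False
    )
    assert cache._fetch(2, 6) == b"cdef"  # entirely inside the known range
    assert cache._fetch(6, 10) == b"gh\x00\x00"  # zero-padded past the end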

class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
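

# Example (editor's sketch, not part of fsspec's API): unlike functools.lru_cache,
# UpdatableLRU lets callers seed a result for a key without invoking the function.
def _example_updatable_lru() -> None:
    cached_square = UpdatableLRU(lambda x: x * x, max_size=2)
    assert cached_square(3) == 9  # miss: computes and stores
    assert cached_square(3) == 9  # hit: served from the cache
    cached_square.add_key(100, 10)  # seed a value for key (10,) directly
    assert cached_square.is_key_cached(10)
    assert cached_square(10) == 100  # the seeded value is returned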

class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
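

# Example (editor's sketch, not part of fsspec's API): reading block 0 also
# schedules block 1 on the single-worker executor, so the next sequential
# read can join an already-finished fetch instead of blocking on it.
def _example_background_block_cache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BackgroundBlockCache(blocksize=64, fetcher=fetcher, size=len(data))
    assert cache._fetch(0, 10) == data[:10]  # block 1 is now prefetching
    assert cache._fetch(64, 80) == data[64:80]  # joins the background fetch
    cache._thread_executor.shutdown(wait=True)  # tidy up the worker thread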

caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
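

# Example (editor's sketch, not part of fsspec's API): cache implementations
# are looked up by name in the ``caches`` registry; ``MyCache`` is hypothetical.
def _example_registry() -> None:
    assert caches["readahead"] is ReadAheadCache

    class MyCache(BaseCache):
        name = "my-cache"

    register_cache(MyCache)  # now available as caches["my-cache"]
    register_cache(MyCache, clobber=True)  # re-registering requires clobber=True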