Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/caching.py: 21%

411 statements  

coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Generic,
    NamedTuple,
    OrderedDict,
    TypeVar,
)

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes

class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.fetcher = fetcher
        self.size = size

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)
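
# Illustrative sketch (not part of fsspec): the in-memory ``fetcher`` below
# stands in for a remote byte-range reader and shows the Fetcher contract.
# BaseCache only clamps the requested range and forwards every call.
def _demo_base_cache() -> None:
    payload = b"0123456789abcdef"

    def fetcher(start: int, end: int) -> bytes:
        # A Fetcher maps (start, end) to the bytes in that range.
        return payload[start:end]

    cache = BaseCache(blocksize=4, fetcher=fetcher, size=len(payload))
    assert cache._fetch(2, 6) == b"2345"        # forwarded to the fetcher
    assert cache._fetch(None, None) == payload  # None clamps to (0, size)
    assert cache._fetch(99, 120) == b""         # past EOF -> empty bytes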

class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens temporary file, which is filled blocks-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on posix
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
        while need:
            # TODO: not a for loop so we can consolidate blocks later to
            # make fewer fetch calls; this could be parallel
            i = need.pop(0)
            sstart = i * self.blocksize
            send = min(sstart + self.blocksize, self.size)
            logger.debug(f"MMap get block #{i} ({sstart}-{send})")
            self.cache[sstart:send] = self.fetcher(sstart, send)
            self.blocks.add(i)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
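
# Illustrative sketch (not part of fsspec): MMapCache fills its sparse file
# one block at a time, so a second read inside an already-fetched block makes
# no new remote call. Assumes an mmap-capable (POSIX-like) platform.
def _demo_mmap_cache() -> None:
    payload = bytes(range(256))
    calls: list[tuple[int, int]] = []

    def fetcher(start: int, end: int) -> bytes:
        calls.append((start, end))
        return payload[start:end]

    cache = MMapCache(blocksize=64, fetcher=fetcher, size=len(payload))
    assert cache._fetch(10, 20) == payload[10:20]  # pulls block 0 (bytes 0-64)
    assert cache._fetch(30, 40) == payload[30:40]  # served from the mmap
    assert calls == [(0, 64)]                      # only one remote read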

class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            part = b""
        end = min(self.size, end + self.blocksize)
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
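
# Illustrative sketch (not part of fsspec): sequential reads benefit from the
# read-ahead, so the second request below is served without a remote call.
def _demo_readahead_cache() -> None:
    payload = b"line1\nline2\nline3\n"
    calls: list[tuple[int, int]] = []

    def fetcher(start: int, end: int) -> bytes:
        calls.append((start, end))
        return payload[start:end]

    cache = ReadAheadCache(blocksize=8, fetcher=fetcher, size=len(payload))
    assert cache._fetch(0, 6) == b"line1\n"   # miss: reads ahead to byte 14
    assert cache._fetch(6, 12) == b"line2\n"  # cache hit, no new call
    assert calls == [(0, 14)]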

class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the
    header, but is accessed in a random order.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        end = end or self.size
        if start < self.blocksize:
            if self.cache is None:
                if end > self.blocksize:
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
            part = self.cache[start:end]
            if end > self.blocksize:
                part += self.fetcher(self.blocksize, end)
            return part
        else:
            return self.fetcher(start, end)
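
# Illustrative sketch (not part of fsspec): repeated reads within the header
# region are served from the cached first block.
def _demo_first_chunk_cache() -> None:
    payload = b"HEADERxxxxxxxxxxBODY"
    calls: list[tuple[int, int]] = []

    def fetcher(start: int, end: int) -> bytes:
        calls.append((start, end))
        return payload[start:end]

    cache = FirstChunkCache(blocksize=6, fetcher=fetcher, size=len(payload))
    assert cache._fetch(0, 6) == b"HEADER"  # fills the first-block cache
    assert cache._fetch(2, 4) == b"AD"      # no new call: header is cached
    assert calls == [(0, 6)]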

class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
        Function of the form f(start, end) which gets bytes from remote as
        specified.
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def __repr__(self) -> str:
        return (
            f"<BlockCache blocksize={self.blocksize}, "
            f"size={self.size}, nblocks={self.nblocks}>"
        )

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy, so the live instance keeps its cached-fetch attribute
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = []
            out.append(self._fetch_block_cached(start_block_number)[start_pos:])

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            for block_number in range(start_block_number + 1, end_block_number):
                out.append(self._fetch_block_cached(block_number))

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
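
# Illustrative sketch (not part of fsspec): a read spanning two blocks causes
# exactly two block fetches; _read_cache then re-reads both from the LRU.
def _demo_block_cache() -> None:
    payload = bytes(range(200))

    def fetcher(start: int, end: int) -> bytes:
        return payload[start:end]

    cache = BlockCache(blocksize=50, fetcher=fetcher, size=len(payload), maxblocks=2)
    assert cache._fetch(40, 60) == payload[40:60]  # spans blocks 0 and 1
    info = cache.cache_info()
    assert (info.misses, info.hits) == (2, 2)      # 2 fetches, 2 LRU hits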

class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
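
# Illustrative sketch (not part of fsspec): the buffer read ahead by the first
# call satisfies a later, overlapping request without a second remote call.
def _demo_bytes_cache() -> None:
    payload = bytes(range(100))
    calls: list[tuple[int, int]] = []

    def fetcher(start: int, end: int) -> bytes:
        calls.append((start, end))
        return payload[start:end]

    cache = BytesCache(blocksize=10, fetcher=fetcher, size=len(payload))
    assert cache._fetch(0, 5) == payload[0:5]    # reads ahead to byte 15
    assert cache._fetch(5, 14) == payload[5:14]  # cache hit, no new call
    assert calls == [(0, 15)]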

class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        return self.data[start:stop]

class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict
        # default to None rather than a mutable ``{}`` so instances never
        # share (and mutate) a single default dictionary
        data = {} if data is None else data

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        return out + super()._fetch(start, stop)
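
# Illustrative sketch (not part of fsspec): touching parts are consolidated,
# and strict=False zero-pads reads that run past a known part. ``fetcher=None``
# is fine here because nothing outside the known parts is requested.
def _demo_known_parts() -> None:
    cache = KnownPartsOfAFile(
        blocksize=0,
        fetcher=None,
        size=100,
        data={(0, 4): b"abcd", (4, 8): b"efgh"},
    )
    assert cache.data == {(0, 8): b"abcdefgh"}  # (0, 4) and (4, 8) merged
    assert cache._fetch(2, 6) == b"cdef"

    padded = KnownPartsOfAFile(
        blocksize=0, fetcher=None, size=100, data={(0, 4): b"abcd"}, strict=False
    )
    assert padded._fetch(0, 6) == b"abcd\x00\x00"  # zero-padded past the part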

class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
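
# Illustrative sketch (not part of fsspec): unlike functools.lru_cache, keys
# can be seeded with add_key, e.g. with a result from a background fetch.
def _demo_updatable_lru() -> None:
    def square(x: int) -> int:
        return x * x

    lru = UpdatableLRU(square, max_size=2)
    assert lru(3) == 9      # miss: computed by square
    assert lru(3) == 9      # hit: served from the cache
    lru.add_key(100, 10)    # pre-seed a result for key (10,)
    assert lru.is_key_cached(10)
    assert lru(10) == 100   # served without calling square
    info = lru.cache_info()
    assert (info.hits, info.misses) == (2, 1)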

class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
        Function of the form f(start, end) which gets bytes from remote as
        specified.
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def __repr__(self) -> str:
        return (
            f"<BackgroundBlockCache blocksize={self.blocksize}, "
            f"size={self.size}, nblocks={self.nblocks}>"
        )

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy, so the live instance keeps its threading attributes
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = []
            out.append(self._fetch_block_cached(start_block_number)[start_pos:])

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            for block_number in range(start_block_number + 1, end_block_number):
                out.append(self._fetch_block_cached(block_number))

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
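
# Illustrative sketch (not part of fsspec): after the first read, block 1 is
# prefetched on the worker thread, so the following sequential read joins the
# future (or finds the block already cached) instead of fetching synchronously.
def _demo_background_block_cache() -> None:
    payload = bytes(range(300))

    def fetcher(start: int, end: int) -> bytes:
        return payload[start:end]

    cache = BackgroundBlockCache(blocksize=100, fetcher=fetcher, size=len(payload))
    assert cache._fetch(0, 50) == payload[0:50]        # block 0; prefetch block 1
    assert cache._fetch(100, 150) == payload[100:150]  # joins the prefetch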

caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """Register a cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
        If a cache with the same name is already registered and ``clobber``
        is False.
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls

for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
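
# Illustrative sketch (not part of fsspec): file-like implementations
# typically look a cache class up by name in ``caches`` and instantiate it
# with their own fetcher.
def _demo_registry() -> None:
    payload = b"example-bytes"

    def fetcher(start: int, end: int) -> bytes:
        return payload[start:end]

    cls = caches["readahead"]
    cache = cls(blocksize=4, fetcher=fetcher, size=len(payload))
    assert cache.name == "readahead"
    assert cache._fetch(0, 7) == b"example"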