# Coverage: 19% of 475 statements
# Source: /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/caching.py

from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
import warnings
from collections import OrderedDict
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, TypeVar

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
MultiFetcher = Callable[[list[tuple[int, int]]], bytes]  # Maps [(start, end)] to bytes


class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  : {self.blocksize}
            block count : {self.nblocks}
            file size   : {self.size}
            cache hits  : {self.hit_count}
            cache misses: {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """


class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens a temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on POSIX systems.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
        multi_fetcher: MultiFetcher | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.multi_fetcher = multi_fetcher
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from Python 2.x itertools documentation.
        # We group an enumerated sequence of needed block numbers. The key
        # computes the difference between each enumeration index (an ascending
        # range) and the block number; that difference stays constant across a
        # run of consecutive block numbers and changes whenever the sequence
        # skips values (i.e. where previously cached blocks interrupt it). Each
        # group is therefore a run of consecutive uncached blocks that can be
        # fetched together.
        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            return self.cache[start:end]

        if self.multi_fetcher:
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                (sstart, send) = ranges[idx]
                logger.debug(f"MMap copy block ({sstart}-{send})")
                self.cache[sstart:send] = r
        else:
            for sstart, send in ranges:
                logger.debug(f"MMap get block ({sstart}-{send})")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
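
# Sketch (illustrative, not part of fsspec): fetched blocks land in a sparse
# memory-mapped temporary file, so re-reading the same range becomes a cache hit:
#
#     data = bytes(range(256)) * 16
#     c = MMapCache(blocksize=512, fetcher=lambda s, e: data[s:e], size=len(data))
#     c._fetch(0, 100)   # miss: block 0 is fetched into the mmap
#     c._fetch(50, 150)  # hit: block 0 is already cached
#     assert c.hit_count >= 1 and c.miss_count >= 1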


class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
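
# Sketch (illustrative, not part of fsspec): the first read over-fetches by one
# blocksize, so subsequent small sequential reads are served from the buffer:
#
#     data = b"x" * 10_000
#     c = ReadAheadCache(blocksize=1024, fetcher=lambda s, e: data[s:e], size=len(data))
#     c._fetch(0, 100)    # miss: fetches bytes 0-1124 (requested range + blocksize)
#     c._fetch(100, 200)  # hit: already inside the buffered window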


class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None


    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # clamp the read to the file size (treating None as "to EOF")
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
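
# Sketch (illustrative, not part of fsspec): only the header block is retained,
# so repeated header reads are cheap while reads past it always hit the fetcher:
#
#     data = b"HDR" + b"\x00" * 1000
#     c = FirstChunkCache(blocksize=64, fetcher=lambda s, e: data[s:e], size=len(data))
#     c._fetch(0, 3)      # fetches and caches bytes 0-64, returns b"HDR"
#     c._fetch(0, 3)      # served from the cached first block
#     c._fetch(500, 600)  # beyond the first block: fetched, not cached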


class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy so that removing entries does not mutate the live instance dict
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
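
# Sketch (illustrative, not part of fsspec): reads are rounded out to whole
# blocks, which are memoized by functools.lru_cache and capped at ``maxblocks``:
#
#     data = os.urandom(1 << 16)
#     c = BlockCache(4096, lambda s, e: data[s:e], len(data), maxblocks=4)
#     c._fetch(0, 10)       # fetches all of block 0, slices out 10 bytes
#     c._fetch(4090, 4200)  # spans blocks 0 and 1; block 0 comes from the LRU
#     c.cache_info()        # hits/misses as reported by the underlying lru_cache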


class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
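
# Sketch (illustrative, not part of fsspec): one contiguous buffer is grown
# (and, with trim=True, clipped behind the read point) as reads advance:
#
#     data = b"a" * 100_000
#     c = BytesCache(blocksize=1024, fetcher=lambda s, e: data[s:e], size=len(data))
#     c._fetch(0, 500)    # miss: buffers bytes 0-1524 (end + blocksize)
#     c._fetch(100, 400)  # hit: fully inside the buffer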


class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]


class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}


    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    self.hit_count += 1
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}.")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        self.total_requested_bytes += stop - start
        self.miss_count += 1
        return out + super()._fetch(start, stop)
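
# Sketch (illustrative, not part of fsspec): pre-supplied ranges are served
# from memory; contiguous entries are consolidated, and with strict=True a read
# past the known parts needs a fetcher (here None, so it would raise ValueError):
#
#     parts = {(0, 5): b"hello", (5, 10): b" worl"}
#     c = KnownPartsOfAFile(blocksize=0, fetcher=None, size=10, data=parts)
#     assert c._fetch(0, 10) == b"hello worl"  # one consolidated (0, 10) block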


class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
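
# Sketch (illustrative, not part of fsspec): behaves like functools.lru_cache,
# but another thread can deposit a precomputed value for a key via add_key:
#
#     lru = UpdatableLRU(lambda x: x * 2, max_size=2)
#     assert lru(3) == 6     # miss: computed and cached
#     assert lru(3) == 6     # hit
#     lru.add_key(100, 50)   # pre-seed: key (50,) now maps to 100
#     assert lru(50) == 100  # served without calling the wrapped function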


class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy so that removing entries does not mutate the live instance dict
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
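
# Sketch (illustrative, not part of fsspec): like BlockCache, but after each
# read the next block is submitted to a single worker thread, so a sequential
# reader rarely waits:
#
#     data = os.urandom(1 << 16)
#     c = BackgroundBlockCache(4096, lambda s, e: data[s:e], len(data))
#     c._fetch(0, 100)      # fetches block 0 now; block 1 starts loading in background
#     c._fetch(4096, 4200)  # likely served by the prefetched block 1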


caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Parameters
    ----------
    cls: type[BaseCache]
        The cache class to register, under its ``name`` attribute.
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
        If a cache with the same name is already registered and ``clobber``
        is False.
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls
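
# Sketch (illustrative, hypothetical subclass): registering a custom cacher
# makes it selectable by name alongside the built-in ones below:
#
#     class MyCache(BaseCache):
#         name = "mycache"
#
#     register_cache(MyCache)
#     assert caches["mycache"] is MyCache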


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)