# fsspec/caching.py (coverage report: 20% of 501 statements)

from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
from collections import OrderedDict
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, TypeVar

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec.caching")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
MultiFetcher = Callable[[list[tuple[int, int]]], bytes]  # Maps [(start, end)] to bytes


class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead, in bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report, e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  : {self.blocksize}
            block count : {self.nblocks}
            file size   : {self.size}
            cache hits  : {self.hit_count}
            cache misses: {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """
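

# A minimal usage sketch (not part of fsspec): any callable with the Fetcher
# signature can back a cache. Here a local bytes buffer stands in for a remote
# file; the function name and data below are hypothetical.
def _demo_base_cache() -> None:
    data = b"0123456789" * 10  # pretend this lives on remote storage

    def fetcher(start: int, stop: int) -> bytes:
        return data[start:stop]

    cache = BaseCache(blocksize=16, fetcher=fetcher, size=len(data))
    # pass-through: every call goes straight to the fetcher
    assert cache._fetch(5, 15) == data[5:15]
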

class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens a temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disk space in the temporary location.

    This cache method might only work on posix

    Parameters
    ----------
    blocksize: int
        How far to read ahead, in bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
        multi_fetcher: MultiFetcher | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.multi_fetcher = multi_fetcher
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from Python 2.x itertools documentation.
        # We are grouping an enumerated sequence of blocks. By computing the
        # difference between an ascending counter (provided by enumerate) and
        # the needed block numbers, we can detect when the block numbers skip
        # values. The key computes this difference. Whenever the difference
        # changes, we know that we have previously cached block(s), and a new
        # group is started. In other words, this algorithm neatly groups runs
        # of consecutive block numbers so they can be fetched together.
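        # For example (hypothetical values): with need = (3, 4, 7, 8, 9), the
        # keys are 0-3, 1-4, 2-7, 3-8, 4-9, i.e. -3, -3, -5, -5, -5, so
        # groupby yields the runs (3, 4) and (7, 8, 9), each fetched as one
        # contiguous range.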

        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            return self.cache[start:end]

        if self.multi_fetcher:
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                sstart, send = ranges[idx]
                logger.debug(f"MMap copy block ({sstart}-{send})")
                self.cache[sstart:send] = r
        else:
            for sstart, send in ranges:
                logger.debug(f"MMap get block ({sstart}-{send})")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
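

# A minimal sketch (not part of fsspec): because MMapCache persists fetched
# blocks in a file, passing `location` plus the already-fetched `blocks` set
# lets a new instance reuse data across sessions. The posix-style path and
# names here are hypothetical.
def _demo_mmap_cache(path: str = "/tmp/mmap_demo.bin") -> None:
    data = b"x" * 1024

    def fetcher(start: int, stop: int) -> bytes:
        return data[start:stop]

    c1 = MMapCache(blocksize=256, fetcher=fetcher, size=len(data), location=path)
    c1._fetch(0, 10)  # fetches and stores block 0 in the backing file
    c2 = MMapCache(256, fetcher, len(data), location=path, blocks=c1.blocks)
    c2._fetch(0, 10)  # served from the on-disk cache, no new fetch
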

class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
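

# A minimal sketch (not part of fsspec): sequential reads smaller than the
# blocksize are served from the single read-ahead buffer, so only the first
# read triggers a fetch. Data and names are hypothetical.
def _demo_readahead_cache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, stop: int) -> bytes:
        return data[start:stop]

    cache = ReadAheadCache(blocksize=64, fetcher=fetcher, size=len(data))
    cache._fetch(0, 10)   # miss: fetches bytes 0-74 (10 plus one blocksize ahead)
    cache._fetch(10, 20)  # hit: already inside the buffer
    assert cache.hit_count == 1 and cache.miss_count == 1
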

class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # treat a missing end as "to the end of the file"
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
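

# A minimal sketch (not part of fsspec): repeated reads within the first
# block are served from memory, while reads past it always go to the
# fetcher. Data and names are hypothetical.
def _demo_first_chunk_cache() -> None:
    data = b"HEADERxxxx" + b"." * 90

    def fetcher(start: int, stop: int) -> bytes:
        return data[start:stop]

    cache = FirstChunkCache(blocksize=10, fetcher=fetcher, size=len(data))
    cache._fetch(0, 6)   # fills the cache (counts a miss and then a hit)
    cache._fetch(0, 6)   # hit: header served from memory
    cache._fetch(50, 60)  # beyond block 0: always a miss
    assert cache.hit_count == 2 and cache.miss_count == 2
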

class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()  # copy so the live instance is not mutated
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        return self._read_cache(
            start, end, start // self.blocksize, (end - 1) // self.blocksize
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize
        if end_pos == 0:
            end_pos = self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
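

# A minimal sketch (not part of fsspec): block fetches are memoized with
# functools.lru_cache, so re-reading the same region costs nothing, and
# cache_info() exposes the LRU statistics. Data and names are hypothetical.
def _demo_block_cache() -> None:
    data = b"a" * 100

    def fetcher(start: int, stop: int) -> bytes:
        return data[start:stop]

    cache = BlockCache(blocksize=10, fetcher=fetcher, size=len(data), maxblocks=4)
    cache._fetch(0, 25)  # touches blocks 0, 1 and 2
    cache._fetch(5, 15)  # blocks 0 and 1 come straight from the LRU cache
    info = cache.cache_info()
    assert info.misses == 3 and info.hits == 2
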

class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
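

# A minimal sketch (not part of fsspec): unlike ReadAheadCache, BytesCache
# can extend its buffer backwards as well as forwards, so a short backwards
# seek still reuses the buffered data. Data and names are hypothetical.
def _demo_bytes_cache() -> None:
    data = bytes(range(200))

    def fetcher(start: int, stop: int) -> bytes:
        return data[start:stop]

    cache = BytesCache(blocksize=50, fetcher=fetcher, size=len(data), trim=False)
    cache._fetch(100, 110)  # miss: buffers bytes 100-160 (read-ahead included)
    cache._fetch(90, 115)   # prepends bytes 90-99 instead of refetching everything
    assert cache.start == 90 and cache._fetch(95, 120) == data[95:120]
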

class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]


class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead, in bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default False
        Whether a read that goes beyond a known byte-range boundary is an
        error. If `False`, any read that ends outside a known part will be
        zero padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = False,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    @property
    def size(self):
        return sum(_[1] - _[0] for _ in self.data)

    @size.setter
    def size(self, value):
        pass

    @property
    def nblocks(self):
        return len(self.data)

    @nblocks.setter
    def nblocks(self, value):
        pass

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        logger.debug("Known parts request %s %s", start, stop)
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        self.total_requested_bytes += stop - start
        out = b""
        started = False
        loc_old = 0
        for loc0, loc1 in sorted(self.data):
            if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
                # entirely within the block
                off = start - loc0
                self.hit_count += 1
                return self.data[(loc0, loc1)][off : off + stop - start]
            if stop <= loc0:
                break
            if started and loc0 > loc_old:
                # a gap where we need data
                self.miss_count += 1
                if self.strict:
                    raise ValueError
                out += b"\x00" * (loc0 - loc_old)
            if loc0 <= start < loc1:
                # found the start
                self.hit_count += 1
                off = start - loc0
                out = self.data[(loc0, loc1)][off : off + stop - start]
                started = True
            elif start < loc0 and stop > loc1:
                # the whole block
                self.hit_count += 1
                out += self.data[(loc0, loc1)]
            elif loc0 <= stop <= loc1:
                # end block
                self.hit_count += 1
                out = out + self.data[(loc0, loc1)][: stop - loc0]
                return out
            loc_old = loc1
        self.miss_count += 1
        if started and not self.strict:
            out = out + b"\x00" * (stop - loc_old)
            return out
        raise ValueError
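

# A minimal sketch (not part of fsspec): contiguous parts are merged at
# construction time, and reads inside known parts never call the fetcher.
# Data and names are hypothetical.
def _demo_known_parts() -> None:
    def fetcher(start: int, stop: int) -> bytes:
        raise AssertionError("should never be called for known parts")

    parts = {(0, 4): b"abcd", (4, 8): b"efgh", (20, 24): b"wxyz"}
    cache = KnownPartsOfAFile(blocksize=0, fetcher=fetcher, size=0, data=parts)
    assert cache.nblocks == 2             # (0, 4) and (4, 8) were merged
    assert cache._fetch(2, 6) == b"cdef"  # served entirely from known data
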

class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
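

# A minimal sketch (not part of fsspec): unlike functools.lru_cache,
# UpdatableLRU lets a value computed elsewhere (e.g. in a background thread)
# be inserted under its key via add_key. Names are hypothetical.
def _demo_updatable_lru() -> None:
    cache: UpdatableLRU = UpdatableLRU(lambda x: x * 2, max_size=2)
    assert cache(3) == 6        # miss: computed by the wrapped function
    cache.add_key(99, 4)        # pre-seed the value for key 4
    assert cache.is_key_cached(4)
    assert cache(4) == 99       # hit: the seeded value wins
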

class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()  # copy so the live instance is not mutated
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within the file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
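

# A minimal sketch (not part of fsspec): after a read, the following block is
# scheduled on the single-worker executor, so a sequential reader usually
# finds its next block already fetched (or joins the in-flight future).
# Data and names are hypothetical.
def _demo_background_block_cache() -> None:
    data = b"b" * 100

    def fetcher(start: int, stop: int) -> bytes:
        return data[start:stop]

    cache = BackgroundBlockCache(blocksize=10, fetcher=fetcher, size=len(data))
    assert cache._fetch(0, 5) == data[:5]  # block 0 fetched synchronously
    cache._fetch(10, 15)  # block 1 was pre-fetched in the background, or is joined here
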

caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """Register a cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
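

# A minimal sketch (not part of fsspec): third-party code can add its own
# cache under a new `name`; re-registering an existing name raises unless
# clobber=True. The subclass below is hypothetical.
class _DemoNoopCache(BaseCache):
    name = "demo-noop"


def _demo_register_cache() -> None:
    register_cache(_DemoNoopCache)
    assert caches["demo-noop"] is _DemoNoopCache
    register_cache(_DemoNoopCache, clobber=True)  # without clobber this raises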