
from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
import warnings
from collections import OrderedDict
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Generic,
    NamedTuple,
    TypeVar,
)

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
MultiFetcher = Callable[[list[tuple[int, int]]], list[bytes]]  # Maps [(start, end)] to list of bytes
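
# Editor's sketch (not part of fsspec): any callable with this signature can
# serve as a Fetcher, e.g. a closure over an in-memory buffer for testing:
#
#     data = b"0123456789" * 1000
#
#     def fetcher(start: int, end: int) -> bytes:
#         return data[start:end]
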

class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report, e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  :  {self.blocksize}
            block count :  {self.nblocks}
            file size   :  {self.size}
            cache hits  :  {self.hit_count}
            cache misses:  {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """
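
# Editor's sketch (not part of fsspec): BaseCache keeps nothing, so every
# read goes straight to the fetcher; with ``data``/``fetcher`` as above:
#
#     cache = BaseCache(blocksize=1024, fetcher=fetcher, size=len(data))
#     cache._fetch(0, 10)  # calls fetcher(0, 10)
#     cache._fetch(0, 10)  # calls fetcher(0, 10) again; nothing was kept
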

class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens a temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disk space in the temporary location.

    This cache method might only work on POSIX

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
        multi_fetcher: MultiFetcher | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.multi_fetcher = multi_fetcher
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from the Python 2.x itertools documentation.
        # We are grouping an enumerated sequence of blocks. By taking the
        # difference between an ascending counter (provided by enumerate) and
        # the needed block numbers, we can detect when the block number skips
        # values; the key computes this difference. Whenever the difference
        # changes, we know that we have previously cached block(s), and a new
        # group is started. In other words, this algorithm neatly groups runs
        # of consecutive block numbers so they can be fetched together.
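        # Worked example (editor's note): if ``need`` yields blocks
        # [1, 2, 3, 7, 8], ``enumerate`` pairs them with 0..4, so the key
        # i - block evaluates to [-1, -1, -1, -4, -4]; ``groupby`` therefore
        # yields the consecutive runs (1, 2, 3) and (7, 8), each of which is
        # fetched as a single range below.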

        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            return self.cache[start:end]

        if self.multi_fetcher:
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                (sstart, send) = ranges[idx]
                logger.debug(f"MMap copy block ({sstart}-{send})")
                self.cache[sstart:send] = r
        else:
            for sstart, send in ranges:
                logger.debug(f"MMap get block ({sstart}-{send})")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
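
# Editor's sketch (not part of fsspec): fetched blocks persist in a local
# sparse file, so a cache file plus its block set can be reused later:
#
#     cache = MMapCache(blocksize=2**20, fetcher=fetcher, size=len(data),
#                       location="/tmp/remote.cache")
#     cache._fetch(0, 100)   # fetches block 0 and writes it to the mmap
#     cache.blocks           # {0}; persist this to resume with the same file
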

class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
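
# Editor's sketch (not part of fsspec): sequential consumers benefit most;
# with blocksize=1024 and ``data``/``fetcher`` as above:
#
#     cache = ReadAheadCache(blocksize=1024, fetcher=fetcher, size=len(data))
#     cache._fetch(0, 100)      # miss: fetches bytes 0-1124 (read-ahead)
#     cache._fetch(100, 200)    # hit: served from the buffered block
#     cache._fetch(5000, 5100)  # non-sequential: buffer is replaced
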

class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # treat an open-ended read as a read to the end of the file
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
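
# Editor's sketch (not part of fsspec): suited to formats whose header is
# probed repeatedly while the rest of the file is read at most once:
#
#     cache = FirstChunkCache(blocksize=4096, fetcher=fetcher, size=len(data))
#     cache._fetch(0, 100)      # miss: caches bytes 0-4096
#     cache._fetch(50, 150)     # hit: served from the cached first block
#     cache._fetch(8000, 8100)  # past the first block: delegated to fetcher
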

class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy, so that the live instance is not stripped of its cache
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
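
# Editor's sketch (not part of fsspec): reads are rounded out to whole blocks,
# and memory is bounded by maxblocks * blocksize:
#
#     cache = BlockCache(blocksize=1024, fetcher=fetcher, size=len(data),
#                        maxblocks=4)
#     cache._fetch(10, 20)      # fetches block 0 (bytes 0-1024)
#     cache._fetch(1000, 1100)  # reuses block 0, fetches block 1
#     cache.cache_info()        # lru_cache statistics: 2 misses so far
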

class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
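
# Editor's sketch (not part of fsspec): a single contiguous buffer that grows
# at either end for nearby reads and, with trim=True, drops data once the
# read position has moved more than a blocksize past the buffer start:
#
#     cache = BytesCache(blocksize=1024, fetcher=fetcher, size=len(data))
#     cache._fetch(0, 100)    # fetches bytes 0-1124 (read-ahead)
#     cache._fetch(100, 200)  # hit inside the buffer, no fetch
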

class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]
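
# Editor's sketch (not part of fsspec): the whole file is read once up front,
# after which every access is a slice of a single bytes object:
#
#     cache = AllBytes(fetcher=fetcher, size=len(data))
#     cache._fetch(500, None)  # data[500:], no further fetcher calls
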

class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    self.hit_count += 1
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        self.total_requested_bytes += stop - start
        self.miss_count += 1
        return out + super()._fetch(start, stop)
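
# Editor's sketch (not part of fsspec): seed the cache with byte ranges that
# are already known, e.g. gathered in an earlier session:
#
#     known = {(0, 6): b"header", (100, 106): b"footer"}
#     cache = KnownPartsOfAFile(blocksize=0, fetcher=fetcher, size=len(data),
#                               data=known)
#     cache._fetch(0, 6)    # b"header", no fetcher call
#     cache._fetch(40, 60)  # outside known parts: warns, then fetches
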

class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
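
# Editor's sketch (not part of fsspec): unlike functools.lru_cache, entries
# can be injected from outside the wrapped function, which lets a background
# thread publish blocks it fetched speculatively (``load_block`` is a
# hypothetical one-argument function):
#
#     lru = UpdatableLRU(load_block, max_size=32)
#     lru(5)                  # calls load_block(5) and caches the result
#     lru.add_key(b"...", 6)  # pre-populate key 6 without calling load_block
#     lru.is_key_cached(6)    # True
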

class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy, so that the live instance is not stripped of its attributes
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within the file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
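
# Editor's sketch (not part of fsspec): behaves like BlockCache from the
# caller's point of view, but a single worker thread speculatively fetches
# the block following the one just read:
#
#     cache = BackgroundBlockCache(blocksize=1024, fetcher=fetcher,
#                                  size=len(data))
#     cache._fetch(0, 100)      # reads block 0; block 1 starts loading async
#     cache._fetch(1024, 1100)  # often already in cache, no waiting
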

caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
        If a cache with the same name is already registered and ``clobber``
        is False.
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
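
# Editor's sketch (not part of fsspec): third-party implementations can join
# the registry, after which buffered fsspec files can select them by name via
# the ``cache_type`` open argument ("noisy" is a hypothetical example):
#
#     class NoisyCache(ReadAheadCache):
#         name = "noisy"
#
#         def _fetch(self, start, end):
#             print(f"fetch {start}-{end}")
#             return super()._fetch(start, end)
#
#     register_cache(NoisyCache)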