Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/caching.py: 20%


from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
from collections import OrderedDict
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, TypeVar

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
# A multi-fetcher takes a list of (start, end) tuples; ``list[int, int]`` is
# not valid typing syntax, so the element type is spelled out here.
MultiFetcher = Callable[[list[tuple[int, int]]], bytes]  # Maps [(start, end)] to bytes

class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report, e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  :   {self.blocksize}
            block count :   {self.nblocks}
            file size   :   {self.size}
            cache hits  :   {self.hit_count}
            cache misses:   {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """
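
# Editorial sketch (not part of fsspec): a minimal illustration of the Fetcher
# contract that drives every cache above — any callable mapping (start, end)
# byte offsets to bytes. The helper name and data are hypothetical. BaseCache
# stores nothing, so every _fetch goes straight through to the fetcher.
def _example_basecache() -> None:
    data = b"0123456789" * 10

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BaseCache(blocksize=16, fetcher=fetcher, size=len(data))
    assert cache._fetch(5, 15) == data[5:15]
    assert cache._fetch(None, None) == data  # None bounds mean the whole file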

class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens a temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on posix

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
        multi_fetcher: MultiFetcher | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.multi_fetcher = multi_fetcher
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from Python 2.x itertools documentation.
        # We are grouping an enumerated sequence of blocks. By comparing the
        # ascending index (provided by enumerate) with the needed block numbers,
        # we can detect when the block number skips values; the key computes
        # this difference. Whenever the difference changes, we know that we have
        # previously cached block(s), and a new group is started. In other
        # words, this algorithm neatly groups runs of consecutive block numbers
        # so they can be fetched together.
        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            return self.cache[start:end]

        if self.multi_fetcher:
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                (sstart, send) = ranges[idx]
                logger.debug(f"MMap copy block ({sstart}-{send})")
                self.cache[sstart:send] = r
        else:
            for sstart, send in ranges:
                logger.debug(f"MMap get block ({sstart}-{send})")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
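
# Editorial sketch (not part of fsspec): the groupby consolidation trick used
# in MMapCache._fetch above. Enumerating a sorted sequence and keying on
# (index - value) gives a constant key within any run of consecutive integers,
# so each group is one contiguous range of missing blocks. Names are
# hypothetical.
def _example_consolidate_runs() -> None:
    need = [2, 3, 4, 7, 8, 11]
    runs = []
    for _, grp in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
        blocks = [b for _, b in grp]
        runs.append((blocks[0], blocks[-1]))
    # three runs of consecutive block numbers, each fetchable in one request
    assert runs == [(2, 4), (7, 8), (11, 11)]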

class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
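
# Editorial sketch (not part of fsspec): ReadAheadCache in its intended access
# pattern. Sequential small reads are served from one buffered block, so only
# the first read (and each read past the buffer) hits the fetcher. The helper
# and its data are hypothetical.
def _example_readahead() -> None:
    calls = []

    def fetcher(start: int, end: int) -> bytes:
        calls.append((start, end))
        return bytes(100)[start:end]

    cache = ReadAheadCache(blocksize=50, fetcher=fetcher, size=100)
    cache._fetch(0, 10)   # miss: fetches bytes 0-60 (read + blocksize ahead)
    cache._fetch(10, 20)  # hit: served from the buffer, no new call
    assert len(calls) == 1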

class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # ``end`` may be None (read to end of file); guard before min()
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
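
# Editorial sketch (not part of fsspec): FirstChunkCache keeps only the header
# block, which suits formats whose metadata lives at the start of the file and
# is revisited repeatedly. The helper and its data are hypothetical.
def _example_first_chunk() -> None:
    data = bytes(range(200))
    calls = []

    def fetcher(start: int, end: int) -> bytes:
        calls.append((start, end))
        return data[start:end]

    cache = FirstChunkCache(blocksize=64, fetcher=fetcher, size=len(data))
    cache._fetch(0, 10)   # miss: caches bytes 0-64
    cache._fetch(20, 30)  # hit: served from the cached first block
    assert calls == [(0, 64)]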

class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy, so deleting the unpicklable lru_cache wrapper does not
        # mutate the live instance
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
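
# Editorial sketch (not part of fsspec): BlockCache only ever requests whole
# blocks, and the functools.lru_cache layer evicts the least recently used
# block once maxblocks is exceeded. The helper and its data are hypothetical.
def _example_blockcache() -> None:
    data = bytes(300)

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BlockCache(blocksize=100, fetcher=fetcher, size=len(data), maxblocks=2)
    cache._fetch(0, 150)    # loads blocks 0 and 1
    cache._fetch(250, 299)  # loads block 2; block 0 is evicted (maxblocks=2)
    assert cache.cache_info().currsize == 2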

class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
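
# Editorial sketch (not part of fsspec): BytesCache grows a single bytes
# buffer, extending it forwards or backwards for nearby reads; with trim=True
# it drops blocksize-multiples from the front once the read position has moved
# well past them. The helper and its data are hypothetical.
def _example_bytescache() -> None:
    data = bytes(range(256))

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BytesCache(blocksize=32, fetcher=fetcher, size=len(data))
    assert cache._fetch(0, 16) == data[:16]     # fetches 0-48 (end + blocksize)
    assert cache._fetch(16, 40) == data[16:40]  # served from the buffer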

class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]
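
# Editorial sketch (not part of fsspec): AllBytes eagerly reads the whole file
# once, after which every access is a slice of the in-memory copy. The helper
# is hypothetical.
def _example_allbytes() -> None:
    data = b"abcdefgh"
    cache = AllBytes(blocksize=None, fetcher=lambda s, e: data[s:e], size=len(data))
    assert cache._fetch(2, 5) == b"cde"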

class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default False
        Whether reads that go beyond a known byte-range boundary should
        raise an error. If `False`, any read that ends outside a known part
        will be zero padded. Note that zero padding will not be used for
        reads that begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = False,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    @property
    def size(self):
        return sum(_[1] - _[0] for _ in self.data)

    @size.setter
    def size(self, value):
        pass

    @property
    def nblocks(self):
        return len(self.data)

    @nblocks.setter
    def nblocks(self, value):
        pass

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        self.total_requested_bytes += stop - start

        out = b""
        started = False
        loc_old = 0
        for loc0, loc1 in sorted(self.data):
            if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
                # entirely within the block
                off = start - loc0
                self.hit_count += 1
                return self.data[(loc0, loc1)][off : off + stop - start]
            if stop <= loc0:
                break
            if started and loc0 > loc_old:
                # a gap where we need data
                self.miss_count += 1
                if self.strict:
                    raise ValueError
                out += b"\x00" * (loc0 - loc_old)
            if loc0 <= start < loc1:
                # found the start
                self.hit_count += 1
                off = start - loc0
                out = self.data[(loc0, loc1)][off : off + stop - start]
                started = True
            elif start < loc0 and stop > loc1:
                # the whole block
                self.hit_count += 1
                out += self.data[(loc0, loc1)]
            elif loc0 <= stop <= loc1:
                # end block
                self.hit_count += 1
                return out + self.data[(loc0, loc1)][: stop - loc0]
            loc_old = loc1
        self.miss_count += 1
        if started and not self.strict:
            return out + b"\x00" * (stop - loc_old)
        raise ValueError
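
# Editorial sketch (not part of fsspec): KnownPartsOfAFile serves reads from
# pre-supplied byte ranges; with strict=False, a read running past the known
# data is zero padded instead of raising. The helper and its data are
# hypothetical; contiguous parts are consolidated on construction.
def _example_known_parts() -> None:
    data = {(0, 4): b"abcd", (4, 8): b"efgh"}  # contiguous: consolidated to (0, 8)
    cache = KnownPartsOfAFile(
        blocksize=4, fetcher=None, size=8, data=data, strict=False
    )
    assert cache._fetch(2, 6) == b"cdef"
    assert cache._fetch(6, 10) == b"gh\x00\x00"  # zero padded past known data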

class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
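
# Editorial sketch (not part of fsspec): UpdatableLRU behaves like
# functools.lru_cache but also lets a result computed elsewhere (e.g. on a
# background thread) be installed under its key via add_key. The helper is
# hypothetical.
def _example_updatable_lru() -> None:
    lru = UpdatableLRU(lambda n: n * 2, max_size=2)
    assert lru(3) == 6  # miss: computed by the wrapped function
    lru.add_key(99, 5)  # pre-install a result for key (5,)
    assert lru(5) == 99  # hit: the installed value wins
    assert lru.cache_info().hits == 1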

class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy, so deleting the unpicklable entries does not mutate
        # the live instance
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
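
# Editorial sketch (not part of fsspec): after a read, BackgroundBlockCache
# asks its single worker thread to prefetch the next block, so a subsequent
# sequential read can usually be served without blocking on the fetcher. The
# helper and its data are hypothetical.
def _example_background_blockcache() -> None:
    data = bytes(400)

    def fetcher(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BackgroundBlockCache(blocksize=100, fetcher=fetcher, size=len(data))
    cache._fetch(0, 50)     # reads block 0, schedules block 1 in the background
    cache._fetch(100, 150)  # joins (or picks up) the prefetched block 1
    assert cache.cache_info().currsize >= 1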

caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
        If a cache with the same name is already registered and ``clobber``
        is False.
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
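
# Editorial sketch (not part of fsspec): registering a custom cache type. The
# ``NoisyCache`` subclass and helper are hypothetical; fsspec looks caches up
# in the ``caches`` dict by their ``name`` class attribute.
def _example_register() -> None:
    class NoisyCache(BaseCache):
        name = "noisy"

        def _fetch(self, start, stop):
            logger.debug("fetching %s-%s", start, stop)
            return super()._fetch(start, stop)

    register_cache(NoisyCache)
    assert caches["noisy"] is NoisyCache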