Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/caching.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import collections
4import functools
5import logging
6import math
7import os
8import threading
9import warnings
10from collections import OrderedDict
11from concurrent.futures import Future, ThreadPoolExecutor
12from itertools import groupby
13from operator import itemgetter
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Callable,
18 ClassVar,
19 Generic,
20 NamedTuple,
21 TypeVar,
22)
if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    # At runtime a plain TypeVar stands in for the ParamSpec so the module
    # does not require typing_extensions.
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
# Maps [(start, end), ...] to bytes.
# Fixed: ``list[int, int]`` is not a valid parameterization (``list`` takes a
# single type argument); the element type is a (start, end) tuple.
MultiFetcher = Callable[[list[tuple[int, int]]], bytes]
class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        """Fetch ``[start, stop)``; ``None`` means start/end of file.

        Empty or out-of-range requests short-circuit to ``b""`` without
        calling the fetcher.
        """
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size : {self.blocksize}
            block count : {self.nblocks}
            file size : {self.size}
            cache hits : {self.hit_count}
            cache misses: {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """
class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens temporary file, which is filled blocks-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on posix

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
        multi_fetcher: MultiFetcher | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        # Block numbers already present in the backing file.
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.multi_fetcher = multi_fetcher
        # mmap (or bytearray for empty files) holding the file contents.
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        """Create or reopen the mmap-backed buffer for this file's bytes."""
        import mmap
        import tempfile

        if self.size == 0:
            # mmap cannot map zero bytes; use an empty bytearray instead.
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                # Fresh anonymous file: any previously recorded blocks are stale.
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            # Write one byte at the final offset to extend the file to full
            # size without allocating it all (sparse on most filesystems).
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            # Reuse an existing cache file at ``location``.
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes [start, end), fetching any not-yet-cached blocks."""
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        # Block numbers covering the requested byte range (inclusive of the
        # block containing ``end``).
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from Python 2.x itertools documentation.
        # We are grouping an enumerated sequence of blocks. By comparing when the difference
        # between an ascending range (provided by enumerate) and the needed block numbers
        # we can detect when the block number skips values. The key computes this difference.
        # Whenever the difference changes, we know that we have previously cached block(s),
        # and a new group is started. In other words, this algorithm neatly groups
        # runs of consecutive block numbers so they can be fetched together.
        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            # Everything already cached: serve straight from the mmap.
            return self.cache[start:end]

        if self.multi_fetcher:
            # One call for all consolidated ranges; results come back in order.
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                (sstart, send) = ranges[idx]
                # NOTE(review): log message is missing its closing ")" — left
                # as-is since runtime strings must not change in a doc pass.
                logger.debug(f"MMap copy block ({sstart}-{send}")
                self.cache[sstart:send] = r
        else:
            # Fall back to one fetcher call per consolidated range.
            for sstart, send in ranges:
                # NOTE(review): log message is missing its closing ")".
                logger.debug(f"MMap get block ({sstart}-{send}")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        # Recreate the mmap over the (possibly persistent) backing file.
        self.cache = self._makefile()
class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        # One contiguous buffer covering file bytes [self.start, self.end).
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # Normalize open-ended or oversized requests, and bail out early on
        # empty / past-EOF reads.
        start = 0 if start is None else start
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        remaining = end - start
        if self.start <= start and end <= self.end:
            # Full hit: the whole request lies inside the buffer.
            self.hit_count += 1
            lo = start - self.start
            return self.cache[lo : lo + remaining]

        self.miss_count += 1
        if self.start <= start < self.end:
            # Partial hit: reuse the tail of the buffer, fetch the rest.
            prefix = self.cache[start - self.start :]
            remaining -= len(prefix)
            start = self.end
        else:
            # Complete miss: nothing in the buffer is reusable.
            prefix = b""

        # Read ahead one extra blocksize; the new data replaces the buffer.
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)
        self.start = start
        self.end = self.start + len(self.cache)
        return prefix + self.cache[:remaining]
class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        # First ``blocksize`` bytes of the file; None until first fetched.
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # Robustness fix: treat an open-ended read (end=None) as reading to
        # the end of the file; previously ``min(None, self.size)`` raised
        # TypeError, unlike the other caches which normalize None.
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            # Request overlaps the cached first chunk.
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    # Need more than the first chunk anyway: fetch the whole
                    # range in one request and retain only the first chunk.
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                # Tail beyond the cached chunk is fetched on demand.
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            # Entirely beyond the first chunk: pass straight through.
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        # LRU-wrap the block fetcher so repeated reads of a block hit memory.
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # Fix: copy before deleting. ``state = self.__dict__`` (as previously
        # written) removed the LRU cache from the *live* instance whenever it
        # was pickled, breaking it afterwards. MMapCache already copies.
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        # Rebuild the (unpicklable) LRU-wrapped block fetcher.
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        # BaseCache._fetch clips past-EOF requests to b"".
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        # Offsets of the request within the first and last blocks.
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
class BytesCache(BaseCache):
    """Cache which holds data in a in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        # Contiguous buffer holding file bytes [self.start, self.end);
        # both offsets stay None until the first fetch populates the buffer.
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            # Read ahead one blocksize beyond the requested end.
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                # Extending the buffer backwards.
                if self.end is None or self.end - end > self.blocksize:
                    # Request is too far behind the buffer: replace it wholesale.
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    # Prepend just the missing head to the existing buffer.
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                # Extending the buffer forwards.
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    # Request is too far ahead of the buffer: replace it wholesale.
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    # Append just the missing tail to the existing buffer.
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            # Drop whole blocksize-sized chunks from the front of the buffer
            # once it has grown well past the current read position.
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        # Length of the currently buffered data, not of the file.
        return len(self.cache)
class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            # No pre-supplied content: pull the whole file in one request.
            self.miss_count += 1
            self.total_requested_bytes += self.size
            self.data = self.fetcher(0, self.size)
        else:
            self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        # Everything is resident, so each read is a plain slice of the buffer.
        self.hit_count += 1
        return self.data[start:stop]
class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        with known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            # Sort the supplied ranges, then merge each range that starts
            # exactly where the previous one stopped into a single entry,
            # so self.data holds sorted, disjoint, maximal ranges.
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    # Contiguous with the previous range: extend it.
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    # Gap before this range: start a new entry.
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        # Scan the known parts for one containing the start of the request.
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    self.hit_count += 1
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        self.total_requested_bytes += stop - start
        self.miss_count += 1
        # Prepend whatever portion of the request a known part did cover.
        return out + super()._fetch(start, stop)
class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroudBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        """Return the cached value for ``args``, computing it on a miss."""
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            try:
                # Refresh recency; raises KeyError when not cached.
                self._cache.move_to_end(args)
            except KeyError:
                pass
            else:
                self._hits += 1
                return self._cache[args]

        # Compute outside the lock so a slow function doesn't block others.
        value = self._func(*args, **kwargs)

        with self._lock:
            self._misses += 1
            self._cache[args] = value
            while len(self._cache) > self._max_size:
                # Evict the least recently used entry.
                self._cache.popitem(last=False)

        return value

    def is_key_cached(self, *args: Any) -> bool:
        """Return True when a value for ``args`` is currently cached."""
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        """Insert (or overwrite) the cached value for ``args`` directly."""
        with self._lock:
            self._cache[args] = result
            while len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """Snapshot of hit/miss counters and current occupancy."""
        with self._lock:
            return self.CacheInfo(
                hits=self._hits,
                misses=self._misses,
                maxsize=self._max_size,
                currsize=len(self._cache),
            )
class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        # Single worker: at most one speculative block fetch in flight.
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # Fix: copy before deleting. Mutating ``self.__dict__`` directly (as
        # previously written) stripped the executor/cache/lock off the *live*
        # instance whenever it was pickled, leaving it broken afterwards.
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        # Recreate the unpicklable machinery dropped in __getstate__.
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        # BaseCache._fetch clips past-EOF requests to b"".
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
# Registry mapping cache name -> implementation class; populated further
# below via register_cache().
caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}
def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Makes ``cls`` available in the module-level ``caches`` registry under
    its ``name`` class attribute.

    Parameters
    ----------
    cls: type[BaseCache]
        The cache implementation to register; its ``name`` attribute is
        used as the registry key.
    clobber: bool, optional
        If set to True (default is False) - allow to overwrite existing
        entry.

    Raises
    ------
    ValueError
        If a cache with the same name is already registered and ``clobber``
        is False.
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls
# Register every built-in cache implementation under its class ``name``.
for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)