from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Generic,
    NamedTuple,
    Optional,
    OrderedDict,
    TypeVar,
)

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
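

# Illustrative sketch, not part of the library: any callable with the
# (start, end) -> bytes signature can serve as a Fetcher. A real fetcher
# would issue a ranged read against remote storage; `_example_fetcher` and
# `payload` are hypothetical names used only for demonstration.
def _example_fetcher(payload: bytes) -> Fetcher:
    def fetch(start: int, end: int) -> bytes:
        # Stand-in for a remote ranged read, e.g. an HTTP Range request.
        return payload[start:end]

    return fetch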


class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report, e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return " , %s: %d hits, %d misses, %d total requested bytes" % (
            self.name,
            self.hit_count,
            self.miss_count,
            self.total_requested_bytes,
        )

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  :   {self.blocksize}
            block count :   {self.nblocks}
            file size   :   {self.size}
            cache hits  :   {self.hit_count}
            cache misses:   {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """


class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on posix
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
        hits = [i for i in range(start_block, end_block + 1) if i in self.blocks]
        self.miss_count += len(need)
        self.hit_count += len(hits)
        while need:
            # TODO: not a for loop so we can consolidate blocks later to
            # make fewer fetch calls; this could be parallel
            i = need.pop(0)

            sstart = i * self.blocksize
            send = min(sstart + self.blocksize, self.size)
            self.total_requested_bytes += send - sstart
            logger.debug(f"MMap get block #{i} ({sstart}-{send})")
            self.cache[sstart:send] = self.fetcher(sstart, send)
            self.blocks.add(i)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
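

# Illustrative sketch (hypothetical helper): given an on-disk `location`,
# MMapCache memory-maps a sparse file of the full size and fills it one
# block at a time; re-creating the cache with the same `location` and
# `blocks` set reuses already-fetched data.
def _example_mmapcache(location: str) -> bytes:
    data = b"x" * 1024
    cache = MMapCache(
        blocksize=256,
        fetcher=lambda s, e: data[s:e],
        size=len(data),
        location=location,  # caller-supplied path, assumed writable
    )
    return cache._fetch(0, 100)  # fetches block 0 and serves the slice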


class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
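

# Illustrative sketch (hypothetical helper): sequential reads are the
# intended workload; the second read below lands inside the read-ahead
# buffer created by the first.
def _example_readahead() -> None:
    data = bytes(1024)
    cache = ReadAheadCache(blocksize=512, fetcher=lambda s, e: data[s:e], size=len(data))
    cache._fetch(0, 100)    # miss: fetches bytes 0-612 (100 + one blocksize)
    cache._fetch(100, 200)  # hit: already buffered
    assert (cache.hit_count, cache.miss_count) == (1, 1)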


class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        end = min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
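

# Illustrative sketch (hypothetical helper): only the first block is ever
# retained, which suits formats that keep metadata in a header.
def _example_firstchunk() -> None:
    data = bytes(1000)
    cache = FirstChunkCache(blocksize=64, fetcher=lambda s, e: data[s:e], size=len(data))
    cache._fetch(0, 16)     # first read: fetches and caches bytes 0-64
    cache._fetch(4, 20)     # served from the cached first block
    cache._fetch(500, 600)  # beyond the first block: fetched directly, not cached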


class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)


class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
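

# Illustrative sketch (hypothetical helper): the buffer extends to cover
# adjacent reads; with trim=True, older data at the start of the buffer is
# discarded as reads progress more than a blocksize past it.
def _example_bytescache() -> None:
    data = bytes(4096)
    cache = BytesCache(blocksize=512, fetcher=lambda s, e: data[s:e], size=len(data))
    cache._fetch(0, 100)    # miss: buffers bytes 0-612
    cache._fetch(50, 150)   # hit: entirely inside the buffer
    cache._fetch(600, 700)  # extends the buffer forward rather than replacing it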


class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]
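

# Illustrative sketch (hypothetical helper): the whole file is fetched once
# at construction, and every later read is a slice of that single buffer.
def _example_allbytes() -> None:
    data = b"small file contents"
    cache = AllBytes(fetcher=lambda s, e: data[s:e], size=len(data))
    assert cache._fetch(0, 5) == data[:5]
    assert cache.miss_count == 1  # only the single up-front fetch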


class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: Optional[dict[tuple[int, int], bytes]] = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    self.hit_count += 1
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        self.total_requested_bytes += stop - start
        self.miss_count += 1
        return out + super()._fetch(start, stop)
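

# Illustrative sketch (hypothetical helper): reads inside the known parts
# never touch the fetcher; with strict=False, a read overrunning a known
# part is zero-padded instead of re-fetched.
def _example_knownparts() -> None:
    cache = KnownPartsOfAFile(
        blocksize=0,
        fetcher=None,  # no fallback: fully unknown reads would raise ValueError
        size=100,
        data={(0, 10): b"0123456789"},
        strict=False,
    )
    assert cache._fetch(2, 8) == b"234567"
    assert cache._fetch(5, 15) == b"56789" + b"\x00" * 5  # zero-padded tail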


class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
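

# Illustrative sketch (hypothetical helper): unlike functools.lru_cache,
# add_key lets a background worker deposit a result computed elsewhere.
def _example_updatable_lru() -> None:
    lru = UpdatableLRU(lambda x: x * 2, max_size=4)
    assert lru(3) == 6   # computed via the wrapped function (a miss)
    lru.add_key(99, 5)   # pre-seed the cached value for args (5,)
    assert lru(5) == 99  # served from the cache (a hit)
    assert lru.is_key_cached(3)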


class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)


caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
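

# Illustrative sketch (hypothetical subclass): a cache only needs a unique
# `name` and the BaseCache interface to be registered and then looked up
# by name via `caches`.
def _example_register_cache() -> None:
    class DemoCache(BaseCache):
        name = "demo"

    register_cache(DemoCache)
    assert caches["demo"] is DemoCache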