Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/caching.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import collections
4import functools
5import logging
6import math
7import os
8import threading
9from collections import OrderedDict
10from collections.abc import Callable
11from concurrent.futures import Future, ThreadPoolExecutor
12from itertools import groupby
13from operator import itemgetter
14from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, TypeVar
16if TYPE_CHECKING:
17 import mmap
19 from typing_extensions import ParamSpec
21 P = ParamSpec("P")
22else:
23 P = TypeVar("P")
25T = TypeVar("T")
logger = logging.getLogger("fsspec.caching")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
# Takes a list of (start, end) ranges in a single call.
# (Fixed alias: the element type is a (start, end) tuple, not ``list[int, int]``.)
MultiFetcher = Callable[[list[tuple[int, int]]], bytes]  # Maps [(start, end)] to bytes
class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        """Fetch bytes [start, stop) directly from the fetcher (no caching).

        ``None`` bounds default to the start/end of the file; out-of-range or
        empty requests return ``b""``.
        """
        start = 0 if start is None else start
        stop = self.size if stop is None else stop
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if not (self.hit_count or self.miss_count):
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  :   {self.blocksize}
            block count :   {self.nblocks}
            file size   :   {self.size}
            cache hits  :   {self.hit_count}
            cache misses:   {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """
class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens temporary file, which is filled blocks-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on posix

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
        multi_fetcher: MultiFetcher | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.multi_fetcher = multi_fetcher
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        """Create the mmap backing store (a plain bytearray for empty files)."""
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                # brand-new anonymous file: no block can be valid yet
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            # extend the file to the full size by writing one byte at the end
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            # reuse an existing cache file; ``blocks`` records what is valid
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes [start, end), fetching any missing blocks first."""
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from Python 2.x itertools documentation.
        # We are grouping an enumerated sequence of blocks. By comparing when the difference
        # between an ascending range (provided by enumerate) and the needed block numbers
        # we can detect when the block number skips values. The key computes this difference.
        # Whenever the difference changes, we know that we have previously cached block(s),
        # and a new group is started. In other words, this algorithm neatly groups
        # runs of consecutive block numbers so they can be fetched together.
        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            return self.cache[start:end]

        if self.multi_fetcher:
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                sstart, send = ranges[idx]
                logger.debug(f"MMap copy block ({sstart}-{send})")
                self.cache[sstart:send] = r
        else:
            for sstart, send in ranges:
                logger.debug(f"MMap get block ({sstart}-{send})")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes; the mmap itself is recreated, and
        # previously fetched blocks are reused when ``location`` persists.
        self.__dict__.update(state)
        self.cache = self._makefile()
class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes [start, end), keeping a single read-ahead buffer."""
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        want = end - start
        if self.start <= start and end <= self.end:
            # full hit: the request lies entirely inside the buffer
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        if self.start <= start < self.end:
            # partial hit: serve the buffer's tail, then fetch the remainder
            self.miss_count += 1
            prefix = self.cache[start - self.start :]
            want -= len(prefix)
            start = self.end
        else:
            # complete miss: nothing in the buffer is usable
            self.miss_count += 1
            prefix = b""
        # read ahead by one blocksize, clamped to the file size
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return prefix + self.cache[:want]
class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.

    Parameters
    ----------
    blocksize: int
        Size of the header region to keep cached (clamped to the file size)
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        # lazily-populated copy of bytes [0, blocksize)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes [start, end), serving the header region from cache.

        ``start=None`` means 0 and ``end=None`` means end-of-file, matching
        the other cache implementations.
        """
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # ``None`` means "to the end of the file"; previously min(None, size)
        # raised TypeError.
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    # one request covers both the header and the extra tail
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                # anything past the header is fetched fresh each time
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            # request entirely outside the cached header
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
        Function of the form f(start, end) which gets bytes from remote as
        specified.
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        # per-instance LRU wrapper around the block fetcher
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # Copy first: deleting from ``self.__dict__`` directly would strip
        # the lru_cache wrapper off the live instance as a side effect of
        # pickling it.
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        # rebuild the unpicklable lru_cache wrapper
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes [start, end), reading whole blocks through the LRU."""
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        return self._read_cache(
            start, end, start // self.blocksize, (end - 1) // self.blocksize
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize
        if end_pos == 0:
            # an end on a block boundary means "to the end of that block"
            end_pos = self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        # single contiguous buffer covering file offsets [self.start, self.end)
        self.cache = b""
        # None until the first fetch populates the buffer
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes [start, end), growing or replacing the single buffer.

        Depending on how the request overlaps the current buffer, data is
        prepended, appended, or the buffer is replaced wholesale; a
        ``blocksize`` read-ahead is applied past ``end``.
        """
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            # NOTE(review): strict ``end < self.end`` sends a read ending
            # exactly at the buffer edge down the miss path — confirm intended
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            # read ahead one blocksize past the request, clamped to file size
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    # far behind the buffer: replace it wholesale
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    # just before the buffer: prepend only the missing prefix
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    # buffer already reaches EOF; nothing more to read
                    pass
                elif end - self.end > self.blocksize:
                    # far past the buffer: replace it wholesale
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    # just past the buffer: append only the missing suffix
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            # drop whole blocksize chunks from the front once the buffer has
            # grown well past one block, to bound memory use
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        # number of bytes currently buffered
        return len(self.cache)
class AllBytes(BaseCache):
    """Cache which holds the entire contents of the file in memory"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is not None:
            # caller supplied the bytes up front; nothing to fetch
            self.data = data
        else:
            # one whole-file request, recorded as a single miss
            self.miss_count += 1
            self.total_requested_bytes += self.size
            self.data = self.fetcher(0, self.size)

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        # every read is served straight from memory
        self.hit_count += 1
        return self.data[start:stop]
class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        with known bytes.
    strict: bool, default False
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = False,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    # this range follows on directly from the previous one:
                    # merge the bytes and extend the recorded offsets
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    @property
    def size(self):
        # reported size is the total of the known parts, not the real file size
        return sum(_[1] - _[0] for _ in self.data)

    @size.setter
    def size(self, value):
        # BaseCache.__init__ assigns ``self.size``; ignore it — size is derived
        pass

    @property
    def nblocks(self):
        # one "block" per consolidated known part
        return len(self.data)

    @nblocks.setter
    def nblocks(self, value):
        # BaseCache.__init__ assigns ``self.nblocks``; ignore it — derived
        pass

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        """Assemble bytes [start, stop) from the known parts.

        Gaps between parts are zero-padded unless ``strict`` is True, in
        which case ValueError is raised; a request starting outside any
        known part always raises ValueError.
        """
        logger.debug("Known parts request %s %s", start, stop)
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        self.total_requested_bytes += stop - start
        out = b""
        started = False
        loc_old = 0
        # parts are disjoint, so iterate them in offset order
        for loc0, loc1 in sorted(self.data):
            if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
                # entirely within the block
                off = start - loc0
                self.hit_count += 1
                return self.data[(loc0, loc1)][off : off + stop - start]
            if stop <= loc0:
                break
            if started and loc0 > loc_old:
                # a gap where we need data
                self.miss_count += 1
                if self.strict:
                    raise ValueError
                out += b"\x00" * (loc0 - loc_old)
            if loc0 <= start < loc1:
                # found the start
                self.hit_count += 1
                off = start - loc0
                out = self.data[(loc0, loc1)][off : off + stop - start]
                started = True
            elif start < loc0 and stop > loc1:
                # the whole block
                self.hit_count += 1
                out += self.data[(loc0, loc1)]
            elif loc0 <= stop <= loc1:
                # end block
                self.hit_count += 1
                out = out + self.data[(loc0, loc1)][: stop - loc0]
                return out
            loc_old = loc1
        # ran past the last known part
        self.miss_count += 1
        if started and not self.strict:
            out = out + b"\x00" * (stop - loc_old)
            return out
        raise ValueError
class UpdatableLRU(Generic[P, T]):
    """
    Thread-safe LRU cache whose entries can also be inserted out-of-band.

    Used by BackgroundBlockCache to publish blocks computed in a background
    thread via ``add_key``.
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._store: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._limit = max_size
        self._hit_total = 0
        self._miss_total = 0
        self._mutex = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._mutex:
            if args in self._store:
                # hit: refresh recency and return the stored value
                self._store.move_to_end(args)
                self._hit_total += 1
                return self._store[args]

        # miss: compute outside the lock so other callers are not blocked
        value = self._func(*args, **kwargs)

        with self._mutex:
            self._store[args] = value
            self._miss_total += 1
            # evict the least-recently-used entry when over capacity
            while len(self._store) > self._limit:
                self._store.popitem(last=False)

        return value

    def is_key_cached(self, *args: Any) -> bool:
        """Return True if a value is currently stored for ``args``."""
        with self._mutex:
            return args in self._store

    def add_key(self, result: T, *args: Any) -> None:
        """Store ``result`` for ``args`` without calling the wrapped function."""
        with self._mutex:
            self._store[args] = result
            while len(self._store) > self._limit:
                self._store.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """Snapshot of hit/miss counts and current occupancy."""
        with self._mutex:
            return self.CacheInfo(
                maxsize=self._limit,
                currsize=len(self._store),
                hits=self._hit_total,
                misses=self._miss_total,
            )
class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
        Function of the form f(start, end) which gets bytes from remote as
        specified.
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        # single worker so at most one speculative fetch is ever in flight
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # Copy first: deleting from ``self.__dict__`` directly would strip
        # the executor, lock and cache off the live instance as a side
        # effect of pickling it.
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        # rebuild all unpicklable machinery from scratch
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes [start, end), pre-fetching the following block in the
        background when possible."""
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
# Registry of available cache implementations, keyed by each class's
# ``name`` attribute; populated by ``register_cache`` calls below.
caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}
def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Adds ``cls`` to the module-level ``caches`` registry under its
    class-level ``name`` attribute.

    Parameters
    ----------
    cls: type[BaseCache]
        The cache implementation to make available; keyed by ``cls.name``.
    clobber: bool, optional
        If set to True (default is False) - allow to overwrite existing
        entry.

    Raises
    ------
    ValueError
        If an entry for ``cls.name`` already exists and ``clobber`` is False.
    """
    name = cls.name
    if name in caches and not clobber:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls
# Register every built-in cache implementation under its class-level ``name``.
for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)