from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
import warnings
from collections import OrderedDict
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, TypeVar

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
MultiFetcher = Callable[[list[tuple[int, int]]], bytes]  # Maps [(start, end)] to bytes


class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report, e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  :   {self.blocksize}
            block count :   {self.nblocks}
            file size   :   {self.size}
            cache hits  :   {self.hit_count}
            cache misses:   {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """


class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens a temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disk space in the temporary location.

    This cache method might only work on POSIX

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
        multi_fetcher: MultiFetcher | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.multi_fetcher = multi_fetcher
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from the Python 2.x itertools documentation.
        # We group an enumerated sequence of needed block numbers by the
        # difference between an ascending counter (provided by enumerate) and
        # the block numbers themselves. That difference is constant within a
        # run of consecutive block numbers, and changes whenever the sequence
        # skips a value, i.e. whenever a previously cached block interrupts
        # the run. In other words, this algorithm neatly groups runs of
        # consecutive block numbers so they can be fetched together.
        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            return self.cache[start:end]

        if self.multi_fetcher:
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                (sstart, send) = ranges[idx]
                logger.debug(f"MMap copy block ({sstart}-{send})")
                self.cache[sstart:send] = r
        else:
            for sstart, send in ranges:
                logger.debug(f"MMap get block ({sstart}-{send})")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
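

# Illustrative sketch (editorial addition, not part of fsspec): the groupby
# trick used in MMapCache._fetch above. Enumerating a sorted sequence and
# keying on (counter - value) keeps each run of consecutive values in one
# group, so each run can be fetched with a single ranged request.
def _demo_consolidate_runs() -> None:
    need = [2, 3, 4, 7, 8, 12]  # hypothetical block numbers still to fetch
    runs = [
        tuple(map(itemgetter(1), grp))
        for _, grp in groupby(enumerate(need), key=lambda x: x[0] - x[1])
    ]
    assert runs == [(2, 3, 4), (7, 8), (12,)]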


class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
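

# Illustrative sketch (editorial addition, not part of fsspec): sequential
# reads are the intended workload; each fetch reads ahead one blocksize, so
# the next small read is usually served from the buffer.
def _demo_readahead() -> None:
    data = b"x" * 1000
    calls: list[tuple[int, int]] = []

    def fetch(start: int, end: int) -> bytes:
        calls.append((start, end))
        return data[start:end]

    cache = ReadAheadCache(blocksize=100, fetcher=fetch, size=len(data))
    cache._fetch(0, 10)   # miss: fetches bytes 0-110 (read plus one block ahead)
    cache._fetch(10, 20)  # hit: served entirely from the buffer
    assert calls == [(0, 110)] and cache.hit_count == 1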


class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # treat an open-ended read as a read to EOF
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
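

# Illustrative sketch (editorial addition, not part of fsspec): only the
# first block is retained, so repeated header reads are free while reads
# elsewhere in the file always go to the fetcher.
def _demo_first_chunk() -> None:
    data = bytes(1000)

    def fetch(start: int, end: int) -> bytes:
        return data[start:end]

    cache = FirstChunkCache(blocksize=100, fetcher=fetch, size=len(data))
    cache._fetch(0, 50)     # fills the first-block cache
    cache._fetch(10, 60)    # served from the cached header
    cache._fetch(500, 600)  # beyond the first block: always a miss
    assert cache.hit_count == 2 and cache.miss_count == 2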


class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
        Function of the form f(start, end) which gets bytes from remote as
        specified.
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy so that removing the unpicklable cache does not mutate the
        # live instance
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial block
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
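

# Illustrative sketch (editorial addition, not part of fsspec): BlockCache
# delegates eviction to functools.lru_cache, so cache_info() exposes the
# familiar (hits, misses, maxsize, currsize) tuple.
def _demo_blockcache() -> None:
    data = bytes(10_000)

    def fetch(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BlockCache(blocksize=1024, fetcher=fetch, size=len(data), maxblocks=2)
    cache._fetch(0, 2048)  # touches blocks 0-2
    cache._fetch(0, 100)   # block 0 may have been evicted (maxblocks=2)
    info = cache.cache_info()
    assert info.maxsize == 2 and info.currsize <= 2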


class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
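

# Illustrative sketch (editorial addition, not part of fsspec): BytesCache
# reads one blocksize past each request, so nearby follow-up reads become
# cache hits; the trim option keeps the buffer from growing without bound.
def _demo_bytescache() -> None:
    data = b"abcdefghij" * 50

    def fetch(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BytesCache(blocksize=20, fetcher=fetch, size=len(data))
    assert cache._fetch(0, 10) == data[:10]   # buffers bytes 0-30 (read-ahead)
    assert cache._fetch(5, 25) == data[5:25]  # served by the read-ahead buffer
    assert cache.hit_count == 1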


class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]


class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: dict[tuple[int, int], bytes] | None = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = {}

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    self.hit_count += 1
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}.")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        self.total_requested_bytes += stop - start
        self.miss_count += 1
        return out + super()._fetch(start, stop)
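

# Illustrative sketch (editorial addition, not part of fsspec): contiguous
# parts handed to KnownPartsOfAFile are merged on construction, and with
# strict=False a read overrunning a known part is zero-padded rather than
# fetched.
def _demo_known_parts() -> None:
    parts = {(0, 5): b"hello", (5, 10): b"world"}  # contiguous: merged to (0, 10)
    cache = KnownPartsOfAFile(
        blocksize=16, fetcher=None, size=100, data=parts, strict=False
    )
    assert list(cache.data) == [(0, 10)]
    assert cache._fetch(3, 12) == b"loworld\x00\x00"  # padded past the known range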


class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
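

# Illustrative sketch (editorial addition, not part of fsspec): unlike
# functools.lru_cache, UpdatableLRU lets a background thread inject a result
# with add_key, and is_key_cached lets callers probe without fetching.
def _demo_updatable_lru() -> None:
    lru = UpdatableLRU(lambda n: n * 2, max_size=2)
    assert lru(3) == 6  # computed, then cached
    lru.add_key(99, 4)  # inject a precomputed value for key (4,)
    assert lru.is_key_cached(4) and lru(4) == 99
    assert lru.cache_info().currsize == 2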


class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
        Function of the form f(start, end) which gets bytes from remote as
        specified.
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # copy so that removing the unpicklable entries does not mutate the
        # live instance
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within the file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial block
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
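

# Illustrative sketch (editorial addition, not part of fsspec): after each
# read, the block following the requested range is submitted to a one-worker
# executor, so a sequential reader usually finds its next block already
# cached (or a future it can join).
def _demo_background_blockcache() -> None:
    data = bytes(4096)

    def fetch(start: int, end: int) -> bytes:
        return data[start:end]

    cache = BackgroundBlockCache(blocksize=1024, fetcher=fetch, size=len(data))
    cache._fetch(0, 100)      # reads block 0; block 1 is prefetched in the background
    cache._fetch(1024, 1100)  # likely served by the completed prefetch
    cache._thread_executor.shutdown(wait=True)  # tidy up the demo's worker thread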


caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
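

# Illustrative sketch (editorial addition, not part of fsspec): a custom
# cache subclass only needs a unique `name` to join the registry via
# register_cache; `_DemoCache` is a hypothetical example.
class _DemoCache(BaseCache):
    name: ClassVar[str] = "demo"


def _demo_registry() -> None:
    register_cache(_DemoCache)
    assert caches["demo"] is _DemoCache
    # registering the same name again requires clobber=True
    register_cache(_DemoCache, clobber=True)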