from __future__ import annotations

import collections
import functools
import logging
import math
import os
import threading
from collections import OrderedDict
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, TypeVar

if TYPE_CHECKING:
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    P = TypeVar("P")

T = TypeVar("T")


logger = logging.getLogger("fsspec")

Fetcher = Callable[[int, int], bytes]  # Maps (start, end) to bytes
MultiFetcher = Callable[[list[tuple[int, int]]], bytes]  # Maps [(start, end)] to bytes


class BaseCache:
    """Pass-through cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        self.hit_count = 0
        self.miss_count = 0
        # the bytes that we actually requested
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        if start >= self.size or start >= stop:
            return b""
        return self.fetcher(start, stop)

    def _reset_stats(self) -> None:
72 """Reset hit and miss counts for a more ganular report e.g. by file."""
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics."""
        if self.hit_count == 0 and self.miss_count == 0:
            # a cache that does nothing, this is for logs only
            return ""
        return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

    def __repr__(self) -> str:
        # TODO: use rich for better formatting
        return f"""
        <{self.__class__.__name__}:
            block size  : {self.blocksize}
            block count : {self.nblocks}
            file size   : {self.size}
            cache hits  : {self.hit_count}
            cache misses: {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """


class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens a temporary file, which is filled block-wise when data is requested.
    Ensure there is enough disk space in the temporary location.

    This cache method might only work on POSIX

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: Fetcher
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str
        Where to create the temporary file. If None, a temporary file is
        created using tempfile.TemporaryFile().
    blocks: set[int]
        Set of block numbers that have already been fetched. If None, an empty
        set is created.
    multi_fetcher: MultiFetcher
        Function of the form f([(start, end)]) which gets bytes from remote
        as specified. This function is used to fetch multiple blocks at once.
        If not specified, the fetcher function is used instead.
    """
126 name = "mmap"
128 def __init__(
129 self,
130 blocksize: int,
131 fetcher: Fetcher,
132 size: int,
133 location: str | None = None,
134 blocks: set[int] | None = None,
135 multi_fetcher: MultiFetcher | None = None,
136 ) -> None:
137 super().__init__(blocksize, fetcher, size)
138 self.blocks = set() if blocks is None else blocks
139 self.location = location
140 self.multi_fetcher = multi_fetcher
141 self.cache = self._makefile()
143 def _makefile(self) -> mmap.mmap | bytearray:
144 import mmap
145 import tempfile
147 if self.size == 0:
148 return bytearray()
150 # posix version
151 if self.location is None or not os.path.exists(self.location):
152 if self.location is None:
153 fd = tempfile.TemporaryFile()
154 self.blocks = set()
155 else:
156 fd = open(self.location, "wb+")
157 fd.seek(self.size - 1)
158 fd.write(b"1")
159 fd.flush()
160 else:
161 fd = open(self.location, "r+b")
163 return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        block_range = range(start_block, end_block + 1)
        # Determine which blocks need to be fetched. This sequence is sorted by construction.
        need = (i for i in block_range if i not in self.blocks)
        # Count the number of blocks already cached
        self.hit_count += sum(1 for i in block_range if i in self.blocks)

        ranges = []

        # Consolidate needed blocks.
        # Algorithm adapted from Python 2.x itertools documentation.
        # We group an enumerated sequence of needed blocks. The key computes the
        # difference between an ascending range (provided by enumerate) and the
        # needed block numbers; this difference is constant across a run of
        # consecutive blocks and changes whenever the block numbers skip values,
        # i.e. whenever we pass previously cached block(s). In other words, this
        # algorithm neatly groups runs of consecutive block numbers so they can
        # be fetched together.
        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
            # Extract the blocks from the enumerated sequence
            _blocks = tuple(map(itemgetter(1), _blocks))
            # Compute start of first block
            sstart = _blocks[0] * self.blocksize
            # Compute the end of the last block. Last block may not be full size.
            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

            # Fetch bytes (could be multiple consecutive blocks)
            self.total_requested_bytes += send - sstart
            logger.debug(
                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
            )
            ranges.append((sstart, send))

            # Update set of cached blocks
            self.blocks.update(_blocks)
            # Update cache statistics with number of blocks we had to cache
            self.miss_count += len(_blocks)

        if not ranges:
            return self.cache[start:end]

        if self.multi_fetcher:
            logger.debug(f"MMap get blocks {ranges}")
            for idx, r in enumerate(self.multi_fetcher(ranges)):
                (sstart, send) = ranges[idx]
                logger.debug(f"MMap copy block ({sstart}-{send})")
                self.cache[sstart:send] = r
        else:
            for sstart, send in ranges:
                logger.debug(f"MMap get block ({sstart}-{send})")
                self.cache[sstart:send] = self.fetcher(sstart, send)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Restore instance attributes
        self.__dict__.update(state)
        self.cache = self._makefile()
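

# Usage sketch (an illustration, not original code): MMapCache writes fetched
# blocks into a sparse temporary file, so a second read of the same region is
# served from the local mmap. Per the docstring, this may only work on POSIX.
def _demo_mmap_cache() -> None:
    data = b"x" * 10_000
    cache = MMapCache(blocksize=1024, fetcher=lambda s, e: data[s:e], size=len(data))
    assert cache._fetch(0, 100) == data[:100]  # fetches and stores block 0
    assert cache._fetch(50, 90) == data[50:90]  # served from the mmap, no refetch
    assert 0 in cache.blocks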


class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        l = end - start
        if start >= self.start and end <= self.end:
            # cache hit
            self.hit_count += 1
            return self.cache[start - self.start : end - self.start]
        elif self.start <= start < self.end:
            # partial hit
            self.miss_count += 1
            part = self.cache[start - self.start :]
            l -= len(part)
            start = self.end
        else:
            # miss
            self.miss_count += 1
            part = b""
        end = min(self.size, end + self.blocksize)
        self.total_requested_bytes += end - start
        self.cache = self.fetcher(start, end)  # new block replaces old
        self.start = start
        self.end = self.start + len(self.cache)
        return part + self.cache[:l]
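

# Usage sketch (illustrative assumption): ReadAheadCache suits sequential
# reads, where each miss pulls in a whole blocksize beyond what was asked for.
def _demo_readahead_cache() -> None:
    data = b"0123456789" * 100
    cache = ReadAheadCache(blocksize=64, fetcher=lambda s, e: data[s:e], size=len(data))
    assert cache._fetch(0, 10) == data[:10]  # miss: buffers bytes 0-74
    assert cache._fetch(10, 20) == data[10:20]  # hit: already buffered
    assert cache.hit_count == 1 and cache.miss_count == 1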


class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # this will buffer the whole thing
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # end may be None per the signature; treat that as "to end of file"
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
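

# Usage sketch (assumed names): FirstChunkCache keeps only the header block,
# which suits formats that store metadata up front but are read randomly.
def _demo_first_chunk_cache() -> None:
    data = b"HDR!" + b"\x00" * 996
    cache = FirstChunkCache(blocksize=4, fetcher=lambda s, e: data[s:e], size=len(data))
    assert cache._fetch(0, 4) == b"HDR!"  # miss: caches the first block
    assert cache._fetch(0, 4) == b"HDR!"  # hit: served from the cached header
    assert cache._fetch(500, 510) == data[500:510]  # past the header: direct fetch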


class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """
354 name = "blockcache"
356 def __init__(
357 self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
358 ) -> None:
359 super().__init__(blocksize, fetcher, size)
360 self.nblocks = math.ceil(size / blocksize)
361 self.maxblocks = maxblocks
362 self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)
364 def cache_info(self):
365 """
366 The statistics on the block cache.
368 Returns
369 -------
370 NamedTuple
371 Returned directly from the LRU Cache used internally.
372 """
373 return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # Copy so that removing the unpicklable entry does not mutate the live instance
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)


class BytesCache(BaseCache):
    """Cache which holds data in an in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            self.hit_count += 1
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.total_requested_bytes += bend - start
            self.miss_count += 1
            self.cache = self.fetcher(start, bend)
            self.start = start
        else:
            assert self.start is not None
            assert self.end is not None
            self.miss_count += 1

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += self.start - start
                    new = self.fetcher(start, self.start)
                    self.start = start
                    self.cache = new + self.cache
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    self.total_requested_bytes += bend - start
                    self.cache = self.fetcher(start, bend)
                    self.start = start
                else:
                    self.total_requested_bytes += bend - self.end
                    new = self.fetcher(self.end, bend)
                    self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self) -> int:
        return len(self.cache)
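

# Usage sketch (assumed data): BytesCache keeps one contiguous buffer and
# extends it at either end, trimming old bytes once it grows past a blocksize.
def _demo_bytes_cache() -> None:
    data = b"abcdefghij" * 50
    cache = BytesCache(blocksize=32, fetcher=lambda s, e: data[s:e], size=len(data))
    assert cache._fetch(0, 10) == data[:10]  # first read primes the buffer
    assert cache._fetch(5, 15) == data[5:15]  # hit within the buffered range
    assert len(cache) >= 10  # buffer retains the read-ahead bytes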


class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)  # type: ignore[arg-type]
        if data is None:
            self.miss_count += 1
            self.total_requested_bytes += self.size
            data = self.fetcher(0, self.size)
        self.data = data

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        self.hit_count += 1
        return self.data[start:stop]
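

# Usage sketch (assumed fetcher): AllBytes downloads the whole file at
# construction time, so every subsequent read is a hit.
def _demo_all_bytes() -> None:
    data = b"entire file contents"
    cache = AllBytes(blocksize=None, fetcher=lambda s, e: data[s:e], size=len(data))
    assert cache._fetch(7, 11) == data[7:11]
    assert cache.miss_count == 1  # the single up-front fetch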


class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        to known bytes.
    strict: bool, default False
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """
616 name: ClassVar[str] = "parts"
618 def __init__(
619 self,
620 blocksize: int,
621 fetcher: Fetcher,
622 size: int,
623 data: dict[tuple[int, int], bytes] | None = None,
624 strict: bool = False,
625 **_: Any,
626 ):
627 super().__init__(blocksize, fetcher, size)
628 self.strict = strict
630 # simple consolidation of contiguous blocks
631 if data:
632 old_offsets = sorted(data.keys())
633 offsets = [old_offsets[0]]
634 blocks = [data.pop(old_offsets[0])]
635 for start, stop in old_offsets[1:]:
636 start0, stop0 = offsets[-1]
637 if start == stop0:
638 offsets[-1] = (start0, stop)
639 blocks[-1] += data.pop((start, stop))
640 else:
641 offsets.append((start, stop))
642 blocks.append(data.pop((start, stop)))
644 self.data = dict(zip(offsets, blocks))
645 else:
646 self.data = {}
648 @property
649 def size(self):
650 return sum(_[1] - _[0] for _ in self.data)
652 @size.setter
653 def size(self, value):
654 pass
656 @property
657 def nblocks(self):
658 return len(self.data)
660 @nblocks.setter
661 def nblocks(self, value):
662 pass

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        if start is None:
            start = 0
        if stop is None:
            stop = self.size
        self.total_requested_bytes += stop - start

        out = b""
        started = False
        loc_old = 0
        for loc0, loc1 in sorted(self.data):
            if (loc0 <= start < loc1) and (loc0 <= stop <= loc1):
                # entirely within the block
                off = start - loc0
                self.hit_count += 1
                return self.data[(loc0, loc1)][off : off + stop - start]
            if stop <= loc0:
                break
            if started and loc0 > loc_old:
                # a gap where we need data
                self.miss_count += 1
                if self.strict:
                    raise ValueError
                out += b"\x00" * (loc0 - loc_old)
            if loc0 <= start < loc1:
                # found the start
                self.hit_count += 1
                off = start - loc0
                out = self.data[(loc0, loc1)][off : off + stop - start]
                started = True
            elif start < loc0 and stop > loc1:
                # the whole block
                self.hit_count += 1
                out += self.data[(loc0, loc1)]
            elif loc0 <= stop <= loc1:
                # end block
                self.hit_count += 1
                return out + self.data[(loc0, loc1)][: stop - loc0]
            loc_old = loc1
        self.miss_count += 1
        if started and not self.strict:
            return out + b"\x00" * (stop - loc_old)
        raise ValueError
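

# Usage sketch (illustrative offsets): KnownPartsOfAFile serves reads from
# pre-supplied byte ranges. In this all-known example the fetcher is never
# consulted, so None is passed purely for demonstration.
def _demo_known_parts() -> None:
    parts = {(0, 5): b"hello", (5, 10): b"world"}  # contiguous, so consolidated
    cache = KnownPartsOfAFile(blocksize=1, fetcher=None, size=10, data=parts)
    assert cache._fetch(2, 8) == b"llowo"
    assert cache.nblocks == 1  # the two ranges were merged into one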


class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
        with self._lock:
            if args in self._cache:
                self._cache.move_to_end(args)
                self._hits += 1
                return self._cache[args]

        result = self._func(*args, **kwargs)

        with self._lock:
            self._cache[args] = result
            self._misses += 1
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

        return result

    def is_key_cached(self, *args: Any) -> bool:
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        with self._lock:
            self._cache[args] = result
            if len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        with self._lock:
            return self.CacheInfo(
                maxsize=self._max_size,
                currsize=len(self._cache),
                hits=self._hits,
                misses=self._misses,
            )
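

# Usage sketch (assumed function): UpdatableLRU behaves like lru_cache but
# also lets another thread inject results via add_key.
def _demo_updatable_lru() -> None:
    lru = UpdatableLRU(lambda x: x * 2, max_size=2)
    assert lru(3) == 6  # miss: computed
    assert lru(3) == 6  # hit: cached
    lru.add_key(100, 50)  # pre-seed result 100 for args (50,)
    assert lru.is_key_cached(50)
    assert lru(50) == 100  # served from the injected entry
    assert lru.cache_info().hits == 2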


class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in a non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """
795 name: ClassVar[str] = "background"
797 def __init__(
798 self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
799 ) -> None:
800 super().__init__(blocksize, fetcher, size)
801 self.nblocks = math.ceil(size / blocksize)
802 self.maxblocks = maxblocks
803 self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)
805 self._thread_executor = ThreadPoolExecutor(max_workers=1)
806 self._fetch_future_block_number: int | None = None
807 self._fetch_future: Future[bytes] | None = None
808 self._fetch_future_lock = threading.Lock()
810 def cache_info(self) -> UpdatableLRU.CacheInfo:
811 """
812 The statistics on the block cache.
814 Returns
815 -------
816 NamedTuple
817 Returned directly from the LRU Cache used internally.
818 """
819 return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        # Copy so that removing the unpicklable entries does not mutate the live instance
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state) -> None:
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # kind of pointless to count this as a hit, but it is
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)


caches: dict[str | None, type[BaseCache]] = {
    # one custom case
    None: BaseCache,
}


def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False), allow overwriting an existing
        entry.

    Raises
    ------
    ValueError
    """
    name = cls.name
    if not clobber and name in caches:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls


for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
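

# Usage sketch (hypothetical subclass): third-party code can expose its own
# cache strategy through the same registry.
def _demo_register_cache() -> None:
    class NoopCache(BaseCache):
        name = "noop"

    register_cache(NoopCache)
    assert caches["noop"] is NoopCache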