Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/caching.py: 19%
389 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1import collections
2import functools
3import logging
4import math
5import os
6import threading
7import warnings
8from concurrent.futures import ThreadPoolExecutor
10logger = logging.getLogger("fsspec")
class BaseCache(object):
    """Pass-though cache: doesn't keep anything, calls every time

    Acts as base class for other cachers

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name = "none"

    def __init__(self, blocksize, fetcher, size):
        self.blocksize = blocksize
        self.fetcher = fetcher
        self.size = size

    def _fetch(self, start, stop):
        """Fetch bytes [start, stop) straight from the fetcher (no caching)."""
        begin = 0 if start is None else start
        until = self.size if stop is None else stop
        if begin >= self.size or begin >= until:
            # past EOF, or an empty/inverted range: nothing to read
            return b""
        return self.fetcher(begin, until)
class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens temporary file, which is filled blocks-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on posix
    """

    name = "mmap"

    def __init__(self, blocksize, fetcher, size, location=None, blocks=None):
        """
        Parameters
        ----------
        location: str or None
            Path of the backing file; a TemporaryFile is used when None
        blocks: set or None
            Block numbers already valid in an existing backing file
        """
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.cache = self._makefile()

    def _makefile(self):
        """Create (or reopen) the backing file and return it memory-mapped."""
        import mmap
        import tempfile

        if self.size == 0:
            # cannot mmap a zero-length file; a bytearray supports the same
            # slicing operations _fetch needs
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            # write one byte at the last offset so the (sparse) file has
            # the full length before mmap-ing it
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            # reuse an existing cache file; self.blocks records which
            # blocks it already holds
            fd = open(self.location, "rb+")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start, end):
        """Return bytes [start, end), fetching any missing blocks first."""
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""
        start_block = start // self.blocksize
        end_block = end // self.blocksize
        need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
        while need:
            # TODO: not a for loop so we can consolidate blocks later to
            # make fewer fetch calls; this could be parallel
            i = need.pop(0)
            sstart = i * self.blocksize
            send = min(sstart + self.blocksize, self.size)
            # bug fix: the log message previously lacked the closing parenthesis
            logger.debug(f"MMap get block #{i} ({sstart}-{send})")
            self.cache[sstart:send] = self.fetcher(sstart, send)
            self.blocks.add(i)

        return self.cache[start:end]

    def __getstate__(self):
        state = self.__dict__.copy()
        # Remove the unpicklable entries (the mmap object).
        del state["cache"]
        return state

    def __setstate__(self, state):
        # Restore instance attributes and recreate the mmap from the file
        self.__dict__.update(state)
        self.cache = self._makefile()
class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize, fetcher, size):
        super().__init__(blocksize, fetcher, size)
        # the current buffer and the file offsets it covers
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start, end):
        """Return bytes [start, end), reading one blocksize past *end* on a miss."""
        start = 0 if start is None else start
        if end is None or end > self.size:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        want = end - start
        if self.start <= start and end <= self.end:
            # full hit: serve entirely from the buffer
            off = start - self.start
            return self.cache[off : off + want]

        if self.start <= start < self.end:
            # partial hit: the head of the request is buffered
            prefix = self.cache[start - self.start :]
            want -= len(prefix)
            start = self.end
        else:
            # complete miss
            prefix = b""

        # fetch from `start`, reading ahead one extra blocksize (capped at EOF);
        # the new data replaces the old buffer entirely
        end = min(self.size, end + self.blocksize)
        self.cache = self.fetcher(start, end)
        self.start = start
        self.end = self.start + len(self.cache)
        return prefix + self.cache[:want]
class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize, fetcher, size):
        super().__init__(blocksize, fetcher, size)
        # lazily-populated copy of bytes [0, blocksize)
        self.cache = None

    def _fetch(self, start, end):
        """Return bytes [start, end), serving the header region from cache.

        Reads touching the first ``blocksize`` bytes populate and reuse
        ``self.cache``; ranges wholly beyond that go straight to the fetcher.
        """
        start = start or 0
        end = end or self.size
        # robustness fix: clamp the request to the file, so the fetcher is
        # never asked for bytes past EOF
        if start >= self.size:
            return b""
        end = min(end, self.size)
        if start < self.blocksize:
            if self.cache is None:
                if end > self.blocksize:
                    # one call covers both the header and the requested tail
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, min(self.blocksize, self.size))
            part = self.cache[start:end]
            if end > self.blocksize:
                # tail beyond the cached header must be fetched fresh
                part += self.fetcher(self.blocksize, end)
            return part
        else:
            return self.fetcher(start, end)
class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(self, blocksize, fetcher, size, maxblocks=32):
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        # NOTE: lru_cache on a bound method keeps a reference to self for
        # the cache's lifetime; acceptable here, as the cache lives on the
        # instance itself
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def __repr__(self):
        return "<BlockCache blocksize={}, size={}, nblocks={}>".format(
            self.blocksize, self.size, self.nblocks
        )

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self):
        # Bug fix: copy before deleting. Previously the unpicklable
        # lru_cache wrapper was deleted from the live self.__dict__,
        # leaving the instance broken after being pickled.
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state):
        # Recreate the lru_cache wrapper dropped by __getstate__
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start, end):
        """Return bytes [start, end), populating whole blocks via the LRU cache."""
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number):
        """
        Fetch the block of data for `block_number`.

        Raises
        ------
        ValueError
            If ``block_number`` is beyond the end of the file.
        """
        if block_number > self.nblocks:
            raise ValueError(
                "'block_number={}' is greater than the number of blocks ({})".format(
                    block_number, self.nblocks
                )
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block %d", block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(self, start, end, start_block_number, end_block_number):
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = []
            out.append(self._fetch_block_cached(start_block_number)[start_pos:])

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            for block_number in range(start_block_number + 1, end_block_number):
                out.append(self._fetch_block_cached(block_number))

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
class BytesCache(BaseCache):
    """Cache which holds data in a in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name = "bytes"

    def __init__(self, blocksize, fetcher, size, trim=True):
        super().__init__(blocksize, fetcher, size)
        # self.cache holds file bytes [self.start, self.end); both offsets
        # are None until the first fetch populates the buffer
        self.cache = b""
        self.start = None
        self.end = None
        self.trim = trim

    def _fetch(self, start, end):
        """Return bytes [start, end), stitching new data onto the buffer.

        Depending on how the request overlaps the buffered range, the data
        is prepended, appended, or the buffer is replaced wholesale.
        """
        # TODO: only set start/end after fetch, in case it fails?
        # is this where retry logic might go?
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            # past EOF or empty/inverted range
            return b""
        if (
            self.start is not None
            and start >= self.start
            and self.end is not None
            and end < self.end
        ):
            # cache hit: we have all the required data
            offset = start - self.start
            return self.cache[offset : offset + end - start]

        if self.blocksize:
            # read ahead one blocksize beyond the request, capped at EOF
            bend = min(self.size, end + self.blocksize)
        else:
            bend = end

        if bend == start or start > self.size:
            return b""

        if (self.start is None or start < self.start) and (
            self.end is None or end > self.end
        ):
            # First read, or extending both before and after
            self.cache = self.fetcher(start, bend)
            self.start = start
        elif start < self.start:
            if self.end - end > self.blocksize:
                # request lies well before the buffered range; cheaper to
                # replace the buffer than to stitch across the gap
                self.cache = self.fetcher(start, bend)
                self.start = start
            else:
                # prepend the missing head onto the existing buffer
                new = self.fetcher(start, self.start)
                self.start = start
                self.cache = new + self.cache
        elif bend > self.end:
            if self.end > self.size:
                # buffer already extends past EOF; nothing more to fetch
                pass
            elif end - self.end > self.blocksize:
                # gap after the buffered range is too large; replace wholesale
                self.cache = self.fetcher(start, bend)
                self.start = start
            else:
                # append the missing tail onto the existing buffer
                new = self.fetcher(self.end, bend)
                self.cache = self.cache + new

        self.end = self.start + len(self.cache)
        offset = start - self.start
        out = self.cache[offset : offset + end - start]
        if self.trim:
            # drop whole blocksize-chunks from the front of the buffer once
            # it has grown beyond ~two blocks, to bound memory use
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                self.start += self.blocksize * num
                self.cache = self.cache[self.blocksize * num :]
        return out

    def __len__(self):
        # number of bytes currently buffered (not the file size)
        return len(self.cache)
class AllBytes(BaseCache):
    """Cache entire contents of the file"""

    name = "all"

    def __init__(self, blocksize=None, fetcher=None, size=None, data=None):
        super().__init__(blocksize, fetcher, size)
        # read the whole file up-front unless the bytes were supplied
        self.data = self.fetcher(0, self.size) if data is None else data

    def _fetch(self, start, end):
        """Slice the in-memory copy; the fetcher is never called again."""
        return self.data[start:end]
class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        with known bytes.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name = "parts"

    def __init__(self, blocksize, fetcher, size, data=None, strict=True, **_):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict
        if data is None:
            # bug fix: previously a mutable default argument ({}) was shared
            # across every instance constructed without explicit data
            data = {}

        # simple consolidation of contiguous blocks
        if data:
            old_offsets = sorted(data.keys())
            offsets = [old_offsets[0]]
            blocks = [data.pop(old_offsets[0])]
            for start, stop in old_offsets[1:]:
                start0, stop0 = offsets[-1]
                if start == stop0:
                    # this part starts exactly where the previous one ended:
                    # merge the two ranges into one
                    offsets[-1] = (start0, stop)
                    blocks[-1] += data.pop((start, stop))
                else:
                    offsets.append((start, stop))
                    blocks.append(data.pop((start, stop)))

            self.data = dict(zip(offsets, blocks))
        else:
            self.data = data

    def _fetch(self, start, stop):
        """Return bytes [start, stop), preferring the known parts.

        Falls back to ``self.fetcher`` (with a warning) for any portion not
        covered by the known parts; raises ValueError if no fetcher exists.
        """
        out = b""
        for (loc0, loc1), data in self.data.items():
            # If self.strict=False, use zero-padded data
            # for reads beyond the end of a "known" buffer
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # The request is within a known range, or
                    # it begins within a known range, and we
                    # are allowed to pad reads beyond the
                    # buffer with zero
                    out += b"\x00" * (stop - start - len(out))
                    return out
                else:
                    # The request ends outside a known range,
                    # and we are being "strict" about reads
                    # beyond the buffer
                    start = loc1
                    break

        # We only get here if there is a request outside the
        # known parts of the file. In an ideal world, this
        # should never happen
        if self.fetcher is None:
            # We cannot fetch the data, so raise an error
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")
        # We can fetch the data, but should warn the user
        # that this may be slow
        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        return out + super()._fetch(start, stop)
class UpdatableLRU:
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    # result structure returned by cache_info(), mirroring functools.lru_cache
    CacheInfo = collections.namedtuple(
        "CacheInfo", ["hits", "misses", "maxsize", "currsize"]
    )

    def __init__(self, func, max_size=128):
        self._cache = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def __call__(self, *args):
        """Return func(*args), served from cache when possible."""
        with self._lock:
            try:
                value = self._cache[args]
            except KeyError:
                pass
            else:
                # hit: bump recency and return the stored value
                self._cache.move_to_end(args)
                self._hits += 1
                return value

        # compute outside the lock so concurrent callers are not serialised
        value = self._func(*args)

        with self._lock:
            self._cache[args] = value
            self._misses += 1
            while len(self._cache) > self._max_size:
                # evict the least recently used entry
                self._cache.popitem(last=False)

        return value

    def is_key_cached(self, *args):
        """Whether a result for *args is currently stored."""
        with self._lock:
            return args in self._cache

    def add_key(self, result, *args):
        """Store (or overwrite) the cached result for *args directly."""
        with self._lock:
            self._cache[args] = result
            while len(self._cache) > self._max_size:
                self._cache.popitem(last=False)

    def cache_info(self):
        """Return hit/miss/size statistics as a CacheInfo namedtuple."""
        with self._lock:
            return self.CacheInfo(
                hits=self._hits,
                misses=self._misses,
                maxsize=self._max_size,
                currsize=len(self._cache),
            )
class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "background"

    def __init__(self, blocksize, fetcher, size, maxblocks=32):
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        # single worker so at most one speculative fetch is in flight;
        # the lock guards the two _fetch_future* attributes below
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def __repr__(self):
        return "<BackgroundBlockCache blocksize={}, size={}, nblocks={}>".format(
            self.blocksize, self.size, self.nblocks
        )

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self):
        # Bug fix: copy before deleting. Previously the unpicklable
        # entries (executor, lock, futures, LRU) were deleted from the
        # live self.__dict__, breaking the instance after pickling.
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        del state["_thread_executor"]
        del state["_fetch_future_block_number"]
        del state["_fetch_future"]
        del state["_fetch_future_lock"]
        return state

    def __setstate__(self, state):
        # Rebuild everything dropped by __getstate__
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start, end):
        """Return bytes [start, end), then pre-fetch the following block
        in the background."""
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # Background thread is running. Check whether we can or must join it.
            if self._fetch_future is not None:
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )

                    # Cleanup the fetch variables. Done with fetching the block.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Must join if we need the block for the current fetch
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Copy to the local variables to release lock
                        # before waiting for result
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # Cleanup the fetch variables. Have a local copy.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Need to wait for the future for the current read
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            # Wait until result and put it in cache
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # these are cached, so safe to do multiple calls for the same start and end.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # fetch next block in the background if nothing is running in the background,
        # the block is within file and it is not already cached
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number, log_info="sync"):
        """
        Fetch the block of data for `block_number`.

        ``log_info`` tags the log line with whether this is a foreground
        ("sync") or background ("async") fetch.
        """
        if block_number > self.nblocks:
            raise ValueError(
                "'block_number={}' is greater than the number of blocks ({})".format(
                    block_number, self.nblocks
                )
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(self, start, end, start_block_number, end_block_number):
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # read from the initial
            out = []
            out.append(self._fetch_block_cached(start_block_number)[start_pos:])

            # intermediate blocks
            # Note: it'd be nice to combine these into one big request. However
            # that doesn't play nicely with our LRU cache.
            for block_number in range(start_block_number + 1, end_block_number):
                out.append(self._fetch_block_cached(block_number))

            # final block
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
# Registry of available cache implementations, keyed by their ``name``
# class attribute; populated via register_cache() below
caches = {
    # one custom case
    None: BaseCache,
}
def register_cache(cls, clobber=False):
    """Add a cache implementation to the ``caches`` registry.

    The class's ``name`` attribute is used as the registry key.

    Parameters
    ----------
    clobber: bool, optional
        If set to True (default is False) - allow to overwrite existing
        entry.

    Raises
    ------
    ValueError
        If the name is already registered and ``clobber`` is False.
    """
    name = cls.name
    if name in caches and not clobber:
        raise ValueError(f"Cache with name {name!r} is already known: {caches[name]}")
    caches[name] = cls
# Register every built-in cache implementation under its ``name`` attribute
for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)