from __future__ import annotations

import io
import logging
import os
import threading
import warnings
import weakref
from errno import ESPIPE
from glob import has_magic
from hashlib import sha256
from typing import ClassVar

from .callbacks import _DEFAULT_CALLBACK
from .config import apply_config, conf
from .dircache import DirCache
from .transaction import Transaction
from .utils import (
    _unstrip_protocol,
    glob_translate,
    isfilelike,
    other_paths,
    read_block,
    stringify_path,
    tokenize,
)

logger = logging.getLogger("fsspec")


def make_instance(cls, args, kwargs):
    return cls(*args, **kwargs)
class _Cached(type):
    """
    Metaclass for caching file system instances.

    Notes
    -----
    Instances are cached according to

    * The values of the class attributes listed in `_extra_tokenize_attributes`
    * The arguments passed to ``__init__``.

    This creates an additional reference to the filesystem, which prevents the
    filesystem from being garbage collected when all *user* references go away.
    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
    be made for a filesystem instance to be garbage collected.
    """

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Note: we intentionally create a reference here, to avoid garbage
        # collecting instances when all other references are gone. To really
        # delete a FileSystem, the cache must be cleared.
        if conf.get("weakref_instance_cache"):  # pragma: no cover
            # debug option for analysing fork/spawn conditions
            cls._cache = weakref.WeakValueDictionary()
        else:
            cls._cache = {}
        cls._pid = os.getpid()

    def __call__(cls, *args, **kwargs):
        kwargs = apply_config(cls, kwargs)
        extra_tokens = tuple(
            getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
        )
        token = tokenize(
            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
        )
        skip = kwargs.pop("skip_instance_cache", False)
        if os.getpid() != cls._pid:
            cls._cache.clear()
            cls._pid = os.getpid()
        if not skip and cls.cachable and token in cls._cache:
            cls._latest = token
            return cls._cache[token]
        else:
            obj = super().__call__(*args, **kwargs)
            # Setting _fs_token here causes some static linters to complain.
            obj._fs_token_ = token
            obj.storage_args = args
            obj.storage_options = kwargs
            if obj.async_impl and obj.mirror_sync_methods:
                from .asyn import mirror_sync_methods

                mirror_sync_methods(obj)

            if cls.cachable and not skip:
                cls._latest = token
                cls._cache[token] = obj
            return obj
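

# Instance-caching sketch (illustration only, not part of the original module):
# constructing the same class twice with identical arguments in one process
# and thread should return the cached object, while ``skip_instance_cache=True``
# forces and returns a fresh, uncached instance.
#
#     import fsspec
#
#     fs1 = fsspec.filesystem("memory")
#     fs2 = fsspec.filesystem("memory")
#     assert fs1 is fs2  # same token -> served from cls._cache
#
#     fs3 = fsspec.filesystem("memory", skip_instance_cache=True)
#     assert fs3 is not fs1
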
class AbstractFileSystem(metaclass=_Cached):
    """
    An abstract super-class for pythonic file-systems

    Implementations are expected to be compatible with or, better, subclass
    from here.
    """

    cachable = True  # this class can be cached, instances reused
    _cached = False
    blocksize = 2**22
    sep = "/"
    protocol: ClassVar[str | tuple[str, ...]] = "abstract"
    _latest = None
    async_impl = False
    mirror_sync_methods = False
    root_marker = ""  # For some FSs, may require leading '/' or other character
    transaction_type = Transaction

    #: Extra *class attributes* that should be considered when hashing.
    _extra_tokenize_attributes = ()
    def __init__(self, *args, **storage_options):
        """Create and configure file-system instance

        Instances may be cachable, so if similar enough arguments are seen
        a new instance is not required. The token attribute exists to allow
        implementations to cache instances if they wish.

        A reasonable default should be provided if there are no arguments.

        Subclasses should call this method.

        Parameters
        ----------
        use_listings_cache, listings_expiry_time, max_paths:
            passed to ``DirCache``, if the implementation supports
            directory listing caching. Pass use_listings_cache=False
            to disable such caching.
        skip_instance_cache: bool
            If this is a cachable implementation, pass True here to force
            creating a new instance even if a matching instance exists, and prevent
            storing this instance.
        asynchronous: bool
        loop: asyncio-compatible IOLoop or None
        """
        if self._cached:
            # reusing instance, don't change
            return
        self._cached = True
        self._intrans = False
        self._transaction = None
        self._invalidated_caches_in_transaction = []
        self.dircache = DirCache(**storage_options)

        if storage_options.pop("add_docs", None):
            warnings.warn("add_docs is no longer supported.", FutureWarning)

        if storage_options.pop("add_aliases", None):
            warnings.warn("add_aliases has been removed.", FutureWarning)
        # This is set in _Cached
        self._fs_token_ = None

    @property
    def fsid(self):
        """Persistent filesystem id that can be used to compare filesystems
        across sessions.
        """
        raise NotImplementedError

    @property
    def _fs_token(self):
        return self._fs_token_

    def __dask_tokenize__(self):
        return self._fs_token

    def __hash__(self):
        return int(self._fs_token, 16)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self._fs_token == other._fs_token

    def __reduce__(self):
        return make_instance, (type(self), self.storage_args, self.storage_options)
    @classmethod
    def _strip_protocol(cls, path):
        """Turn path from fully-qualified to file-system-specific

        May require FS-specific handling, e.g., for relative paths or links.
        """
        if isinstance(path, list):
            return [cls._strip_protocol(p) for p in path]
        path = stringify_path(path)
        protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
        for protocol in protos:
            if path.startswith(protocol + "://"):
                path = path[len(protocol) + 3 :]
            elif path.startswith(protocol + "::"):
                path = path[len(protocol) + 2 :]
        path = path.rstrip("/")
        # use of root_marker to make minimum required path, e.g., "/"
        return path or cls.root_marker

    def unstrip_protocol(self, name: str) -> str:
        """Format FS-specific path to generic, including protocol"""
        protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
        for protocol in protos:
            if name.startswith(f"{protocol}://"):
                return name
        return f"{protos[0]}://{name}"
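
    # Round-trip sketch (hypothetical "memory" paths, illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     fs._strip_protocol("memory://bucket/key")  # e.g. "/bucket/key"
    #     fs.unstrip_protocol("/bucket/key")         # e.g. "memory:///bucket/key"
    #
    # Exact results depend on the implementation's ``protocol`` and
    # ``root_marker``; backends override ``_strip_protocol`` as needed.
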
    @staticmethod
    def _get_kwargs_from_urls(path):
        """If kwargs can be encoded in the paths, extract them here

        This should happen before instantiation of the class; incoming paths
        should then be amended in the methods to strip the options.

        Examples may look like an sftp path "sftp://user@host:/my/path", where
        the user and host should become kwargs and later get stripped.
        """
        # by default, nothing happens
        return {}

    @classmethod
    def current(cls):
        """Return the most recently instantiated FileSystem

        If no instance has been created, then create one with defaults
        """
        if cls._latest in cls._cache:
            return cls._cache[cls._latest]
        return cls()
    @property
    def transaction(self):
        """A context within which files are committed together upon exit

        Requires the file class to implement `.commit()` and `.discard()`
        for the normal and exception cases.
        """
        if self._transaction is None:
            self._transaction = self.transaction_type(self)
        return self._transaction

    def start_transaction(self):
        """Begin write transaction for deferring files, non-context version"""
        self._intrans = True
        self._transaction = self.transaction_type(self)
        return self.transaction

    def end_transaction(self):
        """Finish write transaction, non-context version"""
        self.transaction.complete()
        self._transaction = None
        # The invalid cache must be cleared after the transaction is completed.
        for path in self._invalidated_caches_in_transaction:
            self.invalidate_cache(path)
        self._invalidated_caches_in_transaction.clear()
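
    # Usage sketch (hypothetical "/staged" path, illustration only): files
    # opened for writing inside the context are deferred, then committed
    # together when the context exits cleanly, or discarded on error.
    #
    #     fs = fsspec.filesystem("memory")
    #     with fs.transaction:
    #         with fs.open("/staged", "wb") as f:
    #             f.write(b"data")  # not visible until commit
    #     assert fs.cat_file("/staged") == b"data"
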
    def invalidate_cache(self, path=None):
        """
        Discard any cached directory information

        Parameters
        ----------
        path: string or None
            If None, clear all cached listings; otherwise, clear listings at
            or under the given path.
        """
        # It is not necessary to implement an invalidation mechanism; there
        # may be no cache. But if there is one, subclasses should call this
        # parent-class method to ensure caches expire correctly after
        # transactions. See the implementation of FTPFileSystem in ftp.py.
        if self._intrans:
            self._invalidated_caches_in_transaction.append(path)
    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create directory entry at path

        For systems that don't have true directories, may create one for
        this instance only and not touch the real filesystem

        Parameters
        ----------
        path: str
            location
        create_parents: bool
            if True, this is equivalent to ``makedirs``
        kwargs:
            may be permissions, etc.
        """
        pass  # not necessary to implement, may not have directories

    def makedirs(self, path, exist_ok=False):
        """Recursively make directories

        Creates directory at path and any intervening required directories.
        Raises exception if, for instance, the path already exists but is a
        file.

        Parameters
        ----------
        path: str
            leaf directory name
        exist_ok: bool (False)
            If False, will error if the target already exists
        """
        pass  # not necessary to implement, may not have directories

    def rmdir(self, path):
        """Remove a directory, if empty"""
        pass  # not necessary to implement, may not have directories
    def ls(self, path, detail=True, **kwargs):
        """List objects at path.

        This should include subdirectories and files at that location. The
        difference between a file and a directory must be clear when details
        are requested.

        The specific keys, or perhaps a FileInfo class, or similar, is TBD,
        but must be consistent across implementations.
        Must include:

        - full path to the entry (without protocol)
        - size of the entry, in bytes. If the value cannot be determined, will
          be ``None``.
        - type of entry, "file", "directory" or other

        Additional information
        may be present, appropriate to the file-system, e.g., generation,
        checksum, etc.

        May use refresh=True|False to allow use of self._ls_from_cache to
        check for a saved listing and avoid calling the backend. This would be
        common where listing may be expensive.

        Parameters
        ----------
        path: str
        detail: bool
            if True, gives a list of dictionaries, where each is the same as
            the result of ``info(path)``. If False, gives a list of paths
            (str).
        kwargs: may have additional backend-specific options, such as version
            information

        Returns
        -------
        List of strings if detail is False, or list of directory information
        dicts if detail is True.
        """
        raise NotImplementedError
    def _ls_from_cache(self, path):
        """Check cache for listing

        Returns listing, if found (may be an empty list for a directory that
        exists but contains nothing), None if not in cache.
        """
        parent = self._parent(path)
        if path.rstrip("/") in self.dircache:
            return self.dircache[path.rstrip("/")]
        try:
            files = [
                f
                for f in self.dircache[parent]
                if f["name"] == path
                or (f["name"] == path.rstrip("/") and f["type"] == "directory")
            ]
            if len(files) == 0:
                # parent dir was listed but did not contain this file
                raise FileNotFoundError(path)
            return files
        except KeyError:
            pass
    def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
        """Return all files below path

        List all files, recursing into subdirectories; output is iterator-style,
        like ``os.walk()``. For a simple list of files, ``find()`` is available.

        When topdown is True, the caller can modify the dirnames list in-place (perhaps
        using del or slice assignment), and walk() will
        only recurse into the subdirectories whose names remain in dirnames;
        this can be used to prune the search, impose a specific order of visiting,
        or even to inform walk() about directories the caller creates or renames before
        it resumes walk() again.
        Modifying dirnames when topdown is False has no effect. (see os.walk)

        Note that the "files" yielded will include anything that is not
        a directory, such as links.

        Parameters
        ----------
        path: str
            Root to recurse into
        maxdepth: int
            Maximum recursion depth. None means limitless, but not recommended
            on link-based file-systems.
        topdown: bool (True)
            Whether to walk the directory tree from the top downwards or from
            the bottom upwards.
        on_error: "omit", "raise", a callable
            if "omit" (default), paths with exceptions will simply be empty;
            if "raise", an underlying exception will be raised;
            if callable, it will be called with a single OSError instance as argument
        kwargs: passed to ``ls``
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = self.ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                return path, {}, {}
            return path, [], []

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if not detail:
            dirs = list(dirs)
            files = list(files)

        if topdown:
            # Yield before recursion if walking top down
            yield path, dirs, files

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                if not topdown:
                    yield path, dirs, files
                return

        for d in dirs:
            yield from self.walk(
                full_dirs[d],
                maxdepth=maxdepth,
                detail=detail,
                topdown=topdown,
                **kwargs,
            )

        if not topdown:
            # Yield after recursion if walking bottom up
            yield path, dirs, files
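
    # Walk sketch (hypothetical layout under "/data", illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     fs.pipe({"/data/a.csv": b"1", "/data/sub/b.csv": b"2"})
    #     for root, dirnames, filenames in fs.walk("/data"):
    #         print(root, dirnames, filenames)
    #     # e.g. "/data" ["sub"] ["a.csv"], then "/data/sub" [] ["b.csv"]
    #
    # With topdown=True, pruning ``dirnames`` in-place skips those subtrees.
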
    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
        """List all files below path.

        Like posix ``find`` command without conditions

        Parameters
        ----------
        path : str
        maxdepth: int or None
            If not None, the maximum number of levels to descend
        withdirs: bool
            Whether to include directory paths in the output. This is True
            when used by glob, but users usually only want files.
        kwargs are passed to ``ls``.
        """
        # TODO: allow equivalent of -name parameter
        path = self._strip_protocol(path)
        out = {}

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and self.isdir(path):
            out[path] = self.info(path)

        for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and self.isfile(path):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
        """Space used by files and optionally directories within a path

        Directory size does not include the size of its contents.

        Parameters
        ----------
        path: str
        total: bool
            Whether to sum all the file sizes
        maxdepth: int or None
            Maximum number of directory levels to descend, None for unlimited.
        withdirs: bool
            Whether to include directory paths in the output.
        kwargs: passed to ``find``

        Returns
        -------
        Dict of {path: size} if total=False, or int otherwise, where numbers
        refer to bytes used.
        """
        sizes = {}
        if withdirs and self.isdir(path):
            # Include top-level directory in output
            info = self.info(path)
            sizes[info["name"]] = info["size"]
        for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
            info = self.info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes
    def glob(self, path, maxdepth=None, **kwargs):
        """
        Find files by glob-matching.

        If the path ends with '/', only folders are returned.

        We support ``"**"``,
        ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.

        The `maxdepth` option is applied on the first `**` found in the path.

        kwargs are passed to ``ls``.
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if self.exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: self.info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                (
                    p + "/"
                    if append_slash_to_dirname and info["type"] == "directory"
                    else p
                )
            )
        }

        if detail:
            return out
        else:
            return list(out)
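
    # Glob sketch (hypothetical files, illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     fs.pipe({"/logs/a.txt": b"", "/logs/deep/b.txt": b""})
    #     fs.glob("/logs/*.txt")     # e.g. ["/logs/a.txt"]
    #     fs.glob("/logs/**/*.txt")  # recursive; exact matches follow
    #                                # ``glob_translate`` semantics
    #
    # Pass detail=True (forwarded via kwargs) to get {path: info} instead.
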
    def exists(self, path, **kwargs):
        """Is there a file at the given path"""
        try:
            self.info(path, **kwargs)
            return True
        except:  # noqa: E722
            # any exception allowed bar FileNotFoundError?
            return False

    def lexists(self, path, **kwargs):
        """If there is a file at the given path (including
        broken links)"""
        return self.exists(path)
    def info(self, path, **kwargs):
        """Give details of entry at path

        Returns a single dictionary, with exactly the same information as ``ls``
        would with ``detail=True``.

        The default implementation calls ls and could be overridden by a
        shortcut. kwargs are passed on to ``ls()``.

        Some file systems might not be able to measure the file's size, in
        which case, the returned dict will include ``'size': None``.

        Returns
        -------
        dict with keys: name (full path in the FS), size (in bytes), type (file,
        directory, or something else) and other FS-specific keys.
        """
        path = self._strip_protocol(path)
        out = self.ls(self._parent(path), detail=True, **kwargs)
        out = [o for o in out if o["name"].rstrip("/") == path]
        if out:
            return out[0]
        out = self.ls(path, detail=True, **kwargs)
        path = path.rstrip("/")
        out1 = [o for o in out if o["name"].rstrip("/") == path]
        if len(out1) == 1:
            if "size" not in out1[0]:
                out1[0]["size"] = None
            return out1[0]
        elif len(out1) > 1 or out:
            return {"name": path, "size": 0, "type": "directory"}
        else:
            raise FileNotFoundError(path)
    def checksum(self, path):
        """Unique value for current version of file

        If the checksum is the same from one moment to another, the contents
        are guaranteed to be the same. If the checksum changes, the contents
        *might* have changed.

        This should normally be overridden; default will probably capture
        creation/modification timestamp (which would be good) or maybe
        access timestamp (which would be bad)
        """
        return int(tokenize(self.info(path)), 16)

    def size(self, path):
        """Size in bytes of file"""
        return self.info(path).get("size", None)

    def sizes(self, paths):
        """Size in bytes of each file in a list of paths"""
        return [self.size(p) for p in paths]

    def isdir(self, path):
        """Is this entry directory-like?"""
        try:
            return self.info(path)["type"] == "directory"
        except OSError:
            return False

    def isfile(self, path):
        """Is this entry file-like?"""
        try:
            return self.info(path)["type"] == "file"
        except:  # noqa: E722
            return False
    def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
        """Get the contents of the file as a string.

        Parameters
        ----------
        path: str
            URL of file on this filesystem
        encoding, errors, newline: same as `open`.
        """
        with self.open(
            path,
            mode="r",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.read()

    def write_text(
        self, path, value, encoding=None, errors=None, newline=None, **kwargs
    ):
        """Write the text to the given file.

        An existing file will be overwritten.

        Parameters
        ----------
        path: str
            URL of file on this filesystem
        value: str
            Text to write.
        encoding, errors, newline: same as `open`.
        """
        with self.open(
            path,
            mode="w",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.write(value)
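
    # Text round-trip sketch (hypothetical path, illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     fs.write_text("/notes.txt", "héllo", encoding="utf-8")
    #     assert fs.read_text("/notes.txt", encoding="utf-8") == "héllo"
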
    def cat_file(self, path, start=None, end=None, **kwargs):
        """Get the content of a file

        Parameters
        ----------
        path: URL of file on this filesystem
        start, end: int
            Bytes limits of the read. If negative, backwards from end,
            like usual python slices. Either can be None for start or
            end of file, respectively
        kwargs: passed to ``open()``.
        """
        # explicitly set buffering off?
        with self.open(path, "rb", **kwargs) as f:
            if start is not None:
                if start >= 0:
                    f.seek(start)
                else:
                    f.seek(max(0, f.size + start))
            if end is not None:
                if end < 0:
                    end = f.size + end
                return f.read(end - f.tell())
            return f.read()

    def pipe_file(self, path, value, **kwargs):
        """Set the bytes of given file"""
        with self.open(path, "wb", **kwargs) as f:
            f.write(value)

    def pipe(self, path, value=None, **kwargs):
        """Put value into path

        (counterpart to ``cat``)

        Parameters
        ----------
        path: string or dict(str, bytes)
            If a string, a single remote location to put ``value`` bytes; if a dict,
            a mapping of {path: bytesvalue}.
        value: bytes, optional
            If using a single path, these are the bytes to put there. Ignored if
            ``path`` is a dict
        """
        if isinstance(path, str):
            self.pipe_file(self._strip_protocol(path), value, **kwargs)
        elif isinstance(path, dict):
            for k, v in path.items():
                self.pipe_file(self._strip_protocol(k), v, **kwargs)
        else:
            raise ValueError("path must be str or dict")
    def cat_ranges(
        self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of paths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        if max_gap is not None:
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, list):
            starts = [starts] * len(paths)
        if not isinstance(ends, list):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        out = []
        for p, s, e in zip(paths, starts, ends):
            try:
                out.append(self.cat_file(p, s, e))
            except Exception as e:
                if on_error == "return":
                    out.append(e)
                else:
                    raise
        return out
    def cat(self, path, recursive=False, on_error="raise", **kwargs):
        """Fetch (potentially multiple) paths' contents

        Parameters
        ----------
        recursive: bool
            If True, assume the path(s) are directories, and get all the
            contained files
        on_error : "raise", "omit", "return"
            If raise, an underlying exception will be raised (converted to KeyError
            if the type is in self.missing_exceptions); if omit, keys with exception
            will simply not be included in the output; if "return", all keys are
            included in the output, but the value will be bytes or an exception
            instance.
        kwargs: passed to cat_file

        Returns
        -------
        dict of {path: contents} if there are multiple paths
        or the path has been otherwise expanded
        """
        paths = self.expand_path(path, recursive=recursive)
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            out = {}
            for path in paths:
                try:
                    out[path] = self.cat_file(path, **kwargs)
                except Exception as e:
                    if on_error == "raise":
                        raise
                    if on_error == "return":
                        out[path] = e
            return out
        else:
            return self.cat_file(paths[0], **kwargs)
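
    # Bytes round-trip sketch (hypothetical paths, illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     fs.pipe({"/a": b"x", "/b": b"y"})  # write several files at once
    #     fs.cat("/a")                       # b"x" (single, unexpanded path)
    #     fs.cat(["/a", "/b"])               # {"/a": b"x", "/b": b"y"}
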
    def get_file(
        self, rpath, lpath, callback=_DEFAULT_CALLBACK, outfile=None, **kwargs
    ):
        """Copy single remote file to local"""
        from .implementations.local import LocalFileSystem

        if isfilelike(lpath):
            outfile = lpath
        elif self.isdir(rpath):
            os.makedirs(lpath, exist_ok=True)
            return None

        fs = LocalFileSystem(auto_mkdir=True)
        fs.makedirs(fs._parent(lpath), exist_ok=True)

        with self.open(rpath, "rb", **kwargs) as f1:
            if outfile is None:
                outfile = open(lpath, "wb")

            try:
                callback.set_size(getattr(f1, "size", None))
                data = True
                while data:
                    data = f1.read(self.blocksize)
                    segment_len = outfile.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    def get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        Calls get_file for each source.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            from .implementations.local import (
                LocalFileSystem,
                make_path_posix,
                trailing_sep,
            )

            source_is_str = isinstance(rpath, str)
            rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
                if not rpaths:
                    return

            if isinstance(lpath, str):
                lpath = make_path_posix(lpath)

            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        callback.set_size(len(lpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            callback.branch(rpath, lpath, kwargs)
            self.get_file(rpath, lpath, **kwargs)
    def put_file(self, lpath, rpath, callback=_DEFAULT_CALLBACK, **kwargs):
        """Copy single file to remote"""
        if os.path.isdir(lpath):
            self.makedirs(rpath, exist_ok=True)
            return None

        with open(lpath, "rb") as f1:
            size = f1.seek(0, 2)
            callback.set_size(size)
            f1.seek(0)

            self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
            with self.open(rpath, "wb", **kwargs) as f2:
                while f1.tell() < size:
                    data = f1.read(self.blocksize)
                    segment_len = f2.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)

    def put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        Calls put_file for each source.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            from .implementations.local import (
                LocalFileSystem,
                make_path_posix,
                trailing_sep,
            )

            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or self.isdir(rpath)
            )

            rpath = (
                self._strip_protocol(rpath)
                if isinstance(rpath, str)
                else [self._strip_protocol(p) for p in rpath]
            )
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        callback.set_size(len(rpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            callback.branch(lpath, rpath, kwargs)
            self.put_file(lpath, rpath, **kwargs)
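
    # Upload/download sketch (hypothetical local paths, illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     fs.put("/tmp/report.csv", "/remote/report.csv")  # single file
    #     fs.put("/tmp/project/", "/remote/project/", recursive=True)
    #     fs.get("/remote/project/", "/tmp/copy/", recursive=True)
    #
    # A trailing "/" on the destination marks it as a directory target.
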
    def head(self, path, size=1024):
        """Get the first ``size`` bytes from file"""
        with self.open(path, "rb") as f:
            return f.read(size)

    def tail(self, path, size=1024):
        """Get the last ``size`` bytes from file"""
        with self.open(path, "rb") as f:
            f.seek(max(-size, -f.size), 2)
            return f.read()

    def cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    def copy(
        self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
    ):
        """Copy within two locations in the filesystem

        on_error : "raise", "ignore"
            If raise, any not-found exceptions will be raised; if ignore any
            not-found exceptions will cause the path to be skipped; defaults to
            raise unless recursive is true, where the default is ignore
        """
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            from .implementations.local import trailing_sep

            source_is_str = isinstance(path1, str)
            paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or self.isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        for p1, p2 in zip(paths1, paths2):
            try:
                self.cp_file(p1, p2, **kwargs)
            except FileNotFoundError:
                if on_error == "raise":
                    raise
    def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
        """Turn one or more globs or directories into a list of all matching paths
        to files or directories.

        kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
        """

        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = self.expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:
                if has_magic(p):
                    bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            self.expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                                **kwargs,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(
                        self.find(
                            p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
                        )
                    )
                    out |= rec
                if p not in out and (recursive is False or self.exists(p)):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)
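
    # Expansion sketch (hypothetical layout, illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     fs.pipe({"/d/x.bin": b"", "/d/sub/y.bin": b""})
    #     fs.expand_path("/d/*.bin")            # e.g. ["/d/x.bin"]
    #     fs.expand_path("/d", recursive=True)  # everything under /d
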
    def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
        """Move file(s) from one location to another"""
        if path1 == path2:
            logger.debug("%s mv: The paths are the same, so no files were moved.", self)
        else:
            self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
            self.rm(path1, recursive=recursive)

    def rm_file(self, path):
        """Delete a file"""
        self._rm(path)

    def _rm(self, path):
        """Delete one file"""
        # this is the old name for the method, prefer rm_file
        raise NotImplementedError

    def rm(self, path, recursive=False, maxdepth=None):
        """Delete files.

        Parameters
        ----------
        path: str or list of str
            File(s) to delete.
        recursive: bool
            If file(s) are directories, recursively delete contents and then
            also remove the directory
        maxdepth: int or None
            Depth to pass to walk for finding files to delete, if recursive.
            If None, there will be no limit and infinite recursion may be
            possible.
        """
        path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
        for p in reversed(path):
            self.rm_file(p)

    @classmethod
    def _parent(cls, path):
        path = cls._strip_protocol(path)
        if "/" in path:
            parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
            return cls.root_marker + parent
        else:
            return cls.root_marker
    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return raw bytes-mode file-like from the file-system"""
        return AbstractBufferedFile(
            self,
            path,
            mode,
            block_size,
            autocommit,
            cache_options=cache_options,
            **kwargs,
        )

    def open(
        self,
        path,
        mode="rb",
        block_size=None,
        cache_options=None,
        compression=None,
        **kwargs,
    ):
        """
        Return a file-like object from the filesystem

        The resultant instance must function correctly in a context ``with``
        block.

        Parameters
        ----------
        path: str
            Target file
        mode: str like 'rb', 'w'
            See builtin ``open()``
        block_size: int
            Some indication of buffering - this is a value in bytes
        cache_options : dict, optional
            Extra arguments to pass through to the cache.
        compression: string or None
            If given, open file using compression codec. Can either be a compression
            name (a key in ``fsspec.compression.compr``) or "infer" to guess the
            compression from the filename suffix.
        encoding, errors, newline: passed on to TextIOWrapper for text mode
        """
        import io

        path = self._strip_protocol(path)
        if "b" not in mode:
            mode = mode.replace("t", "") + "b"

            text_kwargs = {
                k: kwargs.pop(k)
                for k in ["encoding", "errors", "newline"]
                if k in kwargs
            }
            return io.TextIOWrapper(
                self.open(
                    path,
                    mode,
                    block_size=block_size,
                    cache_options=cache_options,
                    compression=compression,
                    **kwargs,
                ),
                **text_kwargs,
            )
        else:
            ac = kwargs.pop("autocommit", not self._intrans)
            f = self._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=ac,
                cache_options=cache_options,
                **kwargs,
            )
            if compression is not None:
                from fsspec.compression import compr
                from fsspec.core import get_compression

                compression = get_compression(path, compression)
                compress = compr[compression]
                f = compress(f, mode=mode[0])

            if not ac and "r" not in mode:
                self.transaction.files.append(f)
            return f
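
    # Open-mode sketch (hypothetical path, illustration only):
    #
    #     fs = fsspec.filesystem("memory")
    #     with fs.open("/blob.gz", "wb", compression="gzip") as f:
    #         f.write(b"payload")  # transparently gzip-compressed
    #     with fs.open("/blob.gz", "rt", compression="infer") as f:
    #         print(f.read())      # text mode wraps in TextIOWrapper
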
    def touch(self, path, truncate=True, **kwargs):
        """Create empty file, or update timestamp

        Parameters
        ----------
        path: str
            file location
        truncate: bool
            If True, always set file size to 0; if False, update timestamp and
            leave file unchanged, if backend allows this
        """
        if truncate or not self.exists(path):
            with self.open(path, "wb", **kwargs):
                pass
        else:
            raise NotImplementedError  # update timestamp, if possible

    def ukey(self, path):
        """Hash of file properties, to tell if it has changed"""
        return sha256(str(self.info(path)).encode()).hexdigest()
    def read_block(self, fn, offset, length, delimiter=None):
        """Read a block of bytes from a file

        Starting at ``offset`` of the file, read ``length`` bytes. If
        ``delimiter`` is set then we ensure that the read starts and stops at
        delimiter boundaries that follow the locations ``offset`` and ``offset
        + length``. If ``offset`` is zero then we start at zero. The
        bytestring returned WILL include the end delimiter string.

        If offset+length is beyond the eof, reads to eof.

        Parameters
        ----------
        fn: string
            Path to filename
        offset: int
            Byte offset to start read
        length: int
            Number of bytes to read. If None, read to end.
        delimiter: bytes (optional)
            Ensure reading starts and stops at delimiter bytestring

        Examples
        --------
        >>> fs.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
        b'Alice, 100\\nBo'
        >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\n'

        Use ``length=None`` to read to the end of the file.

        >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\nCharlie, 300'

        See Also
        --------
        :func:`fsspec.utils.read_block`
        """
        with self.open(fn, "rb") as f:
            size = f.size
            if length is None:
                length = size
            if size is not None and offset + length > size:
                length = size - offset
            return read_block(f, offset, length, delimiter)
    def to_json(self):
        """
        JSON representation of this filesystem instance

        Returns
        -------
        str: JSON structure with keys cls (the python location of this class),
            protocol (text name of this class's protocol, first one in case of
            multiple), args (positional args, usually empty), and all other
            kwargs as their own keys.
        """
        import json

        cls = type(self)
        cls = ".".join((cls.__module__, cls.__name__))
        proto = (
            self.protocol[0]
            if isinstance(self.protocol, (tuple, list))
            else self.protocol
        )
        return json.dumps(
            dict(
                **{"cls": cls, "protocol": proto, "args": self.storage_args},
                **self.storage_options,
            )
        )

    @staticmethod
    def from_json(blob):
        """
        Recreate a filesystem instance from JSON representation

        See ``.to_json()`` for the expected structure of the input

        Parameters
        ----------
        blob: str

        Returns
        -------
        file system instance, not necessarily of this particular class.
        """
        import json

        from .registry import _import_class, get_filesystem_class

        dic = json.loads(blob)
        protocol = dic.pop("protocol")
        try:
            cls = _import_class(dic.pop("cls"))
        except (ImportError, ValueError, RuntimeError, KeyError):
            cls = get_filesystem_class(protocol)
        return cls(*dic.pop("args", ()), **dic)
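
    # Serialization sketch (illustration only): because instances are cached
    # by constructor token, deserializing the same options usually returns
    # the already-cached instance.
    #
    #     fs = fsspec.filesystem("memory")
    #     blob = fs.to_json()
    #     fs2 = AbstractFileSystem.from_json(blob)
    #     assert fs2 is fs  # same token -> same cached instance
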
    def _get_pyarrow_filesystem(self):
        """
        Make a version of the FS instance which will be acceptable to pyarrow
        """
        # all instances already also derive from pyarrow
        return self

    def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
        """Create key/value store based on this file-system

        Makes a MutableMapping interface to the FS at the given root path.
        See ``fsspec.mapping.FSMap`` for further details.
        """
        from .mapping import FSMap

        return FSMap(
            root,
            self,
            check=check,
            create=create,
            missing_exceptions=missing_exceptions,
        )

    @classmethod
    def clear_instance_cache(cls):
        """
        Clear the cache of filesystem instances.

        Notes
        -----
        Unless overridden by setting the ``cachable`` class attribute to False,
        the filesystem class stores a reference to newly created instances. This
        prevents Python's normal rules around garbage collection from working,
        since the instances refcount will not drop to zero until
        ``clear_instance_cache`` is called.
        """
        cls._cache.clear()

    def created(self, path):
        """Return the created timestamp of a file as a datetime.datetime"""
        raise NotImplementedError

    def modified(self, path):
        """Return the modified timestamp of a file as a datetime.datetime"""
        raise NotImplementedError

    # ------------------------------------------------------------------------
    # Aliases

    def read_bytes(self, path, start=None, end=None, **kwargs):
        """Alias of `AbstractFileSystem.cat_file`."""
        return self.cat_file(path, start=start, end=end, **kwargs)

    def write_bytes(self, path, value, **kwargs):
        """Alias of `AbstractFileSystem.pipe_file`."""
        self.pipe_file(path, value, **kwargs)

    def makedir(self, path, create_parents=True, **kwargs):
        """Alias of `AbstractFileSystem.mkdir`."""
        return self.mkdir(path, create_parents=create_parents, **kwargs)

    def mkdirs(self, path, exist_ok=False):
        """Alias of `AbstractFileSystem.makedirs`."""
        return self.makedirs(path, exist_ok=exist_ok)

    def listdir(self, path, detail=True, **kwargs):
        """Alias of `AbstractFileSystem.ls`."""
        return self.ls(path, detail=detail, **kwargs)

    def cp(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.copy`."""
        return self.copy(path1, path2, **kwargs)

    def move(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def stat(self, path, **kwargs):
        """Alias of `AbstractFileSystem.info`."""
        return self.info(path, **kwargs)

    def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
        """Alias of `AbstractFileSystem.du`."""
        return self.du(path, total=total, maxdepth=maxdepth, **kwargs)

    def rename(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def delete(self, path, recursive=False, maxdepth=None):
        """Alias of `AbstractFileSystem.rm`."""
        return self.rm(path, recursive=recursive, maxdepth=maxdepth)

    def upload(self, lpath, rpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.put`."""
        return self.put(lpath, rpath, recursive=recursive, **kwargs)

    def download(self, rpath, lpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.get`."""
        return self.get(rpath, lpath, recursive=recursive, **kwargs)

    def sign(self, path, expiration=100, **kwargs):
        """Create a signed URL representing the given path

        Some implementations allow temporary URLs to be generated, as a
        way of delegating credentials.

        Parameters
        ----------
        path : str
            The path on the filesystem
        expiration : int
            Number of seconds to enable the URL for (if supported)

        Returns
        -------
        URL : str
            The signed URL

        Raises
        ------
        NotImplementedError : if method is not implemented for a filesystem
        """
        raise NotImplementedError("Sign is not implemented for this filesystem")

    def _isfilestore(self):
        # Originally inherited from pyarrow DaskFileSystem. Keeping this
        # here for backwards compatibility as long as pyarrow uses its
        # legacy fsspec-compatible filesystems and thus accepts fsspec
        # filesystems as well
        return False
class AbstractBufferedFile(io.IOBase):
    """Convenient class to derive from to provide buffering

    In the case that the backend does not provide a pythonic file-like object
    already, this class contains much of the logic to build one. The only
    methods that need to be overridden are ``_upload_chunk``,
    ``_initiate_upload`` and ``_fetch_range``.
    """

    DEFAULT_BLOCK_SIZE = 5 * 2**20
    _details = None

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """
        Template for files with buffered reading and writing

        Parameters
        ----------
        fs: instance of FileSystem
        path: str
            location in file-system
        mode: str
            Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file
            systems may be read-only, and some may not support append.
        block_size: int
            Buffer size for reading or writing, 'default' for class default
        autocommit: bool
            Whether to write to final destination; may only impact what
            happens when file is being closed.
        cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
            Caching policy in read mode. See the definitions in ``core``.
        cache_options : dict
            Additional options passed to the constructor for the cache specified
            by `cache_type`.
        size: int
            If given and in read mode, suppresses having to look up the file size
        kwargs:
            Gets stored as self.kwargs
        """
        from .core import caches

        self.path = path
        self.fs = fs
        self.mode = mode
        self.blocksize = (
            self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
        )
        self.loc = 0
        self.autocommit = autocommit
        self.end = None
        self.start = None
        self.closed = False

        if cache_options is None:
            cache_options = {}

        if "trim" in kwargs:
            warnings.warn(
                "Passing 'trim' to control the cache behavior has been deprecated. "
                "Specify it within the 'cache_options' argument instead.",
                FutureWarning,
            )
            cache_options["trim"] = kwargs.pop("trim")

        self.kwargs = kwargs

        if mode not in {"ab", "rb", "wb"}:
            raise NotImplementedError("File mode not supported")
        if mode == "rb":
            if size is not None:
                self.size = size
            else:
                self.size = self.details["size"]
            self.cache = caches[cache_type](
                self.blocksize, self._fetch_range, self.size, **cache_options
            )
        else:
            self.buffer = io.BytesIO()
            self.offset = None
            self.forced = False
            self.location = None
    @property
    def details(self):
        if self._details is None:
            self._details = self.fs.info(self.path)
        return self._details

    @details.setter
    def details(self, value):
        self._details = value
        self.size = value["size"]

    @property
    def full_name(self):
        return _unstrip_protocol(self.path, self.fs)

    @property
    def closed(self):
        # get around this attr being read-only in IOBase
        # use getattr here, since this can be called during del
        return getattr(self, "_closed", True)

    @closed.setter
    def closed(self, c):
        self._closed = c

    def __hash__(self):
        if "w" in self.mode:
            return id(self)
        else:
            return int(tokenize(self.details), 16)

    def __eq__(self, other):
        """Files are equal if they have the same checksum, only in read mode"""
        return self.mode == "rb" and other.mode == "rb" and hash(self) == hash(other)

    def commit(self):
        """Move from temp to final destination"""

    def discard(self):
        """Throw away temporary file"""

    def info(self):
        """File information about this path"""
        if "r" in self.mode:
            return self.details
        else:
            raise ValueError("Info not available while writing")
    def tell(self):
        """Current file location"""
        return self.loc

    def seek(self, loc, whence=0):
        """Set current file location

        Parameters
        ----------
        loc: int
            byte location
        whence: {0, 1, 2}
            from start of file, current location or end of file, resp.
        """
        loc = int(loc)
        if not self.mode == "rb":
            raise OSError(ESPIPE, "Seek only available in read mode")
        if whence == 0:
            nloc = loc
        elif whence == 1:
            nloc = self.loc + loc
        elif whence == 2:
            nloc = self.size + loc
        else:
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
        if nloc < 0:
            raise ValueError("Seek before start of file")
        self.loc = nloc
        return self.loc

    def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            self.flush()
        return out

    def flush(self, force=False):
        """
        Write buffered data to backend store.

        Writes the current buffer, if it is larger than the block-size, or if
        the file is being closed.

        Parameters
        ----------
        force: bool
            When closing, write the last block even if it is smaller than
            blocks are allowed to be. Disallows further writing to this file.
        """

        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()
    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        """
        # may not yet have been initialized, may need to call _initiate_upload

    def _initiate_upload(self):
        """Create remote file/upload"""
        pass

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        raise NotImplementedError
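
    # Minimal subclass sketch (hypothetical backend, illustration only): a
    # read-only file over an in-memory payload needs only _fetch_range.
    #
    #     class BytesBackedFile(AbstractBufferedFile):
    #         def __init__(self, fs, path, payload, **kwargs):
    #             self._payload = payload
    #             super().__init__(fs, path, mode="rb", size=len(payload), **kwargs)
    #
    #         def _fetch_range(self, start, end):
    #             return self._payload[start:end]
    #
    # Writable backends additionally implement _initiate_upload and
    # _upload_chunk, which consume ``self.buffer`` on each flush().
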
    def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        logger.debug("%s read: %i - %i", self, self.loc, self.loc + length)
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = self.cache._fetch(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    def readinto(self, b):
        """mirrors builtin file's readinto method

        https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
        """
        out = memoryview(b).cast("B")
        data = self.read(out.nbytes)
        out[: len(data)] = data
        return len(data)

    def readuntil(self, char=b"\n", blocks=None):
        """Return data between current position and first occurrence of char

        char is included in the output, except if the end of the file is
        encountered first.

        Parameters
        ----------
        char: bytes
            Thing to find
        blocks: None or int
            How much to read in each go. Defaults to file blocksize - which may
            mean a new read on every call.
        """
        out = []
        while True:
            start = self.tell()
            part = self.read(blocks or self.blocksize)
            if len(part) == 0:
                break
            found = part.find(char)
            if found > -1:
                out.append(part[: found + len(char)])
                self.seek(start + found + len(char))
                break
            out.append(part)
        return b"".join(out)
    def readline(self):
        """Read until first occurrence of newline character

        Note that, because of character encoding, this is not necessarily a
        true line ending.
        """
        return self.readuntil(b"\n")

    def __next__(self):
        out = self.readline()
        if out:
            return out
        raise StopIteration

    def __iter__(self):
        return self

    def readlines(self):
        """Return all data, split by the newline character"""
        data = self.read()
        lines = data.split(b"\n")
        out = [l + b"\n" for l in lines[:-1]]
        if data.endswith(b"\n"):
            return out
        else:
            return out + [lines[-1]]
            # return list(self) ???

    def readinto1(self, b):
        return self.readinto(b)

    def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    def readable(self):
        """Whether opened for reading"""
        return self.mode == "rb" and not self.closed

    def seekable(self):
        """Whether is seekable (only in read mode)"""
        return self.readable()

    def writable(self):
        """Whether opened for writing"""
        return self.mode in {"wb", "ab"} and not self.closed

    def __del__(self):
        if not self.closed:
            self.close()

    def __str__(self):
        return f"<File-like object {type(self.fs).__name__}, {self.path}>"

    __repr__ = __str__

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()