Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/spec.py: 25%
781 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1from __future__ import annotations
3import io
4import logging
5import os
6import threading
7import warnings
8import weakref
9from errno import ESPIPE
10from glob import has_magic
11from hashlib import sha256
12from typing import ClassVar
14from .callbacks import _DEFAULT_CALLBACK
15from .config import apply_config, conf
16from .dircache import DirCache
17from .transaction import Transaction
18from .utils import (
19 _unstrip_protocol,
20 isfilelike,
21 other_paths,
22 read_block,
23 stringify_path,
24 tokenize,
25)
27logger = logging.getLogger("fsspec")
def make_instance(cls, args, kwargs):
    """Instantiate ``cls(*args, **kwargs)``.

    Module-level factory so that ``AbstractFileSystem.__reduce__`` can
    return a picklable callable for reconstructing filesystem instances.
    """
    return cls(*args, **kwargs)
class _Cached(type):
    """
    Metaclass for caching file system instances.

    Notes
    -----
    Instances are cached according to

    * The values of the class attributes listed in `_extra_tokenize_attributes`
    * The arguments passed to ``__init__``.

    This creates an additional reference to the filesystem, which prevents the
    filesystem from being garbage collected when all *user* references go away.
    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
    be made for a filesystem instance to be garbage collected.
    """

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Note: we intentionally create a reference here, to avoid garbage
        # collecting instances when all other references are gone. To really
        # delete a FileSystem, the cache must be cleared.
        if conf.get("weakref_instance_cache"):  # pragma: no cover
            # debug option for analysing fork/spawn conditions
            cls._cache = weakref.WeakValueDictionary()
        else:
            cls._cache = {}
        # remember the creating process so the cache can be invalidated
        # in child processes after a fork (see __call__)
        cls._pid = os.getpid()

    def __call__(cls, *args, **kwargs):
        """Create a new instance, or return a cached one with matching token."""
        kwargs = apply_config(cls, kwargs)
        extra_tokens = tuple(
            getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
        )
        # token identifies (class, process, thread, init args, extra attrs);
        # instances are therefore not shared across threads or processes
        token = tokenize(
            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
        )
        skip = kwargs.pop("skip_instance_cache", False)
        if os.getpid() != cls._pid:
            # we are in a forked child: cached instances belong to the parent
            cls._cache.clear()
            cls._pid = os.getpid()
        if not skip and cls.cachable and token in cls._cache:
            cls._latest = token
            return cls._cache[token]
        else:
            obj = super().__call__(*args, **kwargs)
            # Setting _fs_token here causes some static linters to complain.
            obj._fs_token_ = token
            obj.storage_args = args
            obj.storage_options = kwargs
            if obj.async_impl and obj.mirror_sync_methods:
                from .asyn import mirror_sync_methods

                mirror_sync_methods(obj)

            if cls.cachable and not skip:
                cls._latest = token
                cls._cache[token] = obj
            return obj
class AbstractFileSystem(metaclass=_Cached):
    """
    An abstract super-class for pythonic file-systems

    Implementations are expected to be compatible with or, better, subclass
    from here.
    """

    cachable = True  # this class can be cached, instances reused
    _cached = False  # becomes True once __init__ has configured an instance
    blocksize = 2**22  # default chunk size for streaming copies (4 MiB)
    sep = "/"  # path separator for this filesystem
    protocol: ClassVar[str | tuple[str, ...]] = "abstract"  # URL scheme(s) handled
    _latest = None  # token of most recently created/fetched instance (see current())
    async_impl = False  # True in async subclasses (see fsspec.asyn)
    mirror_sync_methods = False  # async classes may auto-generate sync wrappers
    root_marker = ""  # For some FSs, may require leading '/' or other character

    #: Extra *class attributes* that should be considered when hashing.
    _extra_tokenize_attributes = ()
    def __init__(self, *args, **storage_options):
        """Create and configure file-system instance

        Instances may be cachable, so if similar enough arguments are seen
        a new instance is not required. The token attribute exists to allow
        implementations to cache instances if they wish.

        A reasonable default should be provided if there are no arguments.

        Subclasses should call this method.

        Parameters
        ----------
        use_listings_cache, listings_expiry_time, max_paths:
            passed to ``DirCache``, if the implementation supports
            directory listing caching. Pass use_listings_cache=False
            to disable such caching.
        skip_instance_cache: bool
            If this is a cachable implementation, pass True here to force
            creating a new instance even if a matching instance exists, and prevent
            storing this instance.
        asynchronous: bool
        loop: asyncio-compatible IOLoop or None
        """
        if self._cached:
            # reusing instance, don't change
            return
        self._cached = True
        # transaction state: _intrans is toggled by start/end_transaction
        self._intrans = False
        self._transaction = None
        self._invalidated_caches_in_transaction = []
        self.dircache = DirCache(**storage_options)

        if storage_options.pop("add_docs", None):
            warnings.warn("add_docs is no longer supported.", FutureWarning)

        if storage_options.pop("add_aliases", None):
            warnings.warn("add_aliases has been removed.", FutureWarning)
        # This is set in _Cached
        self._fs_token_ = None
    @property
    def fsid(self):
        """Persistent filesystem id that can be used to compare filesystems
        across sessions.

        Raises
        ------
        NotImplementedError
            Implementations must override to supply a stable identifier.
        """
        raise NotImplementedError
    @property
    def _fs_token(self):
        # Hash token assigned by the _Cached metaclass at construction time
        return self._fs_token_
    def __dask_tokenize__(self):
        # Lets dask deterministically tokenize this filesystem instance
        return self._fs_token
    def __hash__(self):
        # The token is a hex digest string; interpret it as a base-16 integer
        return int(self._fs_token, 16)
    def __eq__(self, other):
        # Equal iff same concrete class and identical construction token
        return isinstance(other, type(self)) and self._fs_token == other._fs_token
    def __reduce__(self):
        # Pickle support: reconstruct by re-calling the constructor with the
        # stored args; unpickling may therefore hit the instance cache
        return make_instance, (type(self), self.storage_args, self.storage_options)
180 @classmethod
181 def _strip_protocol(cls, path):
182 """Turn path from fully-qualified to file-system-specific
184 May require FS-specific handling, e.g., for relative paths or links.
185 """
186 if isinstance(path, list):
187 return [cls._strip_protocol(p) for p in path]
188 path = stringify_path(path)
189 protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
190 for protocol in protos:
191 if path.startswith(protocol + "://"):
192 path = path[len(protocol) + 3 :]
193 elif path.startswith(protocol + "::"):
194 path = path[len(protocol) + 2 :]
195 path = path.rstrip("/")
196 # use of root_marker to make minimum required path, e.g., "/"
197 return path or cls.root_marker
199 def unstrip_protocol(self, name):
200 """Format FS-specific path to generic, including protocol"""
201 protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
202 for protocol in protos:
203 if name.startswith(f"{protocol}://"):
204 return name
205 return f"{protos[0]}://{name}"
    @staticmethod
    def _get_kwargs_from_urls(path):
        """If kwargs can be encoded in the paths, extract them here

        This should happen before instantiation of the class; incoming paths
        then should be amended to strip the options in methods.

        Examples may look like an sftp path "sftp://user@host:/my/path", where
        the user and host should become kwargs and later get stripped.

        Returns
        -------
        dict of storage options extracted from ``path``
        """
        # by default, nothing happens
        return {}
220 @classmethod
221 def current(cls):
222 """Return the most recently instantiated FileSystem
224 If no instance has been created, then create one with defaults
225 """
226 if cls._latest in cls._cache:
227 return cls._cache[cls._latest]
228 return cls()
    @property
    def transaction(self):
        """A context within which files are committed together upon exit

        Requires the file class to implement `.commit()` and `.discard()`
        for the normal and exception cases.
        """
        # lazily create a single Transaction object per filesystem instance
        if self._transaction is None:
            self._transaction = Transaction(self)
        return self._transaction
    def start_transaction(self):
        """Begin write transaction for deferring files, non-context version

        Returns
        -------
        The active ``Transaction`` instance (also available as
        ``self.transaction``).
        """
        self._intrans = True
        # always start with a fresh Transaction, discarding any previous one
        self._transaction = Transaction(self)
        return self.transaction
    def end_transaction(self):
        """Finish write transaction, non-context version"""
        self.transaction.complete()
        self._transaction = None
        # The invalid cache must be cleared after the transaction is completed.
        for path in self._invalidated_caches_in_transaction:
            self.invalidate_cache(path)
        self._invalidated_caches_in_transaction.clear()
    def invalidate_cache(self, path=None):
        """
        Discard any cached directory information

        Parameters
        ----------
        path: string or None
            If None, clear all listings cached else listings at or under given
            path.
        """
        # Not necessary to implement invalidation mechanism, may have no cache.
        # But if have, you should call this method of parent class from your
        # subclass to ensure expiring caches after transactions correctly.
        # See the implementation of FTPFileSystem in ftp.py
        if self._intrans:
            # defer actual invalidation until end_transaction()
            self._invalidated_caches_in_transaction.append(path)
    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create directory entry at path

        For systems that don't have true directories, may create one for
        this instance only and not touch the real filesystem

        Parameters
        ----------
        path: str
            location
        create_parents: bool
            if True, this is equivalent to ``makedirs``
        kwargs:
            may be permissions, etc.
        """
        pass  # not necessary to implement, may not have directories
    def makedirs(self, path, exist_ok=False):
        """Recursively make directories

        Creates directory at path and any intervening required directories.
        Raises exception if, for instance, the path already exists but is a
        file.

        Parameters
        ----------
        path: str
            leaf directory name
        exist_ok: bool (False)
            If False, will error if the target already exists
        """
        pass  # not necessary to implement, may not have directories
    def rmdir(self, path):
        """Remove a directory, if empty"""
        pass  # not necessary to implement, may not have directories
    def ls(self, path, detail=True, **kwargs):
        """List objects at path.

        This should include subdirectories and files at that location. The
        difference between a file and a directory must be clear when details
        are requested.

        The specific keys, or perhaps a FileInfo class, or similar, is TBD,
        but must be consistent across implementations.
        Must include:

        - full path to the entry (without protocol)
        - size of the entry, in bytes. If the value cannot be determined, will
          be ``None``.
        - type of entry, "file", "directory" or other

        Additional information
        may be present, appropriate to the file-system, e.g., generation,
        checksum, etc.

        May use refresh=True|False to allow use of self._ls_from_cache to
        check for a saved listing and avoid calling the backend. This would be
        common where listing may be expensive.

        Parameters
        ----------
        path: str
        detail: bool
            if True, gives a list of dictionaries, where each is the same as
            the result of ``info(path)``. If False, gives a list of paths
            (str).
        kwargs: may have additional backend-specific options, such as version
            information

        Returns
        -------
        List of strings if detail is False, or list of directory information
        dicts if detail is True.
        """
        # must be overridden by each concrete filesystem implementation
        raise NotImplementedError
352 def _ls_from_cache(self, path):
353 """Check cache for listing
355 Returns listing, if found (may be empty list for a directly that exists
356 but contains nothing), None if not in cache.
357 """
358 parent = self._parent(path)
359 if path.rstrip("/") in self.dircache:
360 return self.dircache[path.rstrip("/")]
361 try:
362 files = [
363 f
364 for f in self.dircache[parent]
365 if f["name"] == path
366 or (f["name"] == path.rstrip("/") and f["type"] == "directory")
367 ]
368 if len(files) == 0:
369 # parent dir was listed but did not contain this file
370 raise FileNotFoundError(path)
371 return files
372 except KeyError:
373 pass
    def walk(self, path, maxdepth=None, topdown=True, **kwargs):
        """Return all files below path

        List all files, recursing into subdirectories; output is iterator-style,
        like ``os.walk()``. For a simple list of files, ``find()`` is available.

        When topdown is True, the caller can modify the dirnames list in-place (perhaps
        using del or slice assignment), and walk() will
        only recurse into the subdirectories whose names remain in dirnames;
        this can be used to prune the search, impose a specific order of visiting,
        or even to inform walk() about directories the caller creates or renames before
        it resumes walk() again.
        Modifying dirnames when topdown is False has no effect. (see os.walk)

        Note that the "files" outputted will include anything that is not
        a directory, such as links.

        Parameters
        ----------
        path: str
            Root to recurse into
        maxdepth: int
            Maximum recursion depth. None means limitless, but not recommended
            on link-based file-systems.
        topdown: bool (True)
            Whether to walk the directory tree from the top downwards or from
            the bottom upwards.
        kwargs: passed to ``ls``
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}  # basename -> full path, used for recursion targets
        dirs = {}  # basename -> info dict for directories
        files = {}  # basename -> info dict for non-directories

        detail = kwargs.pop("detail", False)
        try:
            listing = self.ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError):
            # unlistable path yields a single empty result rather than raising
            if detail:
                return path, {}, {}
            return path, [], []

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if not detail:
            # plain mode: callers get lists of basenames, like os.walk
            dirs = list(dirs)
            files = list(files)

        if topdown:
            # Yield before recursion if walking top down
            yield path, dirs, files

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                if not topdown:
                    yield path, dirs, files
                return

        for d in dirs:
            yield from self.walk(
                full_dirs[d],
                maxdepth=maxdepth,
                detail=detail,
                topdown=topdown,
                **kwargs,
            )

        if not topdown:
            # Yield after recursion if walking bottom up
            yield path, dirs, files
463 def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
464 """List all files below path.
466 Like posix ``find`` command without conditions
468 Parameters
469 ----------
470 path : str
471 maxdepth: int or None
472 If not None, the maximum number of levels to descend
473 withdirs: bool
474 Whether to include directory paths in the output. This is True
475 when used by glob, but users usually only want files.
476 kwargs are passed to ``ls``.
477 """
478 # TODO: allow equivalent of -name parameter
479 path = self._strip_protocol(path)
480 out = dict()
481 for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
482 if withdirs:
483 files.update(dirs)
484 out.update({info["name"]: info for name, info in files.items()})
485 if not out and self.isfile(path):
486 # walk works on directories, but find should also return [path]
487 # when path happens to be a file
488 out[path] = {}
489 names = sorted(out)
490 if not detail:
491 return names
492 else:
493 return {name: out[name] for name in names}
495 def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
496 """Space used by files and optionally directories within a path
498 Directory size does not include the size of its contents.
500 Parameters
501 ----------
502 path: str
503 total: bool
504 Whether to sum all the file sizes
505 maxdepth: int or None
506 Maximum number of directory levels to descend, None for unlimited.
507 withdirs: bool
508 Whether to include directory paths in the output.
509 kwargs: passed to ``find``
511 Returns
512 -------
513 Dict of {path: size} if total=False, or int otherwise, where numbers
514 refer to bytes used.
515 """
516 sizes = {}
517 if withdirs and self.isdir(path):
518 # Include top-level directory in output
519 info = self.info(path)
520 sizes[info["name"]] = info["size"]
521 for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
522 info = self.info(f)
523 sizes[info["name"]] = info["size"]
524 if total:
525 return sum(sizes.values())
526 else:
527 return sizes
    def glob(self, path, **kwargs):
        """
        Find files by glob-matching.

        If the path ends with '/' and does not contain "*", it is essentially
        the same as ``ls(path)``, returning only files.

        We support ``"**"``,
        ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.

        Search path names that contain embedded characters special to this
        implementation of glob may not produce expected results;
        e.g., 'foo/bar/*starredfilename*'.

        kwargs are passed to ``ls``.
        """
        import re

        ends = path.endswith("/")
        path = self._strip_protocol(path)
        # position of the first magic character of each kind (or end of string)
        indstar = path.find("*") if path.find("*") >= 0 else len(path)
        indques = path.find("?") if path.find("?") >= 0 else len(path)
        indbrace = path.find("[") if path.find("[") >= 0 else len(path)

        ind = min(indstar, indques, indbrace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            # literal path: at most one hit, no pattern matching required
            root = path
            depth = 1
            if ends:
                path += "/*"
            elif self.exists(path):
                if not detail:
                    return [path]
                else:
                    return {path: self.info(path)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:ind]:
            # search from the deepest literal directory before the first magic char
            ind2 = path[:ind].rindex("/")
            root = path[: ind2 + 1]
            depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
        else:
            root = ""
            depth = None if "**" in path else path[ind + 1 :].count("/") + 1

        allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
        # Escape characters special to python regex, leaving our supported
        # special characters in place.
        # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
        # for shell globbing details.
        pattern = (
            "^"
            + (
                path.replace("\\", r"\\")
                .replace(".", r"\.")
                .replace("+", r"\+")
                .replace("//", "/")
                .replace("(", r"\(")
                .replace(")", r"\)")
                .replace("|", r"\|")
                .replace("^", r"\^")
                .replace("$", r"\$")
                .replace("{", r"\{")
                .replace("}", r"\}")
                .rstrip("/")
                .replace("?", ".")
            )
            + "$"
        )
        # "**" matches across path separators; lone "*" must not cross "/"
        pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
        pattern = re.sub("[*]", "[^/]*", pattern)
        pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
        out = {
            p: allpaths[p]
            for p in sorted(allpaths)
            if pattern.match(p.replace("//", "/").rstrip("/"))
        }
        if detail:
            return out
        else:
            return list(out)
617 def exists(self, path, **kwargs):
618 """Is there a file at the given path"""
619 try:
620 self.info(path, **kwargs)
621 return True
622 except: # noqa: E722
623 # any exception allowed bar FileNotFoundError?
624 return False
    def lexists(self, path, **kwargs):
        """If there is a file at the given path (including
        broken links)"""
        # default has no notion of symlinks, so this is the same as exists()
        return self.exists(path)
    def info(self, path, **kwargs):
        """Give details of entry at path

        Returns a single dictionary, with exactly the same information as ``ls``
        would with ``detail=True``.

        The default implementation calls ls and could be overridden by a
        shortcut. kwargs are passed on to ```ls()``.

        Some file systems might not be able to measure the file's size, in
        which case, the returned dict will include ``'size': None``.

        Returns
        -------
        dict with keys: name (full path in the FS), size (in bytes), type (file,
        directory, or something else) and other FS-specific keys.
        """
        path = self._strip_protocol(path)
        # first try: find the entry within its parent's listing
        out = self.ls(self._parent(path), detail=True, **kwargs)
        out = [o for o in out if o["name"].rstrip("/") == path]
        if out:
            return out[0]
        # second try: list the path itself (it may be a directory)
        out = self.ls(path, detail=True, **kwargs)
        path = path.rstrip("/")
        out1 = [o for o in out if o["name"].rstrip("/") == path]
        if len(out1) == 1:
            if "size" not in out1[0]:
                out1[0]["size"] = None
            return out1[0]
        elif len(out1) > 1 or out:
            # a non-empty listing means the path is a directory
            return {"name": path, "size": 0, "type": "directory"}
        else:
            raise FileNotFoundError(path)
    def checksum(self, path):
        """Unique value for current version of file

        If the checksum is the same from one moment to another, the contents
        are guaranteed to be the same. If the checksum changes, the contents
        *might* have changed.

        This should normally be overridden; default will probably capture
        creation/modification timestamp (which would be good) or maybe
        access timestamp (which would be bad)
        """
        # hash the whole info dict, so any metadata change alters the value
        return int(tokenize(self.info(path)), 16)
678 def size(self, path):
679 """Size in bytes of file"""
680 return self.info(path).get("size", None)
682 def sizes(self, paths):
683 """Size in bytes of each file in a list of paths"""
684 return [self.size(p) for p in paths]
    def isdir(self, path):
        """Is this entry directory-like?"""
        try:
            return self.info(path)["type"] == "directory"
        except OSError:
            # missing/inaccessible paths count as "not a directory"
            return False
    def isfile(self, path):
        """Is this entry file-like?"""
        try:
            return self.info(path)["type"] == "file"
        except:  # noqa: E722
            # any failure (not just FileNotFoundError) counts as "not a file"
            return False
    def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
        """Get the contents of the file as a string.

        Parameters
        ----------
        path: str
            URL of file on this filesystems
        encoding, errors, newline: same as `open`.

        Returns
        -------
        str: the whole decoded file content
        """
        with self.open(
            path,
            mode="r",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.read()
    def write_text(
        self, path, value, encoding=None, errors=None, newline=None, **kwargs
    ):
        """Write the text to the given file.

        An existing file will be overwritten.

        Parameters
        ----------
        path: str
            URL of file on this filesystems
        value: str
            Text to write.
        encoding, errors, newline: same as `open`.

        Returns
        -------
        int: number of characters written
        """
        with self.open(
            path,
            mode="w",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.write(value)
    def cat_file(self, path, start=None, end=None, **kwargs):
        """Get the content of a file

        Parameters
        ----------
        path: URL of file on this filesystems
        start, end: int
            Bytes limits of the read. If negative, backwards from end,
            like usual python slices. Either can be None for start or
            end of file, respectively
        kwargs: passed to ``open()``.
        """
        # explicitly set buffering off?
        with self.open(path, "rb", **kwargs) as f:
            if start is not None:
                if start >= 0:
                    f.seek(start)
                else:
                    # negative start: offset from end, clamped at start of file
                    f.seek(max(0, f.size + start))
            if end is not None:
                if end < 0:
                    end = f.size + end
                return f.read(end - f.tell())
            return f.read()
    def pipe_file(self, path, value, **kwargs):
        """Set the bytes of given file

        Parameters
        ----------
        path: str
            Target location
        value: bytes
            Data to write (replaces any existing content)
        kwargs: passed to ``open``
        """
        with self.open(path, "wb", **kwargs) as f:
            f.write(value)
774 def pipe(self, path, value=None, **kwargs):
775 """Put value into path
777 (counterpart to ``cat``)
779 Parameters
780 ----------
781 path: string or dict(str, bytes)
782 If a string, a single remote location to put ``value`` bytes; if a dict,
783 a mapping of {path: bytesvalue}.
784 value: bytes, optional
785 If using a single path, these are the bytes to put there. Ignored if
786 ``path`` is a dict
787 """
788 if isinstance(path, str):
789 self.pipe_file(self._strip_protocol(path), value, **kwargs)
790 elif isinstance(path, dict):
791 for k, v in path.items():
792 self.pipe_file(self._strip_protocol(k), v, **kwargs)
793 else:
794 raise ValueError("path must be str or dict")
796 def cat_ranges(
797 self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
798 ):
799 if max_gap is not None:
800 raise NotImplementedError
801 if not isinstance(paths, list):
802 raise TypeError
803 if not isinstance(starts, list):
804 starts = [starts] * len(paths)
805 if not isinstance(ends, list):
806 ends = [starts] * len(paths)
807 if len(starts) != len(paths) or len(ends) != len(paths):
808 raise ValueError
809 out = []
810 for p, s, e in zip(paths, starts, ends):
811 try:
812 out.append(self.cat_file(p, s, e))
813 except Exception as e:
814 if on_error == "return":
815 out.append(e)
816 else:
817 raise
818 return out
    def cat(self, path, recursive=False, on_error="raise", **kwargs):
        """Fetch (potentially multiple) paths' contents

        Parameters
        ----------
        recursive: bool
            If True, assume the path(s) are directories, and get all the
            contained files
        on_error : "raise", "omit", "return"
            If raise, an underlying exception will be raised (converted to KeyError
            if the type is in self.missing_exceptions); if omit, keys with exception
            will simply not be included in the output; if "return", all keys are
            included in the output, but the value will be bytes or an exception
            instance.
        kwargs: passed to cat_file

        Returns
        -------
        dict of {path: contents} if there are multiple paths
        or the path has been otherwise expanded
        """
        paths = self.expand_path(path, recursive=recursive)
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            # input was a list, glob, or directory: return a mapping
            out = {}
            for path in paths:
                try:
                    out[path] = self.cat_file(path, **kwargs)
                except Exception as e:
                    if on_error == "raise":
                        raise
                    if on_error == "return":
                        out[path] = e
                    # on_error == "omit": the failing key is simply skipped
            return out
        else:
            # single literal path: return the bytes directly
            return self.cat_file(paths[0], **kwargs)
    def get_file(
        self, rpath, lpath, callback=_DEFAULT_CALLBACK, outfile=None, **kwargs
    ):
        """Copy single remote file to local

        Parameters
        ----------
        rpath: str
            Remote source path
        lpath: str or file-like
            Local destination; if file-like, data is written directly to it
        callback: fsspec callback
            Receives size and incremental progress updates
        outfile: file-like or None
            Optional already-open local file object to write into
        kwargs: passed to ``open`` for the remote file
        """
        from .implementations.local import LocalFileSystem

        if isfilelike(lpath):
            outfile = lpath
        elif self.isdir(rpath):
            # remote "file" is actually a directory: just mirror it locally
            os.makedirs(lpath, exist_ok=True)
            return None

        LocalFileSystem(auto_mkdir=True).makedirs(self._parent(lpath), exist_ok=True)

        with self.open(rpath, "rb", **kwargs) as f1:
            if outfile is None:
                outfile = open(lpath, "wb")

            try:
                callback.set_size(getattr(f1, "size", None))
                data = True
                while data:
                    # stream in blocksize chunks until EOF (empty read)
                    data = f1.read(self.blocksize)
                    segment_len = outfile.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)
            finally:
                # only close files we opened; caller-provided objects stay open
                if not isfilelike(lpath):
                    outfile.close()
    def get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        Calls get_file for each source.
        """
        from .implementations.local import (
            LocalFileSystem,
            make_path_posix,
            trailing_sep,
            trailing_sep_maybe_asterisk,
        )

        source_is_str = isinstance(rpath, str)
        rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
            if not rpaths:
                return

        if isinstance(lpath, str):
            lpath = make_path_posix(lpath)
        # destination is a directory if it ends in a separator or already exists
        isdir = isinstance(lpath, str) and (
            trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
        )
        # compute one local target per remote source
        lpaths = other_paths(
            rpaths,
            lpath,
            exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(rpath),
            is_dir=isdir,
            flatten=not source_is_str,
        )

        callback.set_size(len(lpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            callback.branch(rpath, lpath, kwargs)
            self.get_file(rpath, lpath, **kwargs)
    def put_file(self, lpath, rpath, callback=_DEFAULT_CALLBACK, **kwargs):
        """Copy single file to remote

        Parameters
        ----------
        lpath: str
            Local source path
        rpath: str
            Remote destination path
        callback: fsspec callback
            Receives size and incremental progress updates
        kwargs: passed to ``open`` for the remote file
        """
        if os.path.isdir(lpath):
            # source is a directory: just create the remote directory
            self.makedirs(rpath, exist_ok=True)
            return None

        with open(lpath, "rb") as f1:
            # determine total size by seeking to the end, then rewind
            size = f1.seek(0, 2)
            callback.set_size(size)
            f1.seek(0)

            self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
            with self.open(rpath, "wb", **kwargs) as f2:
                while f1.tell() < size:
                    data = f1.read(self.blocksize)
                    segment_len = f2.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)
    def put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        Calls put_file for each source.
        """
        from .implementations.local import (
            LocalFileSystem,
            make_path_posix,
            trailing_sep,
            trailing_sep_maybe_asterisk,
        )

        source_is_str = isinstance(lpath, str)
        if source_is_str:
            lpath = make_path_posix(lpath)
        fs = LocalFileSystem()
        lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
            if not lpaths:
                return

        # destination is a directory if it ends in a separator or already exists
        isdir = isinstance(rpath, str) and (trailing_sep(rpath) or self.isdir(rpath))
        rpath = (
            self._strip_protocol(rpath)
            if isinstance(rpath, str)
            else [self._strip_protocol(p) for p in rpath]
        )
        # compute one remote target per local source
        rpaths = other_paths(
            lpaths,
            rpath,
            exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(lpath),
            is_dir=isdir,
            flatten=not source_is_str,
        )

        callback.set_size(len(rpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            callback.branch(lpath, rpath, kwargs)
            self.put_file(lpath, rpath, **kwargs)
    def head(self, path, size=1024):
        """Get the first ``size`` bytes from file

        Parameters
        ----------
        path: str
        size: int (1024)
            Number of bytes to read from the start of the file
        """
        with self.open(path, "rb") as f:
            return f.read(size)
    def tail(self, path, size=1024):
        """Get the last ``size`` bytes from file

        Parameters
        ----------
        path: str
        size: int (1024)
            Number of bytes to read from the end of the file
        """
        with self.open(path, "rb") as f:
            # seek relative to the end (whence=2), never before the file start
            f.seek(max(-size, -f.size), 2)
            return f.read()
    def cp_file(self, path1, path2, **kwargs):
        """Copy a single file within this filesystem.

        Used by ``copy``; implementations must override this.
        """
        raise NotImplementedError
    def copy(
        self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
    ):
        """Copy within two locations in the filesystem

        on_error : "raise", "ignore"
            If raise, any not-found exceptions will be raised; if ignore any
            not-found exceptions will cause the path to be skipped; defaults to
            raise unless recursive is true, where the default is ignore
        """
        from .implementations.local import trailing_sep, trailing_sep_maybe_asterisk

        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        source_is_str = isinstance(path1, str)
        paths = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            paths = [p for p in paths if not (trailing_sep(p) or self.isdir(p))]
            if not paths:
                return

        # destination is a directory if it ends in a separator or already exists
        isdir = isinstance(path2, str) and (trailing_sep(path2) or self.isdir(path2))
        # compute one target path per expanded source
        path2 = other_paths(
            paths,
            path2,
            exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(path1),
            is_dir=isdir,
            flatten=not source_is_str,
        )

        for p1, p2 in zip(paths, path2):
            try:
                self.cp_file(p1, p2, **kwargs)
            except FileNotFoundError:
                if on_error == "raise":
                    raise
1071 def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
1072 """Turn one or more globs or directories into a list of all matching paths
1073 to files or directories.
1075 kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
1076 """
1078 if maxdepth is not None and maxdepth < 1:
1079 raise ValueError("maxdepth must be at least 1")
1081 if isinstance(path, str):
1082 out = self.expand_path([path], recursive, maxdepth)
1083 else:
1084 out = set()
1085 path = [self._strip_protocol(p) for p in path]
1086 for p in path:
1087 if has_magic(p):
1088 bit = set(self.glob(p, **kwargs))
1089 out |= bit
1090 if recursive:
1091 # glob call above expanded one depth so if maxdepth is defined
1092 # then decrement it in expand_path call below. If it is zero
1093 # after decrementing then avoid expand_path call.
1094 if maxdepth is not None and maxdepth <= 1:
1095 continue
1096 out |= set(
1097 self.expand_path(
1098 list(bit),
1099 recursive=recursive,
1100 maxdepth=maxdepth - 1 if maxdepth is not None else None,
1101 **kwargs,
1102 )
1103 )
1104 continue
1105 elif recursive:
1106 rec = set(
1107 self.find(
1108 p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
1109 )
1110 )
1111 out |= rec
1112 if p not in out and (recursive is False or self.exists(p)):
1113 # should only check once, for the root
1114 out.add(p)
1115 if not out:
1116 raise FileNotFoundError(path)
1117 return list(sorted(out))
1119 def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
1120 """Move file(s) from one location to another"""
1121 if path1 == path2:
1122 logger.debug(
1123 "%s mv: The paths are the same, so no files were moved." % (self)
1124 )
1125 else:
1126 self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
1127 self.rm(path1, recursive=recursive)
    def rm_file(self, path):
        """Delete a single file (non-recursive).

        Delegates to ``_rm``; backends may override either method.
        """
        self._rm(path)

    def _rm(self, path):
        """Delete one file"""
        # this is the old name for the method, prefer rm_file
        raise NotImplementedError
1138 def rm(self, path, recursive=False, maxdepth=None):
1139 """Delete files.
1141 Parameters
1142 ----------
1143 path: str or list of str
1144 File(s) to delete.
1145 recursive: bool
1146 If file(s) are directories, recursively delete contents and then
1147 also remove the directory
1148 maxdepth: int or None
1149 Depth to pass to walk for finding files to delete, if recursive.
1150 If None, there will be no limit and infinite recursion may be
1151 possible.
1152 """
1153 path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
1154 for p in reversed(path):
1155 self.rm_file(p)
1157 @classmethod
1158 def _parent(cls, path):
1159 path = cls._strip_protocol(path)
1160 if "/" in path:
1161 parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
1162 return cls.root_marker + parent
1163 else:
1164 return cls.root_marker
    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return raw bytes-mode file-like from the file-system"""
        # Generic fallback: wrap the path in the buffered-file base class.
        # Backends normally override this with their own file implementation.
        return AbstractBufferedFile(
            self,
            path,
            mode,
            block_size,
            autocommit,
            cache_options=cache_options,
            **kwargs,
        )
1186 def open(
1187 self,
1188 path,
1189 mode="rb",
1190 block_size=None,
1191 cache_options=None,
1192 compression=None,
1193 **kwargs,
1194 ):
1195 """
1196 Return a file-like object from the filesystem
1198 The resultant instance must function correctly in a context ``with``
1199 block.
1201 Parameters
1202 ----------
1203 path: str
1204 Target file
1205 mode: str like 'rb', 'w'
1206 See builtin ``open()``
1207 block_size: int
1208 Some indication of buffering - this is a value in bytes
1209 cache_options : dict, optional
1210 Extra arguments to pass through to the cache.
1211 compression: string or None
1212 If given, open file using compression codec. Can either be a compression
1213 name (a key in ``fsspec.compression.compr``) or "infer" to guess the
1214 compression from the filename suffix.
1215 encoding, errors, newline: passed on to TextIOWrapper for text mode
1216 """
1217 import io
1219 path = self._strip_protocol(path)
1220 if "b" not in mode:
1221 mode = mode.replace("t", "") + "b"
1223 text_kwargs = {
1224 k: kwargs.pop(k)
1225 for k in ["encoding", "errors", "newline"]
1226 if k in kwargs
1227 }
1228 return io.TextIOWrapper(
1229 self.open(
1230 path,
1231 mode,
1232 block_size=block_size,
1233 cache_options=cache_options,
1234 compression=compression,
1235 **kwargs,
1236 ),
1237 **text_kwargs,
1238 )
1239 else:
1240 ac = kwargs.pop("autocommit", not self._intrans)
1241 f = self._open(
1242 path,
1243 mode=mode,
1244 block_size=block_size,
1245 autocommit=ac,
1246 cache_options=cache_options,
1247 **kwargs,
1248 )
1249 if compression is not None:
1250 from fsspec.compression import compr
1251 from fsspec.core import get_compression
1253 compression = get_compression(path, compression)
1254 compress = compr[compression]
1255 f = compress(f, mode=mode[0])
1257 if not ac and "r" not in mode:
1258 self.transaction.files.append(f)
1259 return f
1261 def touch(self, path, truncate=True, **kwargs):
1262 """Create empty file, or update timestamp
1264 Parameters
1265 ----------
1266 path: str
1267 file location
1268 truncate: bool
1269 If True, always set file size to 0; if False, update timestamp and
1270 leave file unchanged, if backend allows this
1271 """
1272 if truncate or not self.exists(path):
1273 with self.open(path, "wb", **kwargs):
1274 pass
1275 else:
1276 raise NotImplementedError # update timestamp, if possible
1278 def ukey(self, path):
1279 """Hash of file properties, to tell if it has changed"""
1280 return sha256(str(self.info(path)).encode()).hexdigest()
    def read_block(self, fn, offset, length, delimiter=None):
        """Read a block of bytes from

        Starting at ``offset`` of the file, read ``length`` bytes. If
        ``delimiter`` is set then we ensure that the read starts and stops at
        delimiter boundaries that follow the locations ``offset`` and ``offset
        + length``. If ``offset`` is zero then we start at zero. The
        bytestring returned WILL include the end delimiter string.

        If offset+length is beyond the eof, reads to eof.

        Parameters
        ----------
        fn: string
            Path to filename
        offset: int
            Byte offset to start read
        length: int
            Number of bytes to read. If None, read to end.
        delimiter: bytes (optional)
            Ensure reading starts and stops at delimiter bytestring

        Examples
        --------
        >>> fs.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
        b'Alice, 100\\nBo'
        >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\n'

        Use ``length=None`` to read to the end of the file.
        >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\nCharlie, 300'

        See Also
        --------
        :func:`fsspec.utils.read_block`
        """
        with self.open(fn, "rb") as f:
            size = f.size
            if length is None:
                length = size
            if size is not None and offset + length > size:
                # clamp the request so we never read past end-of-file
                length = size - offset
            return read_block(f, offset, length, delimiter)
1327 def to_json(self):
1328 """
1329 JSON representation of this filesystem instance
1331 Returns
1332 -------
1333 str: JSON structure with keys cls (the python location of this class),
1334 protocol (text name of this class's protocol, first one in case of
1335 multiple), args (positional args, usually empty), and all other
1336 kwargs as their own keys.
1337 """
1338 import json
1340 cls = type(self)
1341 cls = ".".join((cls.__module__, cls.__name__))
1342 proto = (
1343 self.protocol[0]
1344 if isinstance(self.protocol, (tuple, list))
1345 else self.protocol
1346 )
1347 return json.dumps(
1348 dict(
1349 **{"cls": cls, "protocol": proto, "args": self.storage_args},
1350 **self.storage_options,
1351 )
1352 )
1354 @staticmethod
1355 def from_json(blob):
1356 """
1357 Recreate a filesystem instance from JSON representation
1359 See ``.to_json()`` for the expected structure of the input
1361 Parameters
1362 ----------
1363 blob: str
1365 Returns
1366 -------
1367 file system instance, not necessarily of this particular class.
1368 """
1369 import json
1371 from .registry import _import_class, get_filesystem_class
1373 dic = json.loads(blob)
1374 protocol = dic.pop("protocol")
1375 try:
1376 cls = _import_class(dic.pop("cls"))
1377 except (ImportError, ValueError, RuntimeError, KeyError):
1378 cls = get_filesystem_class(protocol)
1379 return cls(*dic.pop("args", ()), **dic)
    def _get_pyarrow_filesystem(self):
        """
        Make a version of the FS instance which will be acceptable to pyarrow
        """
        # all instances already also derive from pyarrow
        return self
1388 def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
1389 """Create key/value store based on this file-system
1391 Makes a MutableMapping interface to the FS at the given root path.
1392 See ``fsspec.mapping.FSMap`` for further details.
1393 """
1394 from .mapping import FSMap
1396 return FSMap(
1397 root,
1398 self,
1399 check=check,
1400 create=create,
1401 missing_exceptions=missing_exceptions,
1402 )
    @classmethod
    def clear_instance_cache(cls):
        """
        Clear the cache of filesystem instances.

        Notes
        -----
        Unless overridden by setting the ``cachable`` class attribute to False,
        the filesystem class stores a reference to newly created instances. This
        prevents Python's normal rules around garbage collection from working,
        since the instances refcount will not drop to zero until
        ``clear_instance_cache`` is called.
        """
        # ``_cache`` is installed per-class by the ``_Cached`` metaclass
        cls._cache.clear()
    def created(self, path):
        """Return the created timestamp of a file as a datetime.datetime"""
        # backends that track creation times override this
        raise NotImplementedError

    def modified(self, path):
        """Return the modified timestamp of a file as a datetime.datetime"""
        # backends that track modification times override this
        raise NotImplementedError
1427 # ------------------------------------------------------------------------
1428 # Aliases
    def read_bytes(self, path, start=None, end=None, **kwargs):
        """Alias of `AbstractFileSystem.cat_file`."""
        return self.cat_file(path, start=start, end=end, **kwargs)

    def write_bytes(self, path, value, **kwargs):
        """Alias of `AbstractFileSystem.pipe_file`."""
        # NOTE: returns None, unlike most aliases here
        self.pipe_file(path, value, **kwargs)

    def makedir(self, path, create_parents=True, **kwargs):
        """Alias of `AbstractFileSystem.mkdir`."""
        return self.mkdir(path, create_parents=create_parents, **kwargs)

    def mkdirs(self, path, exist_ok=False):
        """Alias of `AbstractFileSystem.makedirs`."""
        return self.makedirs(path, exist_ok=exist_ok)

    def listdir(self, path, detail=True, **kwargs):
        """Alias of `AbstractFileSystem.ls`."""
        return self.ls(path, detail=detail, **kwargs)

    def cp(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.copy`."""
        return self.copy(path1, path2, **kwargs)

    def move(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def stat(self, path, **kwargs):
        """Alias of `AbstractFileSystem.info`."""
        return self.info(path, **kwargs)

    def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
        """Alias of `AbstractFileSystem.du`."""
        return self.du(path, total=total, maxdepth=maxdepth, **kwargs)

    def rename(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def delete(self, path, recursive=False, maxdepth=None):
        """Alias of `AbstractFileSystem.rm`."""
        return self.rm(path, recursive=recursive, maxdepth=maxdepth)

    def upload(self, lpath, rpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.put`."""
        return self.put(lpath, rpath, recursive=recursive, **kwargs)

    def download(self, rpath, lpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.get`."""
        return self.get(rpath, lpath, recursive=recursive, **kwargs)
    def sign(self, path, expiration=100, **kwargs):
        """Create a signed URL representing the given path

        Some implementations allow temporary URLs to be generated, as a
        way of delegating credentials.

        Parameters
        ----------
        path : str
            The path on the filesystem
        expiration : int
            Number of seconds to enable the URL for (if supported)

        Returns
        -------
        URL : str
            The signed URL

        Raises
        ------
        NotImplementedError : if method is not implemented for a filesystem
        """
        # base class: no signing support; backends with delegation override
        raise NotImplementedError("Sign is not implemented for this filesystem")
    def _isfilestore(self):
        """Whether this instance maps directly to local files (legacy pyarrow hook)."""
        # Originally inherited from pyarrow DaskFileSystem. Keeping this
        # here for backwards compatibility as long as pyarrow uses its
        # legacy fsspec-compatible filesystems and thus accepts fsspec
        # filesystems as well
        return False
class AbstractBufferedFile(io.IOBase):
    """Convenient class to derive from to provide buffering

    In the case that the backend does not provide a pythonic file-like object
    already, this class contains much of the logic to build one. The only
    methods that need to be overridden are ``_upload_chunk``,
    ``_initiate_upload`` and ``_fetch_range``.
    """

    # default buffer size (5 MiB), used when block_size is "default"/None
    DEFAULT_BLOCK_SIZE = 5 * 2**20
    # lazily-populated result of ``fs.info(path)``; see the ``details`` property
    _details = None
    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """
        Template for files with buffered reading and writing

        Parameters
        ----------
        fs: instance of FileSystem
        path: str
            location in file-system
        mode: str
            Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file
            systems may be read-only, and some may not support append.
        block_size: int
            Buffer size for reading or writing, 'default' for class default
        autocommit: bool
            Whether to write to final destination; may only impact what
            happens when file is being closed.
        cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
            Caching policy in read mode. See the definitions in ``core``.
        cache_options : dict
            Additional options passed to the constructor for the cache specified
            by `cache_type`.
        size: int
            If given and in read mode, suppressed having to look up the file size
        kwargs:
            Gets stored as self.kwargs
        """
        from .core import caches

        self.path = path
        self.fs = fs
        self.mode = mode
        self.blocksize = (
            self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
        )
        # current logical position within the file
        self.loc = 0
        self.autocommit = autocommit
        self.end = None
        self.start = None
        self.closed = False

        if cache_options is None:
            cache_options = {}

        if "trim" in kwargs:
            # legacy spelling: forward to the cache options with a warning
            warnings.warn(
                "Passing 'trim' to control the cache behavior has been deprecated. "
                "Specify it within the 'cache_options' argument instead.",
                FutureWarning,
            )
            cache_options["trim"] = kwargs.pop("trim")

        self.kwargs = kwargs

        if mode not in {"ab", "rb", "wb"}:
            raise NotImplementedError("File mode not supported")
        if mode == "rb":
            # read mode: know the size up-front and attach a read cache
            if size is not None:
                self.size = size
            else:
                self.size = self.details["size"]
            self.cache = caches[cache_type](
                self.blocksize, self._fetch_range, self.size, **cache_options
            )
        else:
            # write/append mode: accumulate into an in-memory buffer which is
            # flushed to the backend in blocksize chunks
            self.buffer = io.BytesIO()
            self.offset = None
            self.forced = False
            self.location = None
    @property
    def details(self):
        # fetch and memoise file info (``fs.info``) on first access
        if self._details is None:
            self._details = self.fs.info(self.path)
        return self._details

    @details.setter
    def details(self, value):
        # keep ``size`` in sync with the supplied info dict
        self._details = value
        self.size = value["size"]
    @property
    def full_name(self):
        # path including the filesystem's protocol prefix
        return _unstrip_protocol(self.path, self.fs)
    @property
    def closed(self):
        # get around this attr being read-only in IOBase
        # use getattr here, since this can be called during del
        # (default True: an instance that never finished __init__ is "closed")
        return getattr(self, "_closed", True)

    @closed.setter
    def closed(self, c):
        # backing attribute for the property above
        self._closed = c
1632 def __hash__(self):
1633 if "w" in self.mode:
1634 return id(self)
1635 else:
1636 return int(tokenize(self.details), 16)
1638 def __eq__(self, other):
1639 """Files are equal if they have the same checksum, only in read mode"""
1640 return self.mode == "rb" and other.mode == "rb" and hash(self) == hash(other)
    def commit(self):
        """Move from temp to final destination"""
        # no-op in the base class; transactional backends override

    def discard(self):
        """Throw away temporary file"""
        # no-op in the base class; transactional backends override
1648 def info(self):
1649 """File information about this path"""
1650 if "r" in self.mode:
1651 return self.details
1652 else:
1653 raise ValueError("Info not available while writing")
1655 def tell(self):
1656 """Current file location"""
1657 return self.loc
1659 def seek(self, loc, whence=0):
1660 """Set current file location
1662 Parameters
1663 ----------
1664 loc: int
1665 byte location
1666 whence: {0, 1, 2}
1667 from start of file, current location or end of file, resp.
1668 """
1669 loc = int(loc)
1670 if not self.mode == "rb":
1671 raise OSError(ESPIPE, "Seek only available in read mode")
1672 if whence == 0:
1673 nloc = loc
1674 elif whence == 1:
1675 nloc = self.loc + loc
1676 elif whence == 2:
1677 nloc = self.size + loc
1678 else:
1679 raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % whence)
1680 if nloc < 0:
1681 raise ValueError("Seek before start of file")
1682 self.loc = nloc
1683 return self.loc
1685 def write(self, data):
1686 """
1687 Write data to buffer.
1689 Buffer only sent on flush() or if buffer is greater than
1690 or equal to blocksize.
1692 Parameters
1693 ----------
1694 data: bytes
1695 Set of bytes to be written.
1696 """
1697 if self.mode not in {"wb", "ab"}:
1698 raise ValueError("File not in write mode")
1699 if self.closed:
1700 raise ValueError("I/O operation on closed file.")
1701 if self.forced:
1702 raise ValueError("This file has been force-flushed, can only close")
1703 out = self.buffer.write(data)
1704 self.loc += out
1705 if self.buffer.tell() >= self.blocksize:
1706 self.flush()
1707 return out
    def flush(self, force=False):
        """
        Write buffered data to backend store.

        Writes the current buffer, if it is larger than the block-size, or if
        the file is being closed.

        Parameters
        ----------
        force: bool
            When closing, write the last block even if it is smaller than
            blocks are allowed to be. Disallows further writing to this file.
        """

        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            # mark the file as finalized; subsequent write() calls will fail
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                self._initiate_upload()
            except:  # noqa: E722
                # a failed initiation leaves the file unusable
                self.closed = True
                raise

        # subclasses may return False to indicate the chunk was not consumed,
        # in which case the buffer is retained for the next flush
        if self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()
    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        """
        # may not yet have been initialized, may need to call _initialize_upload

    def _initiate_upload(self):
        """Create remote file/upload"""
        # optional hook; default is to do nothing
        pass

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        # subclasses must implement this for read support
        raise NotImplementedError
1770 def read(self, length=-1):
1771 """
1772 Return data from cache, or fetch pieces as necessary
1774 Parameters
1775 ----------
1776 length: int (-1)
1777 Number of bytes to read; if <0, all remaining bytes.
1778 """
1779 length = -1 if length is None else int(length)
1780 if self.mode != "rb":
1781 raise ValueError("File not in read mode")
1782 if length < 0:
1783 length = self.size - self.loc
1784 if self.closed:
1785 raise ValueError("I/O operation on closed file.")
1786 logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
1787 if length == 0:
1788 # don't even bother calling fetch
1789 return b""
1790 out = self.cache._fetch(self.loc, self.loc + length)
1791 self.loc += len(out)
1792 return out
1794 def readinto(self, b):
1795 """mirrors builtin file's readinto method
1797 https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
1798 """
1799 out = memoryview(b).cast("B")
1800 data = self.read(out.nbytes)
1801 out[: len(data)] = data
1802 return len(data)
1804 def readuntil(self, char=b"\n", blocks=None):
1805 """Return data between current position and first occurrence of char
1807 char is included in the output, except if the end of the tile is
1808 encountered first.
1810 Parameters
1811 ----------
1812 char: bytes
1813 Thing to find
1814 blocks: None or int
1815 How much to read in each go. Defaults to file blocksize - which may
1816 mean a new read on every call.
1817 """
1818 out = []
1819 while True:
1820 start = self.tell()
1821 part = self.read(blocks or self.blocksize)
1822 if len(part) == 0:
1823 break
1824 found = part.find(char)
1825 if found > -1:
1826 out.append(part[: found + len(char)])
1827 self.seek(start + found + len(char))
1828 break
1829 out.append(part)
1830 return b"".join(out)
1832 def readline(self):
1833 """Read until first occurrence of newline character
1835 Note that, because of character encoding, this is not necessarily a
1836 true line ending.
1837 """
1838 return self.readuntil(b"\n")
1840 def __next__(self):
1841 out = self.readline()
1842 if out:
1843 return out
1844 raise StopIteration
    def __iter__(self):
        # the file object iterates over its own lines (via __next__)
        return self
1849 def readlines(self):
1850 """Return all data, split by the newline character"""
1851 data = self.read()
1852 lines = data.split(b"\n")
1853 out = [l + b"\n" for l in lines[:-1]]
1854 if data.endswith(b"\n"):
1855 return out
1856 else:
1857 return out + [lines[-1]]
1858 # return list(self) ???
    def readinto1(self, b):
        # no distinct "at most one system call" path; same as readinto
        return self.readinto(b)
    def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            # idempotent: closing twice is a no-op
            return
        if self.mode == "rb":
            # read mode: just drop the cache, nothing to persist
            self.cache = None
        else:
            if not self.forced:
                # push any remaining buffered data to the backend
                self.flush(force=True)

            if self.fs is not None:
                # listings for this path and its parent are now stale
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True
    def readable(self):
        """Whether opened for reading (and not yet closed)"""
        return self.mode == "rb" and not self.closed

    def seekable(self):
        """Whether is seekable (only in read mode)"""
        return self.readable()

    def writable(self):
        """Whether opened for writing (and not yet closed)"""
        return self.mode in {"wb", "ab"} and not self.closed
    def __del__(self):
        # best-effort close on garbage collection
        if not self.closed:
            self.close()
1900 def __str__(self):
1901 return "<File-like object %s, %s>" % (type(self.fs).__name__, self.path)
1903 __repr__ = __str__
    def __enter__(self):
        # context manager protocol: yield the file itself
        return self

    def __exit__(self, *args):
        # always close, regardless of exceptions in the with-block
        self.close()