Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/spec.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import json
5import logging
6import os
7import threading
8import warnings
9import weakref
10from errno import ESPIPE
11from glob import has_magic
12from hashlib import sha256
13from typing import Any, ClassVar
15from .callbacks import DEFAULT_CALLBACK
16from .config import apply_config, conf
17from .dircache import DirCache
18from .transaction import Transaction
19from .utils import (
20 _unstrip_protocol,
21 glob_translate,
22 isfilelike,
23 other_paths,
24 read_block,
25 stringify_path,
26 tokenize,
27)
29logger = logging.getLogger("fsspec")
32def make_instance(cls, args, kwargs):
33 return cls(*args, **kwargs)
36class _Cached(type):
37 """
38 Metaclass for caching file system instances.
40 Notes
41 -----
42 Instances are cached according to
44 * The values of the class attributes listed in `_extra_tokenize_attributes`
45 * The arguments passed to ``__init__``.
47 This creates an additional reference to the filesystem, which prevents the
48 filesystem from being garbage collected when all *user* references go away.
49 A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
50 be made for a filesystem instance to be garbage collected.
51 """
53 def __init__(cls, *args, **kwargs):
54 super().__init__(*args, **kwargs)
55 # Note: we intentionally create a reference here, to avoid garbage
56 # collecting instances when all other references are gone. To really
57 # delete a FileSystem, the cache must be cleared.
58 if conf.get("weakref_instance_cache"): # pragma: no cover
59 # debug option for analysing fork/spawn conditions
60 cls._cache = weakref.WeakValueDictionary()
61 else:
62 cls._cache = {}
63 cls._pid = os.getpid()
65 def __call__(cls, *args, **kwargs):
66 kwargs = apply_config(cls, kwargs)
67 extra_tokens = tuple(
68 getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
69 )
70 token = tokenize(
71 cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
72 )
73 skip = kwargs.pop("skip_instance_cache", False)
74 if os.getpid() != cls._pid:
75 cls._cache.clear()
76 cls._pid = os.getpid()
77 if not skip and cls.cachable and token in cls._cache:
78 cls._latest = token
79 return cls._cache[token]
80 else:
81 obj = super().__call__(*args, **kwargs)
82 # Setting _fs_token here causes some static linters to complain.
83 obj._fs_token_ = token
84 obj.storage_args = args
85 obj.storage_options = kwargs
86 if obj.async_impl and obj.mirror_sync_methods:
87 from .asyn import mirror_sync_methods
89 mirror_sync_methods(obj)
91 if cls.cachable and not skip:
92 cls._latest = token
93 cls._cache[token] = obj
94 return obj
97class AbstractFileSystem(metaclass=_Cached):
98 """
99 An abstract super-class for pythonic file-systems
101 Implementations are expected to be compatible with or, better, subclass
102 from here.
103 """
105 cachable = True # this class can be cached, instances reused
106 _cached = False
107 blocksize = 2**22
108 sep = "/"
109 protocol: ClassVar[str | tuple[str, ...]] = "abstract"
110 _latest = None
111 async_impl = False
112 mirror_sync_methods = False
113 root_marker = "" # For some FSs, may require leading '/' or other character
114 transaction_type = Transaction
116 #: Extra *class attributes* that should be considered when hashing.
117 _extra_tokenize_attributes = ()
119 # Set by _Cached metaclass
120 storage_args: tuple[Any, ...]
121 storage_options: dict[str, Any]
123 def __init__(self, *args, **storage_options):
124 """Create and configure file-system instance
126 Instances may be cachable, so if similar enough arguments are seen
127 a new instance is not required. The token attribute exists to allow
128 implementations to cache instances if they wish.
130 A reasonable default should be provided if there are no arguments.
132 Subclasses should call this method.
134 Parameters
135 ----------
136 use_listings_cache, listings_expiry_time, max_paths:
137 passed to ``DirCache``, if the implementation supports
138 directory listing caching. Pass use_listings_cache=False
139 to disable such caching.
140 skip_instance_cache: bool
141 If this is a cachable implementation, pass True here to force
142 creating a new instance even if a matching instance exists, and prevent
143 storing this instance.
144 asynchronous: bool
145 loop: asyncio-compatible IOLoop or None
146 """
147 if self._cached:
148 # reusing instance, don't change
149 return
150 self._cached = True
151 self._intrans = False
152 self._transaction = None
153 self._invalidated_caches_in_transaction = []
154 self.dircache = DirCache(**storage_options)
156 if storage_options.pop("add_docs", None):
157 warnings.warn("add_docs is no longer supported.", FutureWarning)
159 if storage_options.pop("add_aliases", None):
160 warnings.warn("add_aliases has been removed.", FutureWarning)
161 # This is set in _Cached
162 self._fs_token_ = None
164 @property
165 def fsid(self):
166 """Persistent filesystem id that can be used to compare filesystems
167 across sessions.
168 """
169 raise NotImplementedError
171 @property
172 def _fs_token(self):
173 return self._fs_token_
175 def __dask_tokenize__(self):
176 return self._fs_token
178 def __hash__(self):
179 return int(self._fs_token, 16)
181 def __eq__(self, other):
182 return isinstance(other, type(self)) and self._fs_token == other._fs_token
184 def __reduce__(self):
185 return make_instance, (type(self), self.storage_args, self.storage_options)
187 @classmethod
188 def _strip_protocol(cls, path):
189 """Turn path from fully-qualified to file-system-specific
191 May require FS-specific handling, e.g., for relative paths or links.
192 """
193 if isinstance(path, list):
194 return [cls._strip_protocol(p) for p in path]
195 path = stringify_path(path)
196 protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
197 for protocol in protos:
198 if path.startswith(protocol + "://"):
199 path = path[len(protocol) + 3 :]
200 elif path.startswith(protocol + "::"):
201 path = path[len(protocol) + 2 :]
202 path = path.rstrip("/")
203 # use of root_marker to make minimum required path, e.g., "/"
204 return path or cls.root_marker
206 def unstrip_protocol(self, name: str) -> str:
207 """Format FS-specific path to generic, including protocol"""
208 protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
209 for protocol in protos:
210 if name.startswith(f"{protocol}://"):
211 return name
212 return f"{protos[0]}://{name}"
214 @staticmethod
215 def _get_kwargs_from_urls(path):
216 """If kwargs can be encoded in the paths, extract them here
218 This should happen before instantiation of the class; incoming paths
219 then should be amended to strip the options in methods.
221 Examples may look like an sftp path "sftp://user@host:/my/path", where
222 the user and host should become kwargs and later get stripped.
223 """
224 # by default, nothing happens
225 return {}
227 @classmethod
228 def current(cls):
229 """Return the most recently instantiated FileSystem
231 If no instance has been created, then create one with defaults
232 """
233 if cls._latest in cls._cache:
234 return cls._cache[cls._latest]
235 return cls()
237 @property
238 def transaction(self):
239 """A context within which files are committed together upon exit
241 Requires the file class to implement `.commit()` and `.discard()`
242 for the normal and exception cases.
243 """
244 if self._transaction is None:
245 self._transaction = self.transaction_type(self)
246 return self._transaction
248 def start_transaction(self):
249 """Begin write transaction for deferring files, non-context version"""
250 self._intrans = True
251 self._transaction = self.transaction_type(self)
252 return self.transaction
254 def end_transaction(self):
255 """Finish write transaction, non-context version"""
256 self.transaction.complete()
257 self._transaction = None
258 # The invalid cache must be cleared after the transaction is completed.
259 for path in self._invalidated_caches_in_transaction:
260 self.invalidate_cache(path)
261 self._invalidated_caches_in_transaction.clear()
263 def invalidate_cache(self, path=None):
264 """
265 Discard any cached directory information
267 Parameters
268 ----------
269 path: string or None
270 If None, clear all listings cached else listings at or under given
271 path.
272 """
273 # Not necessary to implement invalidation mechanism, may have no cache.
274 # But if have, you should call this method of parent class from your
275 # subclass to ensure expiring caches after transacations correctly.
276 # See the implementation of FTPFileSystem in ftp.py
277 if self._intrans:
278 self._invalidated_caches_in_transaction.append(path)
280 def mkdir(self, path, create_parents=True, **kwargs):
281 """
282 Create directory entry at path
284 For systems that don't have true directories, may create an for
285 this instance only and not touch the real filesystem
287 Parameters
288 ----------
289 path: str
290 location
291 create_parents: bool
292 if True, this is equivalent to ``makedirs``
293 kwargs:
294 may be permissions, etc.
295 """
296 pass # not necessary to implement, may not have directories
298 def makedirs(self, path, exist_ok=False):
299 """Recursively make directories
301 Creates directory at path and any intervening required directories.
302 Raises exception if, for instance, the path already exists but is a
303 file.
305 Parameters
306 ----------
307 path: str
308 leaf directory name
309 exist_ok: bool (False)
310 If False, will error if the target already exists
311 """
312 pass # not necessary to implement, may not have directories
314 def rmdir(self, path):
315 """Remove a directory, if empty"""
316 pass # not necessary to implement, may not have directories
318 def ls(self, path, detail=True, **kwargs):
319 """List objects at path.
321 This should include subdirectories and files at that location. The
322 difference between a file and a directory must be clear when details
323 are requested.
325 The specific keys, or perhaps a FileInfo class, or similar, is TBD,
326 but must be consistent across implementations.
327 Must include:
329 - full path to the entry (without protocol)
330 - size of the entry, in bytes. If the value cannot be determined, will
331 be ``None``.
332 - type of entry, "file", "directory" or other
334 Additional information
335 may be present, appropriate to the file-system, e.g., generation,
336 checksum, etc.
338 May use refresh=True|False to allow use of self._ls_from_cache to
339 check for a saved listing and avoid calling the backend. This would be
340 common where listing may be expensive.
342 Parameters
343 ----------
344 path: str
345 detail: bool
346 if True, gives a list of dictionaries, where each is the same as
347 the result of ``info(path)``. If False, gives a list of paths
348 (str).
349 kwargs: may have additional backend-specific options, such as version
350 information
352 Returns
353 -------
354 List of strings if detail is False, or list of directory information
355 dicts if detail is True.
356 """
357 raise NotImplementedError
359 def _ls_from_cache(self, path):
360 """Check cache for listing
362 Returns listing, if found (may be empty list for a directly that exists
363 but contains nothing), None if not in cache.
364 """
365 parent = self._parent(path)
366 try:
367 return self.dircache[path.rstrip("/")]
368 except KeyError:
369 pass
370 try:
371 files = [
372 f
373 for f in self.dircache[parent]
374 if f["name"] == path
375 or (f["name"] == path.rstrip("/") and f["type"] == "directory")
376 ]
377 if len(files) == 0:
378 # parent dir was listed but did not contain this file
379 raise FileNotFoundError(path)
380 return files
381 except KeyError:
382 pass
384 def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
385 """Return all files under the given path.
387 List all files, recursing into subdirectories; output is iterator-style,
388 like ``os.walk()``. For a simple list of files, ``find()`` is available.
390 When topdown is True, the caller can modify the dirnames list in-place (perhaps
391 using del or slice assignment), and walk() will
392 only recurse into the subdirectories whose names remain in dirnames;
393 this can be used to prune the search, impose a specific order of visiting,
394 or even to inform walk() about directories the caller creates or renames before
395 it resumes walk() again.
396 Modifying dirnames when topdown is False has no effect. (see os.walk)
398 Note that the "files" outputted will include anything that is not
399 a directory, such as links.
401 Parameters
402 ----------
403 path: str
404 Root to recurse into
405 maxdepth: int
406 Maximum recursion depth. None means limitless, but not recommended
407 on link-based file-systems.
408 topdown: bool (True)
409 Whether to walk the directory tree from the top downwards or from
410 the bottom upwards.
411 on_error: "omit", "raise", a callable
412 if omit (default), path with exception will simply be empty;
413 If raise, an underlying exception will be raised;
414 if callable, it will be called with a single OSError instance as argument
415 kwargs: passed to ``ls``
416 """
417 if maxdepth is not None and maxdepth < 1:
418 raise ValueError("maxdepth must be at least 1")
420 path = self._strip_protocol(path)
421 full_dirs = {}
422 dirs = {}
423 files = {}
425 detail = kwargs.pop("detail", False)
426 try:
427 listing = self.ls(path, detail=True, **kwargs)
428 except (FileNotFoundError, OSError) as e:
429 if on_error == "raise":
430 raise
431 if callable(on_error):
432 on_error(e)
433 return
435 for info in listing:
436 # each info name must be at least [path]/part , but here
437 # we check also for names like [path]/part/
438 pathname = info["name"].rstrip("/")
439 name = pathname.rsplit("/", 1)[-1]
440 if info["type"] == "directory" and pathname != path:
441 # do not include "self" path
442 full_dirs[name] = pathname
443 dirs[name] = info
444 elif pathname == path:
445 # file-like with same name as give path
446 files[""] = info
447 else:
448 files[name] = info
450 if not detail:
451 dirs = list(dirs)
452 files = list(files)
454 if topdown:
455 # Yield before recursion if walking top down
456 yield path, dirs, files
458 if maxdepth is not None:
459 maxdepth -= 1
460 if maxdepth < 1:
461 if not topdown:
462 yield path, dirs, files
463 return
465 for d in dirs:
466 yield from self.walk(
467 full_dirs[d],
468 maxdepth=maxdepth,
469 detail=detail,
470 topdown=topdown,
471 **kwargs,
472 )
474 if not topdown:
475 # Yield after recursion if walking bottom up
476 yield path, dirs, files
478 def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
479 """List all files below path.
481 Like posix ``find`` command without conditions
483 Parameters
484 ----------
485 path : str
486 maxdepth: int or None
487 If not None, the maximum number of levels to descend
488 withdirs: bool
489 Whether to include directory paths in the output. This is True
490 when used by glob, but users usually only want files.
491 kwargs are passed to ``ls``.
492 """
493 # TODO: allow equivalent of -name parameter
494 path = self._strip_protocol(path)
495 out = {}
497 # Add the root directory if withdirs is requested
498 # This is needed for posix glob compliance
499 if withdirs and path != "" and self.isdir(path):
500 out[path] = self.info(path)
502 for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
503 if withdirs:
504 files.update(dirs)
505 out.update({info["name"]: info for name, info in files.items()})
506 if not out and self.isfile(path):
507 # walk works on directories, but find should also return [path]
508 # when path happens to be a file
509 out[path] = {}
510 names = sorted(out)
511 if not detail:
512 return names
513 else:
514 return {name: out[name] for name in names}
516 def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
517 """Space used by files and optionally directories within a path
519 Directory size does not include the size of its contents.
521 Parameters
522 ----------
523 path: str
524 total: bool
525 Whether to sum all the file sizes
526 maxdepth: int or None
527 Maximum number of directory levels to descend, None for unlimited.
528 withdirs: bool
529 Whether to include directory paths in the output.
530 kwargs: passed to ``find``
532 Returns
533 -------
534 Dict of {path: size} if total=False, or int otherwise, where numbers
535 refer to bytes used.
536 """
537 sizes = {}
538 if withdirs and self.isdir(path):
539 # Include top-level directory in output
540 info = self.info(path)
541 sizes[info["name"]] = info["size"]
542 for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
543 info = self.info(f)
544 sizes[info["name"]] = info["size"]
545 if total:
546 return sum(sizes.values())
547 else:
548 return sizes
550 def glob(self, path, maxdepth=None, **kwargs):
551 """Find files by glob-matching.
553 Pattern matching capabilities for finding files that match the given pattern.
555 Parameters
556 ----------
557 path: str
558 The glob pattern to match against
559 maxdepth: int or None
560 Maximum depth for ``'**'`` patterns. Applied on the first ``'**'`` found.
561 Must be at least 1 if provided.
562 kwargs:
563 Additional arguments passed to ``find`` (e.g., detail=True)
565 Returns
566 -------
567 List of matched paths, or dict of paths and their info if detail=True
569 Notes
570 -----
571 Supported patterns:
572 - '*': Matches any sequence of characters within a single directory level
573 - ``'**'``: Matches any number of directory levels (must be an entire path component)
574 - '?': Matches exactly one character
575 - '[abc]': Matches any character in the set
576 - '[a-z]': Matches any character in the range
577 - '[!abc]': Matches any character NOT in the set
579 Special behaviors:
580 - If the path ends with '/', only folders are returned
581 - Consecutive '*' characters are compressed into a single '*'
582 - Empty brackets '[]' never match anything
583 - Negated empty brackets '[!]' match any single character
584 - Special characters in character classes are escaped properly
586 Limitations:
587 - ``'**'`` must be a complete path component (e.g., ``'a/**/b'``, not ``'a**b'``)
588 - No brace expansion ('{a,b}.txt')
589 - No extended glob patterns ('+(pattern)', '!(pattern)')
590 """
591 if maxdepth is not None and maxdepth < 1:
592 raise ValueError("maxdepth must be at least 1")
594 import re
596 seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
597 ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash
598 path = self._strip_protocol(path)
599 append_slash_to_dirname = ends_with_sep or path.endswith(
600 tuple(sep + "**" for sep in seps)
601 )
602 idx_star = path.find("*") if path.find("*") >= 0 else len(path)
603 idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
604 idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
606 min_idx = min(idx_star, idx_qmark, idx_brace)
608 detail = kwargs.pop("detail", False)
610 if not has_magic(path):
611 if self.exists(path, **kwargs):
612 if not detail:
613 return [path]
614 else:
615 return {path: self.info(path, **kwargs)}
616 else:
617 if not detail:
618 return [] # glob of non-existent returns empty
619 else:
620 return {}
621 elif "/" in path[:min_idx]:
622 min_idx = path[:min_idx].rindex("/")
623 root = path[: min_idx + 1]
624 depth = path[min_idx + 1 :].count("/") + 1
625 else:
626 root = ""
627 depth = path[min_idx + 1 :].count("/") + 1
629 if "**" in path:
630 if maxdepth is not None:
631 idx_double_stars = path.find("**")
632 depth_double_stars = path[idx_double_stars:].count("/") + 1
633 depth = depth - depth_double_stars + maxdepth
634 else:
635 depth = None
637 allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
639 pattern = glob_translate(path + ("/" if ends_with_sep else ""))
640 pattern = re.compile(pattern)
642 out = {
643 p: info
644 for p, info in sorted(allpaths.items())
645 if pattern.match(
646 p + "/"
647 if append_slash_to_dirname and info["type"] == "directory"
648 else p
649 )
650 }
652 if detail:
653 return out
654 else:
655 return list(out)
657 def exists(self, path, **kwargs):
658 """Is there a file at the given path"""
659 try:
660 self.info(path, **kwargs)
661 return True
662 except: # noqa: E722
663 # any exception allowed bar FileNotFoundError?
664 return False
666 def lexists(self, path, **kwargs):
667 """If there is a file at the given path (including
668 broken links)"""
669 return self.exists(path)
671 def info(self, path, **kwargs):
672 """Give details of entry at path
674 Returns a single dictionary, with exactly the same information as ``ls``
675 would with ``detail=True``.
677 The default implementation calls ls and could be overridden by a
678 shortcut. kwargs are passed on to ```ls()``.
680 Some file systems might not be able to measure the file's size, in
681 which case, the returned dict will include ``'size': None``.
683 Returns
684 -------
685 dict with keys: name (full path in the FS), size (in bytes), type (file,
686 directory, or something else) and other FS-specific keys.
687 """
688 path = self._strip_protocol(path)
689 out = self.ls(self._parent(path), detail=True, **kwargs)
690 out = [o for o in out if o["name"].rstrip("/") == path]
691 if out:
692 return out[0]
693 out = self.ls(path, detail=True, **kwargs)
694 path = path.rstrip("/")
695 out1 = [o for o in out if o["name"].rstrip("/") == path]
696 if len(out1) == 1:
697 if "size" not in out1[0]:
698 out1[0]["size"] = None
699 return out1[0]
700 elif len(out1) > 1 or out:
701 return {"name": path, "size": 0, "type": "directory"}
702 else:
703 raise FileNotFoundError(path)
705 def checksum(self, path):
706 """Unique value for current version of file
708 If the checksum is the same from one moment to another, the contents
709 are guaranteed to be the same. If the checksum changes, the contents
710 *might* have changed.
712 This should normally be overridden; default will probably capture
713 creation/modification timestamp (which would be good) or maybe
714 access timestamp (which would be bad)
715 """
716 return int(tokenize(self.info(path)), 16)
718 def size(self, path):
719 """Size in bytes of file"""
720 return self.info(path).get("size", None)
722 def sizes(self, paths):
723 """Size in bytes of each file in a list of paths"""
724 return [self.size(p) for p in paths]
726 def isdir(self, path):
727 """Is this entry directory-like?"""
728 try:
729 return self.info(path)["type"] == "directory"
730 except OSError:
731 return False
733 def isfile(self, path):
734 """Is this entry file-like?"""
735 try:
736 return self.info(path)["type"] == "file"
737 except: # noqa: E722
738 return False
740 def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
741 """Get the contents of the file as a string.
743 Parameters
744 ----------
745 path: str
746 URL of file on this filesystems
747 encoding, errors, newline: same as `open`.
748 """
749 with self.open(
750 path,
751 mode="r",
752 encoding=encoding,
753 errors=errors,
754 newline=newline,
755 **kwargs,
756 ) as f:
757 return f.read()
759 def write_text(
760 self, path, value, encoding=None, errors=None, newline=None, **kwargs
761 ):
762 """Write the text to the given file.
764 An existing file will be overwritten.
766 Parameters
767 ----------
768 path: str
769 URL of file on this filesystems
770 value: str
771 Text to write.
772 encoding, errors, newline: same as `open`.
773 """
774 with self.open(
775 path,
776 mode="w",
777 encoding=encoding,
778 errors=errors,
779 newline=newline,
780 **kwargs,
781 ) as f:
782 return f.write(value)
784 def cat_file(self, path, start=None, end=None, **kwargs):
785 """Get the content of a file
787 Parameters
788 ----------
789 path: URL of file on this filesystems
790 start, end: int
791 Bytes limits of the read. If negative, backwards from end,
792 like usual python slices. Either can be None for start or
793 end of file, respectively
794 kwargs: passed to ``open()``.
795 """
796 # explicitly set buffering off?
797 with self.open(path, "rb", **kwargs) as f:
798 if start is not None:
799 if start >= 0:
800 f.seek(start)
801 else:
802 f.seek(max(0, f.size + start))
803 if end is not None:
804 if end < 0:
805 end = f.size + end
806 return f.read(end - f.tell())
807 return f.read()
809 def pipe_file(self, path, value, mode="overwrite", **kwargs):
810 """Set the bytes of given file"""
811 if mode == "create" and self.exists(path):
812 # non-atomic but simple way; or could use "xb" in open(), which is likely
813 # not as well supported
814 raise FileExistsError
815 with self.open(path, "wb", **kwargs) as f:
816 f.write(value)
818 def pipe(self, path, value=None, **kwargs):
819 """Put value into path
821 (counterpart to ``cat``)
823 Parameters
824 ----------
825 path: string or dict(str, bytes)
826 If a string, a single remote location to put ``value`` bytes; if a dict,
827 a mapping of {path: bytesvalue}.
828 value: bytes, optional
829 If using a single path, these are the bytes to put there. Ignored if
830 ``path`` is a dict
831 """
832 if isinstance(path, str):
833 self.pipe_file(self._strip_protocol(path), value, **kwargs)
834 elif isinstance(path, dict):
835 for k, v in path.items():
836 self.pipe_file(self._strip_protocol(k), v, **kwargs)
837 else:
838 raise ValueError("path must be str or dict")
840 def cat_ranges(
841 self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
842 ):
843 """Get the contents of byte ranges from one or more files
845 Parameters
846 ----------
847 paths: list
848 A list of of filepaths on this filesystems
849 starts, ends: int or list
850 Bytes limits of the read. If using a single int, the same value will be
851 used to read all the specified files.
852 """
853 if max_gap is not None:
854 raise NotImplementedError
855 if not isinstance(paths, list):
856 raise TypeError
857 if not isinstance(starts, list):
858 starts = [starts] * len(paths)
859 if not isinstance(ends, list):
860 ends = [ends] * len(paths)
861 if len(starts) != len(paths) or len(ends) != len(paths):
862 raise ValueError
863 out = []
864 for p, s, e in zip(paths, starts, ends):
865 try:
866 out.append(self.cat_file(p, s, e))
867 except Exception as e:
868 if on_error == "return":
869 out.append(e)
870 else:
871 raise
872 return out
874 def cat(self, path, recursive=False, on_error="raise", **kwargs):
875 """Fetch (potentially multiple) paths' contents
877 Parameters
878 ----------
879 recursive: bool
880 If True, assume the path(s) are directories, and get all the
881 contained files
882 on_error : "raise", "omit", "return"
883 If raise, an underlying exception will be raised (converted to KeyError
884 if the type is in self.missing_exceptions); if omit, keys with exception
885 will simply not be included in the output; if "return", all keys are
886 included in the output, but the value will be bytes or an exception
887 instance.
888 kwargs: passed to cat_file
890 Returns
891 -------
892 dict of {path: contents} if there are multiple paths
893 or the path has been otherwise expanded
894 """
895 paths = self.expand_path(path, recursive=recursive)
896 if (
897 len(paths) > 1
898 or isinstance(path, list)
899 or paths[0] != self._strip_protocol(path)
900 ):
901 out = {}
902 for path in paths:
903 try:
904 out[path] = self.cat_file(path, **kwargs)
905 except Exception as e:
906 if on_error == "raise":
907 raise
908 if on_error == "return":
909 out[path] = e
910 return out
911 else:
912 return self.cat_file(paths[0], **kwargs)
914 def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs):
915 """Copy single remote file to local"""
916 from .implementations.local import LocalFileSystem
918 if isfilelike(lpath):
919 outfile = lpath
920 elif self.isdir(rpath):
921 os.makedirs(lpath, exist_ok=True)
922 return None
924 fs = LocalFileSystem(auto_mkdir=True)
925 fs.makedirs(fs._parent(lpath), exist_ok=True)
927 with self.open(rpath, "rb", **kwargs) as f1:
928 if outfile is None:
929 outfile = open(lpath, "wb")
931 try:
932 callback.set_size(getattr(f1, "size", None))
933 data = True
934 while data:
935 data = f1.read(self.blocksize)
936 segment_len = outfile.write(data)
937 if segment_len is None:
938 segment_len = len(data)
939 callback.relative_update(segment_len)
940 finally:
941 if not isfilelike(lpath):
942 outfile.close()
944 def get(
945 self,
946 rpath,
947 lpath,
948 recursive=False,
949 callback=DEFAULT_CALLBACK,
950 maxdepth=None,
951 **kwargs,
952 ):
953 """Copy file(s) to local.
955 Copies a specific file or tree of files (if recursive=True). If lpath
956 ends with a "/", it will be assumed to be a directory, and target files
957 will go within. Can submit a list of paths, which may be glob-patterns
958 and will be expanded.
960 Calls get_file for each source.
961 """
962 if isinstance(lpath, list) and isinstance(rpath, list):
963 # No need to expand paths when both source and destination
964 # are provided as lists
965 rpaths = rpath
966 lpaths = lpath
967 else:
968 from .implementations.local import (
969 LocalFileSystem,
970 make_path_posix,
971 trailing_sep,
972 )
974 source_is_str = isinstance(rpath, str)
975 rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
976 if source_is_str and (not recursive or maxdepth is not None):
977 # Non-recursive glob does not copy directories
978 rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
979 if not rpaths:
980 return
982 if isinstance(lpath, str):
983 lpath = make_path_posix(lpath)
985 source_is_file = len(rpaths) == 1
986 dest_is_dir = isinstance(lpath, str) and (
987 trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
988 )
990 exists = source_is_str and (
991 (has_magic(rpath) and source_is_file)
992 or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
993 )
994 lpaths = other_paths(
995 rpaths,
996 lpath,
997 exists=exists,
998 flatten=not source_is_str,
999 )
1001 callback.set_size(len(lpaths))
1002 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
1003 with callback.branched(rpath, lpath) as child:
1004 self.get_file(rpath, lpath, callback=child, **kwargs)
1006 def put_file(
1007 self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
1008 ):
1009 """Copy single file to remote"""
1010 if mode == "create" and self.exists(rpath):
1011 raise FileExistsError
1012 if os.path.isdir(lpath):
1013 self.makedirs(rpath, exist_ok=True)
1014 return None
1016 with open(lpath, "rb") as f1:
1017 size = f1.seek(0, 2)
1018 callback.set_size(size)
1019 f1.seek(0)
1021 self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
1022 with self.open(rpath, "wb", **kwargs) as f2:
1023 while f1.tell() < size:
1024 data = f1.read(self.blocksize)
1025 segment_len = f2.write(data)
1026 if segment_len is None:
1027 segment_len = len(data)
1028 callback.relative_update(segment_len)
1030 def put(
1031 self,
1032 lpath,
1033 rpath,
1034 recursive=False,
1035 callback=DEFAULT_CALLBACK,
1036 maxdepth=None,
1037 **kwargs,
1038 ):
1039 """Copy file(s) from local.
1041 Copies a specific file or tree of files (if recursive=True). If rpath
1042 ends with a "/", it will be assumed to be a directory, and target files
1043 will go within.
1045 Calls put_file for each source.
1046 """
1047 if isinstance(lpath, list) and isinstance(rpath, list):
1048 # No need to expand paths when both source and destination
1049 # are provided as lists
1050 rpaths = rpath
1051 lpaths = lpath
1052 else:
1053 from .implementations.local import (
1054 LocalFileSystem,
1055 make_path_posix,
1056 trailing_sep,
1057 )
1059 source_is_str = isinstance(lpath, str)
1060 if source_is_str:
1061 lpath = make_path_posix(lpath)
1062 fs = LocalFileSystem()
1063 lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
1064 if source_is_str and (not recursive or maxdepth is not None):
1065 # Non-recursive glob does not copy directories
1066 lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
1067 if not lpaths:
1068 return
1070 source_is_file = len(lpaths) == 1
1071 dest_is_dir = isinstance(rpath, str) and (
1072 trailing_sep(rpath) or self.isdir(rpath)
1073 )
1075 rpath = (
1076 self._strip_protocol(rpath)
1077 if isinstance(rpath, str)
1078 else [self._strip_protocol(p) for p in rpath]
1079 )
1080 exists = source_is_str and (
1081 (has_magic(lpath) and source_is_file)
1082 or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
1083 )
1084 rpaths = other_paths(
1085 lpaths,
1086 rpath,
1087 exists=exists,
1088 flatten=not source_is_str,
1089 )
1091 callback.set_size(len(rpaths))
1092 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
1093 with callback.branched(lpath, rpath) as child:
1094 self.put_file(lpath, rpath, callback=child, **kwargs)
1096 def head(self, path, size=1024):
1097 """Get the first ``size`` bytes from file"""
1098 with self.open(path, "rb") as f:
1099 return f.read(size)
1101 def tail(self, path, size=1024):
1102 """Get the last ``size`` bytes from file"""
1103 with self.open(path, "rb") as f:
1104 f.seek(max(-size, -f.size), 2)
1105 return f.read()
1107 def cp_file(self, path1, path2, **kwargs):
1108 raise NotImplementedError
1110 def copy(
1111 self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
1112 ):
1113 """Copy within two locations in the filesystem
1115 on_error : "raise", "ignore"
1116 If raise, any not-found exceptions will be raised; if ignore any
1117 not-found exceptions will cause the path to be skipped; defaults to
1118 raise unless recursive is true, where the default is ignore
1119 """
1120 if on_error is None and recursive:
1121 on_error = "ignore"
1122 elif on_error is None:
1123 on_error = "raise"
1125 if isinstance(path1, list) and isinstance(path2, list):
1126 # No need to expand paths when both source and destination
1127 # are provided as lists
1128 paths1 = path1
1129 paths2 = path2
1130 else:
1131 from .implementations.local import trailing_sep
1133 source_is_str = isinstance(path1, str)
1134 paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth)
1135 if source_is_str and (not recursive or maxdepth is not None):
1136 # Non-recursive glob does not copy directories
1137 paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))]
1138 if not paths1:
1139 return
1141 source_is_file = len(paths1) == 1
1142 dest_is_dir = isinstance(path2, str) and (
1143 trailing_sep(path2) or self.isdir(path2)
1144 )
1146 exists = source_is_str and (
1147 (has_magic(path1) and source_is_file)
1148 or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
1149 )
1150 paths2 = other_paths(
1151 paths1,
1152 path2,
1153 exists=exists,
1154 flatten=not source_is_str,
1155 )
1157 for p1, p2 in zip(paths1, paths2):
1158 try:
1159 self.cp_file(p1, p2, **kwargs)
1160 except FileNotFoundError:
1161 if on_error == "raise":
1162 raise
1164 def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
1165 """Turn one or more globs or directories into a list of all matching paths
1166 to files or directories.
1168 kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
1169 """
1171 if maxdepth is not None and maxdepth < 1:
1172 raise ValueError("maxdepth must be at least 1")
1174 if isinstance(path, (str, os.PathLike)):
1175 out = self.expand_path([path], recursive, maxdepth)
1176 else:
1177 out = set()
1178 path = [self._strip_protocol(p) for p in path]
1179 for p in path:
1180 if has_magic(p):
1181 bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
1182 out |= bit
1183 if recursive:
1184 # glob call above expanded one depth so if maxdepth is defined
1185 # then decrement it in expand_path call below. If it is zero
1186 # after decrementing then avoid expand_path call.
1187 if maxdepth is not None and maxdepth <= 1:
1188 continue
1189 out |= set(
1190 self.expand_path(
1191 list(bit),
1192 recursive=recursive,
1193 maxdepth=maxdepth - 1 if maxdepth is not None else None,
1194 **kwargs,
1195 )
1196 )
1197 continue
1198 elif recursive:
1199 rec = set(
1200 self.find(
1201 p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
1202 )
1203 )
1204 out |= rec
1205 if p not in out and (recursive is False or self.exists(p)):
1206 # should only check once, for the root
1207 out.add(p)
1208 if not out:
1209 raise FileNotFoundError(path)
1210 return sorted(out)
1212 def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
1213 """Move file(s) from one location to another"""
1214 if path1 == path2:
1215 logger.debug("%s mv: The paths are the same, so no files were moved.", self)
1216 else:
1217 # explicitly raise exception to prevent data corruption
1218 self.copy(
1219 path1, path2, recursive=recursive, maxdepth=maxdepth, onerror="raise"
1220 )
1221 self.rm(path1, recursive=recursive)
1223 def rm_file(self, path):
1224 """Delete a file"""
1225 self._rm(path)
1227 def _rm(self, path):
1228 """Delete one file"""
1229 # this is the old name for the method, prefer rm_file
1230 raise NotImplementedError
1232 def rm(self, path, recursive=False, maxdepth=None):
1233 """Delete files.
1235 Parameters
1236 ----------
1237 path: str or list of str
1238 File(s) to delete.
1239 recursive: bool
1240 If file(s) are directories, recursively delete contents and then
1241 also remove the directory
1242 maxdepth: int or None
1243 Depth to pass to walk for finding files to delete, if recursive.
1244 If None, there will be no limit and infinite recursion may be
1245 possible.
1246 """
1247 path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
1248 for p in reversed(path):
1249 self.rm_file(p)
1251 @classmethod
1252 def _parent(cls, path):
1253 path = cls._strip_protocol(path)
1254 if "/" in path:
1255 parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
1256 return cls.root_marker + parent
1257 else:
1258 return cls.root_marker
1260 def _open(
1261 self,
1262 path,
1263 mode="rb",
1264 block_size=None,
1265 autocommit=True,
1266 cache_options=None,
1267 **kwargs,
1268 ):
1269 """Return raw bytes-mode file-like from the file-system"""
1270 return AbstractBufferedFile(
1271 self,
1272 path,
1273 mode,
1274 block_size,
1275 autocommit,
1276 cache_options=cache_options,
1277 **kwargs,
1278 )
1280 def open(
1281 self,
1282 path,
1283 mode="rb",
1284 block_size=None,
1285 cache_options=None,
1286 compression=None,
1287 **kwargs,
1288 ):
1289 """
1290 Return a file-like object from the filesystem
1292 The resultant instance must function correctly in a context ``with``
1293 block.
1295 Parameters
1296 ----------
1297 path: str
1298 Target file
1299 mode: str like 'rb', 'w'
1300 See builtin ``open()``
1301 Mode "x" (exclusive write) may be implemented by the backend. Even if
1302 it is, whether it is checked up front or on commit, and whether it is
1303 atomic is implementation-dependent.
1304 block_size: int
1305 Some indication of buffering - this is a value in bytes
1306 cache_options : dict, optional
1307 Extra arguments to pass through to the cache.
1308 compression: string or None
1309 If given, open file using compression codec. Can either be a compression
1310 name (a key in ``fsspec.compression.compr``) or "infer" to guess the
1311 compression from the filename suffix.
1312 encoding, errors, newline: passed on to TextIOWrapper for text mode
1313 """
1314 import io
1316 path = self._strip_protocol(path)
1317 if "b" not in mode:
1318 mode = mode.replace("t", "") + "b"
1320 text_kwargs = {
1321 k: kwargs.pop(k)
1322 for k in ["encoding", "errors", "newline"]
1323 if k in kwargs
1324 }
1325 return io.TextIOWrapper(
1326 self.open(
1327 path,
1328 mode,
1329 block_size=block_size,
1330 cache_options=cache_options,
1331 compression=compression,
1332 **kwargs,
1333 ),
1334 **text_kwargs,
1335 )
1336 else:
1337 ac = kwargs.pop("autocommit", not self._intrans)
1338 f = self._open(
1339 path,
1340 mode=mode,
1341 block_size=block_size,
1342 autocommit=ac,
1343 cache_options=cache_options,
1344 **kwargs,
1345 )
1346 if compression is not None:
1347 from fsspec.compression import compr
1348 from fsspec.core import get_compression
1350 compression = get_compression(path, compression)
1351 compress = compr[compression]
1352 f = compress(f, mode=mode[0])
1354 if not ac and "r" not in mode:
1355 self.transaction.files.append(f)
1356 return f
1358 def touch(self, path, truncate=True, **kwargs):
1359 """Create empty file, or update timestamp
1361 Parameters
1362 ----------
1363 path: str
1364 file location
1365 truncate: bool
1366 If True, always set file size to 0; if False, update timestamp and
1367 leave file unchanged, if backend allows this
1368 """
1369 if truncate or not self.exists(path):
1370 with self.open(path, "wb", **kwargs):
1371 pass
1372 else:
1373 raise NotImplementedError # update timestamp, if possible
1375 def ukey(self, path):
1376 """Hash of file properties, to tell if it has changed"""
1377 return sha256(str(self.info(path)).encode()).hexdigest()
1379 def read_block(self, fn, offset, length, delimiter=None):
1380 """Read a block of bytes from
1382 Starting at ``offset`` of the file, read ``length`` bytes. If
1383 ``delimiter`` is set then we ensure that the read starts and stops at
1384 delimiter boundaries that follow the locations ``offset`` and ``offset
1385 + length``. If ``offset`` is zero then we start at zero. The
1386 bytestring returned WILL include the end delimiter string.
1388 If offset+length is beyond the eof, reads to eof.
1390 Parameters
1391 ----------
1392 fn: string
1393 Path to filename
1394 offset: int
1395 Byte offset to start read
1396 length: int
1397 Number of bytes to read. If None, read to end.
1398 delimiter: bytes (optional)
1399 Ensure reading starts and stops at delimiter bytestring
1401 Examples
1402 --------
1403 >>> fs.read_block('data/file.csv', 0, 13) # doctest: +SKIP
1404 b'Alice, 100\\nBo'
1405 >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP
1406 b'Alice, 100\\nBob, 200\\n'
1408 Use ``length=None`` to read to the end of the file.
1409 >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP
1410 b'Alice, 100\\nBob, 200\\nCharlie, 300'
1412 See Also
1413 --------
1414 :func:`fsspec.utils.read_block`
1415 """
1416 with self.open(fn, "rb") as f:
1417 size = f.size
1418 if length is None:
1419 length = size
1420 if size is not None and offset + length > size:
1421 length = size - offset
1422 return read_block(f, offset, length, delimiter)
1424 def to_json(self, *, include_password: bool = True) -> str:
1425 """
1426 JSON representation of this filesystem instance.
1428 Parameters
1429 ----------
1430 include_password: bool, default True
1431 Whether to include the password (if any) in the output.
1433 Returns
1434 -------
1435 JSON string with keys ``cls`` (the python location of this class),
1436 protocol (text name of this class's protocol, first one in case of
1437 multiple), ``args`` (positional args, usually empty), and all other
1438 keyword arguments as their own keys.
1440 Warnings
1441 --------
1442 Serialized filesystems may contain sensitive information which have been
1443 passed to the constructor, such as passwords and tokens. Make sure you
1444 store and send them in a secure environment!
1445 """
1446 from .json import FilesystemJSONEncoder
1448 return json.dumps(
1449 self,
1450 cls=type(
1451 "_FilesystemJSONEncoder",
1452 (FilesystemJSONEncoder,),
1453 {"include_password": include_password},
1454 ),
1455 )
1457 @staticmethod
1458 def from_json(blob: str) -> AbstractFileSystem:
1459 """
1460 Recreate a filesystem instance from JSON representation.
1462 See ``.to_json()`` for the expected structure of the input.
1464 Parameters
1465 ----------
1466 blob: str
1468 Returns
1469 -------
1470 file system instance, not necessarily of this particular class.
1472 Warnings
1473 --------
1474 This can import arbitrary modules (as determined by the ``cls`` key).
1475 Make sure you haven't installed any modules that may execute malicious code
1476 at import time.
1477 """
1478 from .json import FilesystemJSONDecoder
1480 return json.loads(blob, cls=FilesystemJSONDecoder)
1482 def to_dict(self, *, include_password: bool = True) -> dict[str, Any]:
1483 """
1484 JSON-serializable dictionary representation of this filesystem instance.
1486 Parameters
1487 ----------
1488 include_password: bool, default True
1489 Whether to include the password (if any) in the output.
1491 Returns
1492 -------
1493 Dictionary with keys ``cls`` (the python location of this class),
1494 protocol (text name of this class's protocol, first one in case of
1495 multiple), ``args`` (positional args, usually empty), and all other
1496 keyword arguments as their own keys.
1498 Warnings
1499 --------
1500 Serialized filesystems may contain sensitive information which have been
1501 passed to the constructor, such as passwords and tokens. Make sure you
1502 store and send them in a secure environment!
1503 """
1504 from .json import FilesystemJSONEncoder
1506 json_encoder = FilesystemJSONEncoder()
1508 cls = type(self)
1509 proto = self.protocol
1511 storage_options = dict(self.storage_options)
1512 if not include_password:
1513 storage_options.pop("password", None)
1515 return dict(
1516 cls=f"{cls.__module__}:{cls.__name__}",
1517 protocol=proto[0] if isinstance(proto, (tuple, list)) else proto,
1518 args=json_encoder.make_serializable(self.storage_args),
1519 **json_encoder.make_serializable(storage_options),
1520 )
1522 @staticmethod
1523 def from_dict(dct: dict[str, Any]) -> AbstractFileSystem:
1524 """
1525 Recreate a filesystem instance from dictionary representation.
1527 See ``.to_dict()`` for the expected structure of the input.
1529 Parameters
1530 ----------
1531 dct: Dict[str, Any]
1533 Returns
1534 -------
1535 file system instance, not necessarily of this particular class.
1537 Warnings
1538 --------
1539 This can import arbitrary modules (as determined by the ``cls`` key).
1540 Make sure you haven't installed any modules that may execute malicious code
1541 at import time.
1542 """
1543 from .json import FilesystemJSONDecoder
1545 json_decoder = FilesystemJSONDecoder()
1547 dct = dict(dct) # Defensive copy
1549 cls = FilesystemJSONDecoder.try_resolve_fs_cls(dct)
1550 if cls is None:
1551 raise ValueError("Not a serialized AbstractFileSystem")
1553 dct.pop("cls", None)
1554 dct.pop("protocol", None)
1556 return cls(
1557 *json_decoder.unmake_serializable(dct.pop("args", ())),
1558 **json_decoder.unmake_serializable(dct),
1559 )
1561 def _get_pyarrow_filesystem(self):
1562 """
1563 Make a version of the FS instance which will be acceptable to pyarrow
1564 """
1565 # all instances already also derive from pyarrow
1566 return self
1568 def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
1569 """Create key/value store based on this file-system
1571 Makes a MutableMapping interface to the FS at the given root path.
1572 See ``fsspec.mapping.FSMap`` for further details.
1573 """
1574 from .mapping import FSMap
1576 return FSMap(
1577 root,
1578 self,
1579 check=check,
1580 create=create,
1581 missing_exceptions=missing_exceptions,
1582 )
1584 @classmethod
1585 def clear_instance_cache(cls):
1586 """
1587 Clear the cache of filesystem instances.
1589 Notes
1590 -----
1591 Unless overridden by setting the ``cachable`` class attribute to False,
1592 the filesystem class stores a reference to newly created instances. This
1593 prevents Python's normal rules around garbage collection from working,
1594 since the instances refcount will not drop to zero until
1595 ``clear_instance_cache`` is called.
1596 """
1597 cls._cache.clear()
1599 def created(self, path):
1600 """Return the created timestamp of a file as a datetime.datetime"""
1601 raise NotImplementedError
1603 def modified(self, path):
1604 """Return the modified timestamp of a file as a datetime.datetime"""
1605 raise NotImplementedError
1607 def tree(
1608 self,
1609 path: str = "/",
1610 recursion_limit: int = 2,
1611 max_display: int = 25,
1612 display_size: bool = False,
1613 prefix: str = "",
1614 is_last: bool = True,
1615 first: bool = True,
1616 indent_size: int = 4,
1617 ) -> str:
1618 """
1619 Return a tree-like structure of the filesystem starting from the given path as a string.
1621 Parameters
1622 ----------
1623 path: Root path to start traversal from
1624 recursion_limit: Maximum depth of directory traversal
1625 max_display: Maximum number of items to display per directory
1626 display_size: Whether to display file sizes
1627 prefix: Current line prefix for visual tree structure
1628 is_last: Whether current item is last in its level
1629 first: Whether this is the first call (displays root path)
1630 indent_size: Number of spaces by indent
1632 Returns
1633 -------
1634 str: A string representing the tree structure.
1636 Example
1637 -------
1638 >>> from fsspec import filesystem
1640 >>> fs = filesystem('ftp', host='test.rebex.net', user='demo', password='password')
1641 >>> tree = fs.tree(display_size=True, recursion_limit=3, indent_size=8, max_display=10)
1642 >>> print(tree)
1643 """
1645 def format_bytes(n: int) -> str:
1646 """Format bytes as text."""
1647 for prefix, k in (
1648 ("P", 2**50),
1649 ("T", 2**40),
1650 ("G", 2**30),
1651 ("M", 2**20),
1652 ("k", 2**10),
1653 ):
1654 if n >= 0.9 * k:
1655 return f"{n / k:.2f} {prefix}b"
1656 return f"{n}B"
1658 result = []
1660 if first:
1661 result.append(path)
1663 if recursion_limit:
1664 indent = " " * indent_size
1665 contents = self.ls(path, detail=True)
1666 contents.sort(
1667 key=lambda x: (x.get("type") != "directory", x.get("name", ""))
1668 )
1670 if max_display is not None and len(contents) > max_display:
1671 displayed_contents = contents[:max_display]
1672 remaining_count = len(contents) - max_display
1673 else:
1674 displayed_contents = contents
1675 remaining_count = 0
1677 for i, item in enumerate(displayed_contents):
1678 is_last_item = (i == len(displayed_contents) - 1) and (
1679 remaining_count == 0
1680 )
1682 branch = (
1683 "└" + ("─" * (indent_size - 2))
1684 if is_last_item
1685 else "├" + ("─" * (indent_size - 2))
1686 )
1687 branch += " "
1688 new_prefix = prefix + (
1689 indent if is_last_item else "│" + " " * (indent_size - 1)
1690 )
1692 name = os.path.basename(item.get("name", ""))
1694 if display_size and item.get("type") == "directory":
1695 sub_contents = self.ls(item.get("name", ""), detail=True)
1696 num_files = sum(
1697 1 for sub_item in sub_contents if sub_item.get("type") == "file"
1698 )
1699 num_folders = sum(
1700 1
1701 for sub_item in sub_contents
1702 if sub_item.get("type") == "directory"
1703 )
1705 if num_files == 0 and num_folders == 0:
1706 size = " (empty folder)"
1707 elif num_files == 0:
1708 size = f" ({num_folders} subfolder{'s' if num_folders > 1 else ''})"
1709 elif num_folders == 0:
1710 size = f" ({num_files} file{'s' if num_files > 1 else ''})"
1711 else:
1712 size = f" ({num_files} file{'s' if num_files > 1 else ''}, {num_folders} subfolder{'s' if num_folders > 1 else ''})"
1713 elif display_size and item.get("type") == "file":
1714 size = f" ({format_bytes(item.get('size', 0))})"
1715 else:
1716 size = ""
1718 result.append(f"{prefix}{branch}{name}{size}")
1720 if item.get("type") == "directory" and recursion_limit > 0:
1721 result.append(
1722 self.tree(
1723 path=item.get("name", ""),
1724 recursion_limit=recursion_limit - 1,
1725 max_display=max_display,
1726 display_size=display_size,
1727 prefix=new_prefix,
1728 is_last=is_last_item,
1729 first=False,
1730 indent_size=indent_size,
1731 )
1732 )
1734 if remaining_count > 0:
1735 more_message = f"{remaining_count} more item(s) not displayed."
1736 result.append(
1737 f"{prefix}{'└' + ('─' * (indent_size - 2))} {more_message}"
1738 )
1740 return "\n".join(_ for _ in result if _)
1742 # ------------------------------------------------------------------------
1743 # Aliases
1745 def read_bytes(self, path, start=None, end=None, **kwargs):
1746 """Alias of `AbstractFileSystem.cat_file`."""
1747 return self.cat_file(path, start=start, end=end, **kwargs)
1749 def write_bytes(self, path, value, **kwargs):
1750 """Alias of `AbstractFileSystem.pipe_file`."""
1751 self.pipe_file(path, value, **kwargs)
1753 def makedir(self, path, create_parents=True, **kwargs):
1754 """Alias of `AbstractFileSystem.mkdir`."""
1755 return self.mkdir(path, create_parents=create_parents, **kwargs)
1757 def mkdirs(self, path, exist_ok=False):
1758 """Alias of `AbstractFileSystem.makedirs`."""
1759 return self.makedirs(path, exist_ok=exist_ok)
1761 def listdir(self, path, detail=True, **kwargs):
1762 """Alias of `AbstractFileSystem.ls`."""
1763 return self.ls(path, detail=detail, **kwargs)
1765 def cp(self, path1, path2, **kwargs):
1766 """Alias of `AbstractFileSystem.copy`."""
1767 return self.copy(path1, path2, **kwargs)
1769 def move(self, path1, path2, **kwargs):
1770 """Alias of `AbstractFileSystem.mv`."""
1771 return self.mv(path1, path2, **kwargs)
1773 def stat(self, path, **kwargs):
1774 """Alias of `AbstractFileSystem.info`."""
1775 return self.info(path, **kwargs)
1777 def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
1778 """Alias of `AbstractFileSystem.du`."""
1779 return self.du(path, total=total, maxdepth=maxdepth, **kwargs)
1781 def rename(self, path1, path2, **kwargs):
1782 """Alias of `AbstractFileSystem.mv`."""
1783 return self.mv(path1, path2, **kwargs)
1785 def delete(self, path, recursive=False, maxdepth=None):
1786 """Alias of `AbstractFileSystem.rm`."""
1787 return self.rm(path, recursive=recursive, maxdepth=maxdepth)
1789 def upload(self, lpath, rpath, recursive=False, **kwargs):
1790 """Alias of `AbstractFileSystem.put`."""
1791 return self.put(lpath, rpath, recursive=recursive, **kwargs)
1793 def download(self, rpath, lpath, recursive=False, **kwargs):
1794 """Alias of `AbstractFileSystem.get`."""
1795 return self.get(rpath, lpath, recursive=recursive, **kwargs)
1797 def sign(self, path, expiration=100, **kwargs):
1798 """Create a signed URL representing the given path
1800 Some implementations allow temporary URLs to be generated, as a
1801 way of delegating credentials.
1803 Parameters
1804 ----------
1805 path : str
1806 The path on the filesystem
1807 expiration : int
1808 Number of seconds to enable the URL for (if supported)
1810 Returns
1811 -------
1812 URL : str
1813 The signed URL
1815 Raises
1816 ------
1817 NotImplementedError : if method is not implemented for a filesystem
1818 """
1819 raise NotImplementedError("Sign is not implemented for this filesystem")
1821 def _isfilestore(self):
1822 # Originally inherited from pyarrow DaskFileSystem. Keeping this
1823 # here for backwards compatibility as long as pyarrow uses its
1824 # legacy fsspec-compatible filesystems and thus accepts fsspec
1825 # filesystems as well
1826 return False
1829class AbstractBufferedFile(io.IOBase):
1830 """Convenient class to derive from to provide buffering
1832 In the case that the backend does not provide a pythonic file-like object
1833 already, this class contains much of the logic to build one. The only
1834 methods that need to be overridden are ``_upload_chunk``,
1835 ``_initiate_upload`` and ``_fetch_range``.
1836 """
1838 DEFAULT_BLOCK_SIZE = 5 * 2**20
1839 _details = None
1841 def __init__(
1842 self,
1843 fs,
1844 path,
1845 mode="rb",
1846 block_size="default",
1847 autocommit=True,
1848 cache_type="readahead",
1849 cache_options=None,
1850 size=None,
1851 **kwargs,
1852 ):
1853 """
1854 Template for files with buffered reading and writing
1856 Parameters
1857 ----------
1858 fs: instance of FileSystem
1859 path: str
1860 location in file-system
1861 mode: str
1862 Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file
1863 systems may be read-only, and some may not support append.
1864 block_size: int
1865 Buffer size for reading or writing, 'default' for class default
1866 autocommit: bool
1867 Whether to write to final destination; may only impact what
1868 happens when file is being closed.
1869 cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
1870 Caching policy in read mode. See the definitions in ``core``.
1871 cache_options : dict
1872 Additional options passed to the constructor for the cache specified
1873 by `cache_type`.
1874 size: int
1875 If given and in read mode, suppressed having to look up the file size
1876 kwargs:
1877 Gets stored as self.kwargs
1878 """
1879 from .core import caches
1881 self.path = path
1882 self.fs = fs
1883 self.mode = mode
1884 self.blocksize = (
1885 self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
1886 )
1887 self.loc = 0
1888 self.autocommit = autocommit
1889 self.end = None
1890 self.start = None
1891 self.closed = False
1893 if cache_options is None:
1894 cache_options = {}
1896 if "trim" in kwargs:
1897 warnings.warn(
1898 "Passing 'trim' to control the cache behavior has been deprecated. "
1899 "Specify it within the 'cache_options' argument instead.",
1900 FutureWarning,
1901 )
1902 cache_options["trim"] = kwargs.pop("trim")
1904 self.kwargs = kwargs
1906 if mode not in {"ab", "rb", "wb", "xb"}:
1907 raise NotImplementedError("File mode not supported")
1908 if mode == "rb":
1909 if size is not None:
1910 self.size = size
1911 else:
1912 self.size = self.details["size"]
1913 self.cache = caches[cache_type](
1914 self.blocksize, self._fetch_range, self.size, **cache_options
1915 )
1916 else:
1917 self.buffer = io.BytesIO()
1918 self.offset = None
1919 self.forced = False
1920 self.location = None
1922 @property
1923 def details(self):
1924 if self._details is None:
1925 self._details = self.fs.info(self.path)
1926 return self._details
1928 @details.setter
1929 def details(self, value):
1930 self._details = value
1931 self.size = value["size"]
1933 @property
1934 def full_name(self):
1935 return _unstrip_protocol(self.path, self.fs)
1937 @property
1938 def closed(self):
1939 # get around this attr being read-only in IOBase
1940 # use getattr here, since this can be called during del
1941 return getattr(self, "_closed", True)
1943 @closed.setter
1944 def closed(self, c):
1945 self._closed = c
1947 def __hash__(self):
1948 if "w" in self.mode:
1949 return id(self)
1950 else:
1951 return int(tokenize(self.details), 16)
1953 def __eq__(self, other):
1954 """Files are equal if they have the same checksum, only in read mode"""
1955 if self is other:
1956 return True
1957 return (
1958 isinstance(other, type(self))
1959 and self.mode == "rb"
1960 and other.mode == "rb"
1961 and hash(self) == hash(other)
1962 )
1964 def commit(self):
1965 """Move from temp to final destination"""
1967 def discard(self):
1968 """Throw away temporary file"""
1970 def info(self):
1971 """File information about this path"""
1972 if self.readable():
1973 return self.details
1974 else:
1975 raise ValueError("Info not available while writing")
1977 def tell(self):
1978 """Current file location"""
1979 return self.loc
1981 def seek(self, loc, whence=0):
1982 """Set current file location
1984 Parameters
1985 ----------
1986 loc: int
1987 byte location
1988 whence: {0, 1, 2}
1989 from start of file, current location or end of file, resp.
1990 """
1991 loc = int(loc)
1992 if not self.mode == "rb":
1993 raise OSError(ESPIPE, "Seek only available in read mode")
1994 if whence == 0:
1995 nloc = loc
1996 elif whence == 1:
1997 nloc = self.loc + loc
1998 elif whence == 2:
1999 nloc = self.size + loc
2000 else:
2001 raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
2002 if nloc < 0:
2003 raise ValueError("Seek before start of file")
2004 self.loc = nloc
2005 return self.loc
2007 def write(self, data):
2008 """
2009 Write data to buffer.
2011 Buffer only sent on flush() or if buffer is greater than
2012 or equal to blocksize.
2014 Parameters
2015 ----------
2016 data: bytes
2017 Set of bytes to be written.
2018 """
2019 if not self.writable():
2020 raise ValueError("File not in write mode")
2021 if self.closed:
2022 raise ValueError("I/O operation on closed file.")
2023 if self.forced:
2024 raise ValueError("This file has been force-flushed, can only close")
2025 out = self.buffer.write(data)
2026 self.loc += out
2027 if self.buffer.tell() >= self.blocksize:
2028 self.flush()
2029 return out
2031 def flush(self, force=False):
2032 """
2033 Write buffered data to backend store.
2035 Writes the current buffer, if it is larger than the block-size, or if
2036 the file is being closed.
2038 Parameters
2039 ----------
2040 force: bool
2041 When closing, write the last block even if it is smaller than
2042 blocks are allowed to be. Disallows further writing to this file.
2043 """
2045 if self.closed:
2046 raise ValueError("Flush on closed file")
2047 if force and self.forced:
2048 raise ValueError("Force flush cannot be called more than once")
2049 if force:
2050 self.forced = True
2052 if self.readable():
2053 # no-op to flush on read-mode
2054 return
2056 if not force and self.buffer.tell() < self.blocksize:
2057 # Defer write on small block
2058 return
2060 if self.offset is None:
2061 # Initialize a multipart upload
2062 self.offset = 0
2063 try:
2064 self._initiate_upload()
2065 except:
2066 self.closed = True
2067 raise
2069 if self._upload_chunk(final=force) is not False:
2070 self.offset += self.buffer.seek(0, 2)
2071 self.buffer = io.BytesIO()
2073 def _upload_chunk(self, final=False):
2074 """Write one part of a multi-block file upload
2076 Parameters
2077 ==========
2078 final: bool
2079 This is the last block, so should complete file, if
2080 self.autocommit is True.
2081 """
2082 # may not yet have been initialized, may need to call _initialize_upload
2084 def _initiate_upload(self):
2085 """Create remote file/upload"""
2086 pass
2088 def _fetch_range(self, start, end):
2089 """Get the specified set of bytes from remote"""
2090 return self.fs.cat_file(self.path, start=start, end=end)
2092 def read(self, length=-1):
2093 """
2094 Return data from cache, or fetch pieces as necessary
2096 Parameters
2097 ----------
2098 length: int (-1)
2099 Number of bytes to read; if <0, all remaining bytes.
2100 """
2101 length = -1 if length is None else int(length)
2102 if self.mode != "rb":
2103 raise ValueError("File not in read mode")
2104 if length < 0:
2105 length = self.size - self.loc
2106 if self.closed:
2107 raise ValueError("I/O operation on closed file.")
2108 if length == 0:
2109 # don't even bother calling fetch
2110 return b""
2111 out = self.cache._fetch(self.loc, self.loc + length)
2113 logger.debug(
2114 "%s read: %i - %i %s",
2115 self,
2116 self.loc,
2117 self.loc + length,
2118 self.cache._log_stats(),
2119 )
2120 self.loc += len(out)
2121 return out
2123 def readinto(self, b):
2124 """mirrors builtin file's readinto method
2126 https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
2127 """
2128 out = memoryview(b).cast("B")
2129 data = self.read(out.nbytes)
2130 out[: len(data)] = data
2131 return len(data)
2133 def readuntil(self, char=b"\n", blocks=None):
2134 """Return data between current position and first occurrence of char
2136 char is included in the output, except if the end of the tile is
2137 encountered first.
2139 Parameters
2140 ----------
2141 char: bytes
2142 Thing to find
2143 blocks: None or int
2144 How much to read in each go. Defaults to file blocksize - which may
2145 mean a new read on every call.
2146 """
2147 out = []
2148 while True:
2149 start = self.tell()
2150 part = self.read(blocks or self.blocksize)
2151 if len(part) == 0:
2152 break
2153 found = part.find(char)
2154 if found > -1:
2155 out.append(part[: found + len(char)])
2156 self.seek(start + found + len(char))
2157 break
2158 out.append(part)
2159 return b"".join(out)
2161 def readline(self):
2162 """Read until and including the first occurrence of newline character
2164 Note that, because of character encoding, this is not necessarily a
2165 true line ending.
2166 """
2167 return self.readuntil(b"\n")
2169 def __next__(self):
2170 out = self.readline()
2171 if out:
2172 return out
2173 raise StopIteration
2175 def __iter__(self):
2176 return self
2178 def readlines(self):
2179 """Return all data, split by the newline character, including the newline character"""
2180 data = self.read()
2181 lines = data.split(b"\n")
2182 out = [l + b"\n" for l in lines[:-1]]
2183 if data.endswith(b"\n"):
2184 return out
2185 else:
2186 return out + [lines[-1]]
2187 # return list(self) ???
2189 def readinto1(self, b):
2190 return self.readinto(b)
2192 def close(self):
2193 """Close file
2195 Finalizes writes, discards cache
2196 """
2197 if getattr(self, "_unclosable", False):
2198 return
2199 if self.closed:
2200 return
2201 try:
2202 if self.mode == "rb":
2203 self.cache = None
2204 else:
2205 if not self.forced:
2206 self.flush(force=True)
2208 if self.fs is not None:
2209 self.fs.invalidate_cache(self.path)
2210 self.fs.invalidate_cache(self.fs._parent(self.path))
2211 finally:
2212 self.closed = True
2214 def readable(self):
2215 """Whether opened for reading"""
2216 return "r" in self.mode and not self.closed
2218 def seekable(self):
2219 """Whether is seekable (only in read mode)"""
2220 return self.readable()
2222 def writable(self):
2223 """Whether opened for writing"""
2224 return self.mode in {"wb", "ab", "xb"} and not self.closed
2226 def __reduce__(self):
2227 if self.mode != "rb":
2228 raise RuntimeError("Pickling a writeable file is not supported")
2230 return reopen, (
2231 self.fs,
2232 self.path,
2233 self.mode,
2234 self.blocksize,
2235 self.loc,
2236 self.size,
2237 self.autocommit,
2238 self.cache.name if self.cache else "none",
2239 self.kwargs,
2240 )
2242 def __del__(self):
2243 if not self.closed:
2244 self.close()
2246 def __str__(self):
2247 return f"<File-like object {type(self.fs).__name__}, {self.path}>"
2249 __repr__ = __str__
2251 def __enter__(self):
2252 return self
2254 def __exit__(self, *args):
2255 self.close()
2258def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
2259 file = fs.open(
2260 path,
2261 mode=mode,
2262 block_size=blocksize,
2263 autocommit=autocommit,
2264 cache_type=cache_type,
2265 size=size,
2266 **kwargs,
2267 )
2268 if loc > 0:
2269 file.seek(loc)
2270 return file