from __future__ import annotations

import io
import json
import logging
import os
import threading
import warnings
import weakref
from errno import ESPIPE
from glob import has_magic
from hashlib import sha256
from typing import Any, ClassVar

from .callbacks import DEFAULT_CALLBACK
from .config import apply_config, conf
from .dircache import DirCache
from .transaction import Transaction
from .utils import (
    _unstrip_protocol,
    glob_translate,
    isfilelike,
    other_paths,
    read_block,
    stringify_path,
    tokenize,
)

logger = logging.getLogger("fsspec")


def make_instance(cls, args, kwargs):
    return cls(*args, **kwargs)


class _Cached(type):
    """
    Metaclass for caching file system instances.

    Notes
    -----
    Instances are cached according to

    * The values of the class attributes listed in `_extra_tokenize_attributes`
    * The arguments passed to ``__init__``.

    This creates an additional reference to the filesystem, which prevents the
    filesystem from being garbage collected when all *user* references go away.
    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
    be made for a filesystem instance to be garbage collected.
    """

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Note: we intentionally create a reference here, to avoid garbage
        # collecting instances when all other references are gone. To really
        # delete a FileSystem, the cache must be cleared.
        if conf.get("weakref_instance_cache"):  # pragma: no cover
            # debug option for analysing fork/spawn conditions
            cls._cache = weakref.WeakValueDictionary()
        else:
            cls._cache = {}
        cls._pid = os.getpid()

    def __call__(cls, *args, **kwargs):
        kwargs = apply_config(cls, kwargs)
        extra_tokens = tuple(
            getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
        )
        strip_tokenize_options = {
            k: kwargs.pop(k) for k in cls._strip_tokenize_options if k in kwargs
        }
        token = tokenize(
            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
        )
        skip = kwargs.pop("skip_instance_cache", False)
        if os.getpid() != cls._pid:
            cls._cache.clear()
            cls._pid = os.getpid()
        if not skip and cls.cachable and token in cls._cache:
            cls._latest = token
            return cls._cache[token]
        else:
            obj = super().__call__(*args, **kwargs, **strip_tokenize_options)
            # Setting _fs_token here causes some static linters to complain.
            obj._fs_token_ = token
            obj.storage_args = args
            obj.storage_options = kwargs
            if obj.async_impl and obj.mirror_sync_methods:
                from .asyn import mirror_sync_methods

                mirror_sync_methods(obj)

            if cls.cachable and not skip:
                cls._latest = token
                cls._cache[token] = obj
            return obj

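# Illustrative sketch (not part of fsspec itself): because of the metaclass
# above, constructing a filesystem twice with identical arguments returns the
# same cached object, while ``skip_instance_cache=True`` bypasses the cache.
# Assumes the bundled "memory" implementation is registered, as usual.
#
#     import fsspec
#
#     fs1 = fsspec.filesystem("memory")
#     fs2 = fsspec.filesystem("memory")
#     assert fs1 is fs2  # same instance, served from cls._cache
#
#     fs3 = fsspec.filesystem("memory", skip_instance_cache=True)
#     assert fs3 is not fs1  # fresh instance, and not stored in the cache
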
class AbstractFileSystem(metaclass=_Cached):
    """
    An abstract super-class for pythonic file-systems

    Implementations are expected to be compatible with or, better, subclass
    from here.
    """

    cachable = True  # this class can be cached, instances reused
    _cached = False
    blocksize = 2**22
    sep = "/"
    protocol: ClassVar[str | tuple[str, ...]] = "abstract"
    _latest = None
    async_impl = False
    mirror_sync_methods = False
    root_marker = ""  # For some FSs, may require leading '/' or other character
    transaction_type = Transaction

    #: Extra *class attributes* that should be considered when hashing.
    _extra_tokenize_attributes = ()
    #: *storage options* that should not be considered when hashing.
    _strip_tokenize_options = ()

    # Set by _Cached metaclass
    storage_args: tuple[Any, ...]
    storage_options: dict[str, Any]

    def __init__(self, *args, **storage_options):
        """Create and configure file-system instance

        Instances may be cachable, so if similar enough arguments are seen
        a new instance is not required. The token attribute exists to allow
        implementations to cache instances if they wish.

        A reasonable default should be provided if there are no arguments.

        Subclasses should call this method.

        Parameters
        ----------
        use_listings_cache, listings_expiry_time, max_paths:
            passed to ``DirCache``, if the implementation supports
            directory listing caching. Pass use_listings_cache=False
            to disable such caching.
        skip_instance_cache: bool
            If this is a cachable implementation, pass True here to force
            creating a new instance even if a matching instance exists, and prevent
            storing this instance.
        asynchronous: bool
        loop: asyncio-compatible IOLoop or None
        """
        if self._cached:
            # reusing instance, don't change
            return
        self._cached = True
        self._intrans = False
        self._transaction = None
        self._invalidated_caches_in_transaction = []
        self.dircache = DirCache(**storage_options)

        if storage_options.pop("add_docs", None):
            warnings.warn("add_docs is no longer supported.", FutureWarning)

        if storage_options.pop("add_aliases", None):
            warnings.warn("add_aliases has been removed.", FutureWarning)
        # This is set in _Cached
        self._fs_token_ = None

    @property
    def fsid(self):
        """Persistent filesystem id that can be used to compare filesystems
        across sessions.
        """
        raise NotImplementedError

    @property
    def _fs_token(self):
        return self._fs_token_

    def __dask_tokenize__(self):
        return self._fs_token

    def __hash__(self):
        return int(self._fs_token, 16)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self._fs_token == other._fs_token

    def __reduce__(self):
        return make_instance, (type(self), self.storage_args, self.storage_options)

    @classmethod
    def _strip_protocol(cls, path):
        """Turn path from fully-qualified to file-system-specific

        May require FS-specific handling, e.g., for relative paths or links.
        """
        if isinstance(path, list):
            return [cls._strip_protocol(p) for p in path]
        path = stringify_path(path)
        protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
        for protocol in protos:
            if path.startswith(protocol + "://"):
                path = path[len(protocol) + 3 :]
            elif path.startswith(protocol + "::"):
                path = path[len(protocol) + 2 :]
        path = path.rstrip("/")
        # use of root_marker to make minimum required path, e.g., "/"
        return path or cls.root_marker

    def unstrip_protocol(self, name: str) -> str:
        """Format FS-specific path to generic, including protocol"""
        protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
        for protocol in protos:
            if name.startswith(f"{protocol}://"):
                return name
        return f"{protos[0]}://{name}"

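    # Illustrative round trip (a sketch, not fsspec test code): stripping a
    # protocol and adding it back, using the base class itself, whose protocol
    # is "abstract" and whose root_marker is "".
    #
    #     fs = AbstractFileSystem()  # the base class is instantiable
    #     AbstractFileSystem._strip_protocol("abstract://bucket/key")  # -> "bucket/key"
    #     fs.unstrip_protocol("bucket/key")  # -> "abstract://bucket/key"
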
    @staticmethod
    def _get_kwargs_from_urls(path):
        """If kwargs can be encoded in the paths, extract them here

        This should happen before instantiation of the class; incoming paths
        then should be amended to strip the options in methods.

        Examples may look like an sftp path "sftp://user@host:/my/path", where
        the user and host should become kwargs and later get stripped.
        """
        # by default, nothing happens
        return {}

    @classmethod
    def current(cls):
        """Return the most recently instantiated FileSystem

        If no instance has been created, then create one with defaults
        """
        if cls._latest in cls._cache:
            return cls._cache[cls._latest]
        return cls()

    @property
    def transaction(self):
        """A context within which files are committed together upon exit

        Requires the file class to implement `.commit()` and `.discard()`
        for the normal and exception cases.
        """
        if self._transaction is None:
            self._transaction = self.transaction_type(self)
        return self._transaction

    def start_transaction(self):
        """Begin write transaction for deferring files, non-context version"""
        self._intrans = True
        self._transaction = self.transaction_type(self)
        return self.transaction

    def end_transaction(self):
        """Finish write transaction, non-context version"""
        self.transaction.complete()
        self._transaction = None
        # The invalid cache must be cleared after the transaction is completed.
        for path in self._invalidated_caches_in_transaction:
            self.invalidate_cache(path)
        self._invalidated_caches_in_transaction.clear()

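    # Usage sketch (illustrative only): the property above also works as a
    # context manager, so writes are committed together on exit, or discarded
    # on error. Shown with the assumed "memory" backend.
    #
    #     import fsspec
    #
    #     fs = fsspec.filesystem("memory")
    #     with fs.transaction:
    #         with fs.open("/staged.bin", "wb") as f:
    #             f.write(b"payload")  # deferred; committed when the block exits
    #
    #     # equivalent non-context form:
    #     fs.start_transaction()
    #     ...
    #     fs.end_transaction()
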
    def invalidate_cache(self, path=None):
        """
        Discard any cached directory information

        Parameters
        ----------
        path: string or None
            If None, clear all listings cached else listings at or under given
            path.
        """
        # Not necessary to implement an invalidation mechanism; the backend
        # may have no cache. But if it does, your subclass should call this
        # parent method to ensure caches expire correctly after transactions.
        # See the implementation of FTPFileSystem in ftp.py
        if self._intrans:
            self._invalidated_caches_in_transaction.append(path)

    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create directory entry at path

        For systems that don't have true directories, may create an entry for
        this instance only and not touch the real filesystem

        Parameters
        ----------
        path: str
            location
        create_parents: bool
            if True, this is equivalent to ``makedirs``
        kwargs:
            may be permissions, etc.
        """
        pass  # not necessary to implement, may not have directories

    def makedirs(self, path, exist_ok=False):
        """Recursively make directories

        Creates directory at path and any intervening required directories.
        Raises exception if, for instance, the path already exists but is a
        file.

        Parameters
        ----------
        path: str
            leaf directory name
        exist_ok: bool (False)
            If False, will error if the target already exists
        """
        pass  # not necessary to implement, may not have directories

    def rmdir(self, path):
        """Remove a directory, if empty"""
        pass  # not necessary to implement, may not have directories

    def ls(self, path, detail=True, **kwargs):
        """List objects at path.

        This should include subdirectories and files at that location. The
        difference between a file and a directory must be clear when details
        are requested.

        The specific keys, or perhaps a FileInfo class, or similar, is TBD,
        but must be consistent across implementations.
        Must include:

        - full path to the entry (without protocol)
        - size of the entry, in bytes. If the value cannot be determined, will
          be ``None``.
        - type of entry, "file", "directory" or other

        Additional information
        may be present, appropriate to the file-system, e.g., generation,
        checksum, etc.

        May use refresh=True|False to allow use of self._ls_from_cache to
        check for a saved listing and avoid calling the backend. This would be
        common where listing may be expensive.

        Parameters
        ----------
        path: str
        detail: bool
            if True, gives a list of dictionaries, where each is the same as
            the result of ``info(path)``. If False, gives a list of paths
            (str).
        kwargs: may have additional backend-specific options, such as version
            information

        Returns
        -------
        List of strings if detail is False, or list of directory information
        dicts if detail is True.
        """
        raise NotImplementedError

    def _ls_from_cache(self, path):
        """Check cache for listing

        Returns listing, if found (may be an empty list for a directory that
        exists but contains nothing), None if not in cache.
        """
        parent = self._parent(path)
        try:
            return self.dircache[path.rstrip("/")]
        except KeyError:
            pass
        try:
            files = [
                f
                for f in self.dircache[parent]
                if f["name"] == path
                or (f["name"] == path.rstrip("/") and f["type"] == "directory")
            ]
            if len(files) == 0:
                # parent dir was listed but did not contain this file
                raise FileNotFoundError(path)
            return files
        except KeyError:
            pass

    def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
        """Return all files under the given path.

        List all files, recursing into subdirectories; output is iterator-style,
        like ``os.walk()``. For a simple list of files, ``find()`` is available.

        When topdown is True, the caller can modify the dirnames list in-place
        (perhaps using del or slice assignment), and walk() will only recurse
        into the subdirectories whose names remain in dirnames; this can be
        used to prune the search, impose a specific order of visiting, or even
        to inform walk() about directories the caller creates or renames before
        it resumes walk() again.
        Modifying dirnames when topdown is False has no effect. (see os.walk)

        Note that the "files" output will include anything that is not
        a directory, such as links.

        Parameters
        ----------
        path: str
            Root to recurse into
        maxdepth: int
            Maximum recursion depth. None means limitless, but not recommended
            on link-based file-systems.
        topdown: bool (True)
            Whether to walk the directory tree from the top downwards or from
            the bottom upwards.
        on_error: "omit", "raise", a callable
            if omit (default), path with exception will simply be empty;
            If raise, an underlying exception will be raised;
            if callable, it will be called with a single OSError instance as argument
        kwargs: passed to ``ls``
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = self.ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            if callable(on_error):
                on_error(e)
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if not detail:
            dirs = list(dirs)
            files = list(files)

        if topdown:
            # Yield before recursion if walking top down
            yield path, dirs, files

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                if not topdown:
                    yield path, dirs, files
                return

        for d in dirs:
            yield from self.walk(
                full_dirs[d],
                maxdepth=maxdepth,
                detail=detail,
                topdown=topdown,
                **kwargs,
            )

        if not topdown:
            # Yield after recursion if walking bottom up
            yield path, dirs, files

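    # Usage sketch (illustrative only): iterate like os.walk and prune a
    # subtree in top-down mode. Assumes a populated "memory" filesystem.
    #
    #     import fsspec
    #
    #     fs = fsspec.filesystem("memory")
    #     fs.pipe({"/root/a/x.bin": b"1", "/root/b/y.bin": b"2"})
    #     for dirpath, dirnames, filenames in fs.walk("/root"):
    #         if "b" in dirnames:
    #             dirnames.remove("b")  # do not descend into /root/b
    #         print(dirpath, filenames)
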
    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
        """List all files below path.

        Like posix ``find`` command without conditions

        Parameters
        ----------
        path : str
        maxdepth: int or None
            If not None, the maximum number of levels to descend
        withdirs: bool
            Whether to include directory paths in the output. This is True
            when used by glob, but users usually only want files.
        kwargs are passed to ``ls``.
        """
        # TODO: allow equivalent of -name parameter
        path = self._strip_protocol(path)
        out = {}

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and self.isdir(path):
            out[path] = self.info(path)

        for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and self.isfile(path):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
        """Space used by files and optionally directories within a path

        Directory size does not include the size of its contents.

        Parameters
        ----------
        path: str
        total: bool
            Whether to sum all the file sizes
        maxdepth: int or None
            Maximum number of directory levels to descend, None for unlimited.
        withdirs: bool
            Whether to include directory paths in the output.
        kwargs: passed to ``find``

        Returns
        -------
        Dict of {path: size} if total=False, or int otherwise, where numbers
        refer to bytes used.
        """
        sizes = {}
        if withdirs and self.isdir(path):
            # Include top-level directory in output
            info = self.info(path)
            sizes[info["name"]] = info["size"]
        for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
            info = self.info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

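    # Usage sketch (illustrative only): find lists files recursively; du sums
    # their sizes. Assumes the same "memory" layout as the walk example above.
    #
    #     fs.find("/root")             # ['/root/a/x.bin', '/root/b/y.bin']
    #     fs.du("/root")               # 2 (total bytes)
    #     fs.du("/root", total=False)  # {'/root/a/x.bin': 1, '/root/b/y.bin': 1}
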
    def glob(self, path, maxdepth=None, **kwargs):
        """Find files by glob-matching.

        Pattern matching capabilities for finding files that match the given pattern.

        Parameters
        ----------
        path: str
            The glob pattern to match against
        maxdepth: int or None
            Maximum depth for ``'**'`` patterns. Applied on the first ``'**'`` found.
            Must be at least 1 if provided.
        kwargs:
            Additional arguments passed to ``find`` (e.g., detail=True)

        Returns
        -------
        List of matched paths, or dict of paths and their info if detail=True

        Notes
        -----
        Supported patterns:
        - '*': Matches any sequence of characters within a single directory level
        - ``'**'``: Matches any number of directory levels (must be an entire path component)
        - '?': Matches exactly one character
        - '[abc]': Matches any character in the set
        - '[a-z]': Matches any character in the range
        - '[!abc]': Matches any character NOT in the set

        Special behaviors:
        - If the path ends with '/', only folders are returned
        - Consecutive '*' characters are compressed into a single '*'
        - Empty brackets '[]' never match anything
        - Negated empty brackets '[!]' match any single character
        - Special characters in character classes are escaped properly

        Limitations:
        - ``'**'`` must be a complete path component (e.g., ``'a/**/b'``, not ``'a**b'``)
        - No brace expansion ('{a,b}.txt')
        - No extended glob patterns ('+(pattern)', '!(pattern)')
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if self.exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: self.info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

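    # Usage sketch (illustrative only): a few of the patterns described above,
    # against the assumed layout from the walk example.
    #
    #     fs.glob("/root/*/x.bin")   # ['/root/a/x.bin']  single level
    #     fs.glob("/root/**/*.bin")  # both files, any depth
    #     fs.glob("/root/*/")        # directories only (trailing slash)
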
    def exists(self, path, **kwargs):
        """Is there a file at the given path"""
        try:
            self.info(path, **kwargs)
            return True
        except:  # noqa: E722
            # any exception allowed bar FileNotFoundError?
            return False

    def lexists(self, path, **kwargs):
        """If there is a file at the given path (including
        broken links)"""
        return self.exists(path)

    def info(self, path, **kwargs):
        """Give details of entry at path

        Returns a single dictionary, with exactly the same information as ``ls``
        would with ``detail=True``.

        The default implementation calls ls and could be overridden by a
        shortcut. kwargs are passed on to ``ls()``.

        Some file systems might not be able to measure the file's size, in
        which case, the returned dict will include ``'size': None``.

        Returns
        -------
        dict with keys: name (full path in the FS), size (in bytes), type (file,
        directory, or something else) and other FS-specific keys.
        """
        path = self._strip_protocol(path)
        out = self.ls(self._parent(path), detail=True, **kwargs)
        out = [o for o in out if o["name"].rstrip("/") == path]
        if out:
            return out[0]
        out = self.ls(path, detail=True, **kwargs)
        path = path.rstrip("/")
        out1 = [o for o in out if o["name"].rstrip("/") == path]
        if len(out1) == 1:
            if "size" not in out1[0]:
                out1[0]["size"] = None
            return out1[0]
        elif len(out1) > 1 or out:
            return {"name": path, "size": 0, "type": "directory"}
        else:
            raise FileNotFoundError(path)

    def checksum(self, path):
        """Unique value for current version of file

        If the checksum is the same from one moment to another, the contents
        are guaranteed to be the same. If the checksum changes, the contents
        *might* have changed.

        This should normally be overridden; default will probably capture
        creation/modification timestamp (which would be good) or maybe
        access timestamp (which would be bad)
        """
        return int(tokenize(self.info(path)), 16)

    def size(self, path):
        """Size in bytes of file"""
        return self.info(path).get("size", None)

    def sizes(self, paths):
        """Size in bytes of each file in a list of paths"""
        return [self.size(p) for p in paths]

    def isdir(self, path):
        """Is this entry directory-like?"""
        try:
            return self.info(path)["type"] == "directory"
        except OSError:
            return False

    def isfile(self, path):
        """Is this entry file-like?"""
        try:
            return self.info(path)["type"] == "file"
        except:  # noqa: E722
            return False

    def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
        """Get the contents of the file as a string.

        Parameters
        ----------
        path: str
            URL of file on this filesystem
        encoding, errors, newline: same as `open`.
        """
        with self.open(
            path,
            mode="r",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.read()

    def write_text(
        self, path, value, encoding=None, errors=None, newline=None, **kwargs
    ):
        """Write the text to the given file.

        An existing file will be overwritten.

        Parameters
        ----------
        path: str
            URL of file on this filesystem
        value: str
            Text to write.
        encoding, errors, newline: same as `open`.
        """
        with self.open(
            path,
            mode="w",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.write(value)

    def cat_file(self, path, start=None, end=None, **kwargs):
        """Get the content of a file

        Parameters
        ----------
        path: URL of file on this filesystem
        start, end: int
            Bytes limits of the read. If negative, backwards from end,
            like usual python slices. Either can be None for start or
            end of file, respectively
        kwargs: passed to ``open()``.
        """
        # explicitly set buffering off?
        with self.open(path, "rb", **kwargs) as f:
            if start is not None:
                if start >= 0:
                    f.seek(start)
                else:
                    f.seek(max(0, f.size + start))
            if end is not None:
                if end < 0:
                    end = f.size + end
                return f.read(end - f.tell())
            return f.read()

    def pipe_file(self, path, value, mode="overwrite", **kwargs):
        """Set the bytes of given file"""
        if mode == "create" and self.exists(path):
            # non-atomic but simple way; or could use "xb" in open(), which is likely
            # not as well supported
            raise FileExistsError
        with self.open(path, "wb", **kwargs) as f:
            f.write(value)

    def pipe(self, path, value=None, **kwargs):
        """Put value into path

        (counterpart to ``cat``)

        Parameters
        ----------
        path: string or dict(str, bytes)
            If a string, a single remote location to put ``value`` bytes; if a dict,
            a mapping of {path: bytesvalue}.
        value: bytes, optional
            If using a single path, these are the bytes to put there. Ignored if
            ``path`` is a dict
        """
        if isinstance(path, str):
            self.pipe_file(self._strip_protocol(path), value, **kwargs)
        elif isinstance(path, dict):
            for k, v in path.items():
                self.pipe_file(self._strip_protocol(k), v, **kwargs)
        else:
            raise ValueError("path must be str or dict")

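    # Round-trip sketch (illustrative only): pipe writes bytes, cat_file reads
    # them back, including negative offsets. Assumes the "memory" backend.
    #
    #     fs.pipe("/blob", b"hello world")
    #     fs.cat_file("/blob")            # b'hello world'
    #     fs.cat_file("/blob", start=-5)  # b'world'
    #     fs.cat_file("/blob", 0, 5)      # b'hello'
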
    def cat_ranges(
        self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        if max_gap is not None:
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, list):
            starts = [starts] * len(paths)
        if not isinstance(ends, list):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        out = []
        for p, s, e in zip(paths, starts, ends):
            try:
                out.append(self.cat_file(p, s, e))
            except Exception as e:
                if on_error == "return":
                    out.append(e)
                else:
                    raise
        return out

    def cat(self, path, recursive=False, on_error="raise", **kwargs):
        """Fetch (potentially multiple) paths' contents

        Parameters
        ----------
        recursive: bool
            If True, assume the path(s) are directories, and get all the
            contained files
        on_error : "raise", "omit", "return"
            If raise, an underlying exception will be raised (converted to KeyError
            if the type is in self.missing_exceptions); if omit, keys with exception
            will simply not be included in the output; if "return", all keys are
            included in the output, but the value will be bytes or an exception
            instance.
        kwargs: passed to cat_file

        Returns
        -------
        dict of {path: contents} if there are multiple paths
        or the path has been otherwise expanded
        """
        paths = self.expand_path(path, recursive=recursive, **kwargs)
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            out = {}
            for path in paths:
                try:
                    out[path] = self.cat_file(path, **kwargs)
                except Exception as e:
                    if on_error == "raise":
                        raise
                    if on_error == "return":
                        out[path] = e
            return out
        else:
            return self.cat_file(paths[0], **kwargs)

    def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs):
        """Copy single remote file to local"""
        from .implementations.local import LocalFileSystem

        if isfilelike(lpath):
            outfile = lpath
        elif self.isdir(rpath):
            os.makedirs(lpath, exist_ok=True)
            return None

        fs = LocalFileSystem(auto_mkdir=True)
        fs.makedirs(fs._parent(lpath), exist_ok=True)

        with self.open(rpath, "rb", **kwargs) as f1:
            if outfile is None:
                outfile = open(lpath, "wb")

            try:
                callback.set_size(getattr(f1, "size", None))
                data = True
                while data:
                    data = f1.read(self.blocksize)
                    segment_len = outfile.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    def get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        Calls get_file for each source.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            from .implementations.local import (
                LocalFileSystem,
                make_path_posix,
                trailing_sep,
            )

            source_is_str = isinstance(rpath, str)
            rpaths = self.expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth, **kwargs
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
                if not rpaths:
                    return

            if isinstance(lpath, str):
                lpath = make_path_posix(lpath)

            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        callback.set_size(len(lpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            with callback.branched(rpath, lpath) as child:
                self.get_file(rpath, lpath, callback=child, **kwargs)

    def put_file(
        self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
    ):
        """Copy single file to remote"""
        if mode == "create" and self.exists(rpath):
            raise FileExistsError
        if os.path.isdir(lpath):
            self.makedirs(rpath, exist_ok=True)
            return None

        with open(lpath, "rb") as f1:
            size = f1.seek(0, 2)
            callback.set_size(size)
            f1.seek(0)

            self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
            with self.open(rpath, "wb", **kwargs) as f2:
                while f1.tell() < size:
                    data = f1.read(self.blocksize)
                    segment_len = f2.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)

    def put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        Calls put_file for each source.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            from .implementations.local import (
                LocalFileSystem,
                make_path_posix,
                trailing_sep,
            )

            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(
                lpath, recursive=recursive, maxdepth=maxdepth, **kwargs
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or self.isdir(rpath)
            )

            rpath = (
                self._strip_protocol(rpath)
                if isinstance(rpath, str)
                else [self._strip_protocol(p) for p in rpath]
            )
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        callback.set_size(len(rpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            with callback.branched(lpath, rpath) as child:
                self.put_file(lpath, rpath, callback=child, **kwargs)

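    # Usage sketch (illustrative only): the trailing "/" decides whether the
    # target is treated as a directory. The paths below are hypothetical.
    #
    #     fs.put("report.csv", "/data/")               # -> /data/report.csv
    #     fs.put("report.csv", "/data/copy.csv")       # -> /data/copy.csv
    #     fs.put("logs", "/backup/", recursive=True)   # whole tree under /backup/
    #     fs.get("/data/report.csv", "local_dir/")     # download into local_dir/
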
    def head(self, path, size=1024):
        """Get the first ``size`` bytes from file"""
        with self.open(path, "rb") as f:
            return f.read(size)

    def tail(self, path, size=1024):
        """Get the last ``size`` bytes from file"""
        with self.open(path, "rb") as f:
            f.seek(max(-size, -f.size), 2)
            return f.read()

    def cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    def copy(
        self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
    ):
        """Copy between two locations within the filesystem

        on_error : "raise", "ignore"
            If raise, any not-found exceptions will be raised; if ignore any
            not-found exceptions will cause the path to be skipped; defaults to
            raise unless recursive is true, where the default is ignore
        """
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            from .implementations.local import trailing_sep

            source_is_str = isinstance(path1, str)
            paths1 = self.expand_path(
                path1, recursive=recursive, maxdepth=maxdepth, **kwargs
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or self.isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        for p1, p2 in zip(paths1, paths2):
            try:
                self.cp_file(p1, p2, **kwargs)
            except FileNotFoundError:
                if on_error == "raise":
                    raise

    def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
        """Turn one or more globs or directories into a list of all matching paths
        to files or directories.

        kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
        """

        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, (str, os.PathLike)):
            out = self.expand_path([path], recursive, maxdepth, **kwargs)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:
                if has_magic(p):
                    bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            self.expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                                **kwargs,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(
                        self.find(
                            p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
                        )
                    )
                    out |= rec
                if p not in out and (recursive is False or self.exists(p)):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
        """Move file(s) from one location to another"""
        if path1 == path2:
            logger.debug("%s mv: The paths are the same, so no files were moved.", self)
        else:
            # explicitly raise exception to prevent data corruption
            self.copy(
                path1, path2, recursive=recursive, maxdepth=maxdepth, on_error="raise"
            )
            self.rm(path1, recursive=recursive)

    def rm_file(self, path):
        """Delete a file"""
        self._rm(path)

    def _rm(self, path):
        """Delete one file"""
        # this is the old name for the method, prefer rm_file
        raise NotImplementedError

    def rm(self, path, recursive=False, maxdepth=None):
        """Delete files.

        Parameters
        ----------
        path: str or list of str
            File(s) to delete.
        recursive: bool
            If file(s) are directories, recursively delete contents and then
            also remove the directory
        maxdepth: int or None
            Depth to pass to walk for finding files to delete, if recursive.
            If None, there will be no limit and infinite recursion may be
            possible.
        """
        path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
        for p in reversed(path):
            self.rm_file(p)

    @classmethod
    def _parent(cls, path):
        path = cls._strip_protocol(path)
        if "/" in path:
            parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
            return cls.root_marker + parent
        else:
            return cls.root_marker

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return raw bytes-mode file-like from the file-system"""
        return AbstractBufferedFile(
            self,
            path,
            mode,
            block_size,
            autocommit,
            cache_options=cache_options,
            **kwargs,
        )

    def open(
        self,
        path,
        mode="rb",
        block_size=None,
        cache_options=None,
        compression=None,
        **kwargs,
    ):
        """
        Return a file-like object from the filesystem

        The resultant instance must function correctly in a context ``with``
        block.

        Parameters
        ----------
        path: str
            Target file
        mode: str like 'rb', 'w'
            See builtin ``open()``
            Mode "x" (exclusive write) may be implemented by the backend. Even if
            it is, whether it is checked up front or on commit, and whether it is
            atomic is implementation-dependent.
        block_size: int
            Some indication of buffering - this is a value in bytes
        cache_options : dict, optional
            Extra arguments to pass through to the cache.
        compression: string or None
            If given, open file using compression codec. Can either be a compression
            name (a key in ``fsspec.compression.compr``) or "infer" to guess the
            compression from the filename suffix.
        encoding, errors, newline: passed on to TextIOWrapper for text mode
        """
        import io

        path = self._strip_protocol(path)
        if "b" not in mode:
            mode = mode.replace("t", "") + "b"

            text_kwargs = {
                k: kwargs.pop(k)
                for k in ["encoding", "errors", "newline"]
                if k in kwargs
            }
            return io.TextIOWrapper(
                self.open(
                    path,
                    mode,
                    block_size=block_size,
                    cache_options=cache_options,
                    compression=compression,
                    **kwargs,
                ),
                **text_kwargs,
            )
        else:
            ac = kwargs.pop("autocommit", not self._intrans)
            f = self._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=ac,
                cache_options=cache_options,
                **kwargs,
            )
            if compression is not None:
                from fsspec.compression import compr
                from fsspec.core import get_compression

                compression = get_compression(path, compression)
                compress = compr[compression]
                f = compress(f, mode=mode[0])

            if not ac and "r" not in mode:
                self.transaction.files.append(f)
            return f

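    # Usage sketch (illustrative only): text mode wraps the binary file in a
    # TextIOWrapper, and compression="infer" picks a codec from the suffix.
    #
    #     with fs.open("/notes.txt", "w", encoding="utf-8") as f:
    #         f.write("hello")
    #
    #     with fs.open("/dump.json.gz", "rb", compression="infer") as f:
    #         raw = f.read()  # decompressed bytes
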
    def touch(self, path, truncate=True, **kwargs):
        """Create empty file, or update timestamp

        Parameters
        ----------
        path: str
            file location
        truncate: bool
            If True, always set file size to 0; if False, update timestamp and
            leave file unchanged, if backend allows this
        """
        if truncate or not self.exists(path):
            with self.open(path, "wb", **kwargs):
                pass
        else:
            raise NotImplementedError  # update timestamp, if possible

    def ukey(self, path):
        """Hash of file properties, to tell if it has changed"""
        return sha256(str(self.info(path)).encode()).hexdigest()

    def read_block(self, fn, offset, length, delimiter=None):
        """Read a block of bytes from a file

        Starting at ``offset`` of the file, read ``length`` bytes. If
        ``delimiter`` is set then we ensure that the read starts and stops at
        delimiter boundaries that follow the locations ``offset`` and ``offset
        + length``. If ``offset`` is zero then we start at zero. The
        bytestring returned WILL include the end delimiter string.

        If offset+length is beyond the eof, reads to eof.

        Parameters
        ----------
        fn: string
            Path to filename
        offset: int
            Byte offset to start read
        length: int
            Number of bytes to read. If None, read to end.
        delimiter: bytes (optional)
            Ensure reading starts and stops at delimiter bytestring

        Examples
        --------
        >>> fs.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
        b'Alice, 100\\nBo'
        >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\n'

        Use ``length=None`` to read to the end of the file.
        >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\nCharlie, 300'

        See Also
        --------
        :func:`fsspec.utils.read_block`
        """
        with self.open(fn, "rb") as f:
            size = f.size
            if length is None:
                length = size
            if size is not None and offset + length > size:
                length = size - offset
            return read_block(f, offset, length, delimiter)

    def to_json(self, *, include_password: bool = True) -> str:
        """
        JSON representation of this filesystem instance.

        Parameters
        ----------
        include_password: bool, default True
            Whether to include the password (if any) in the output.

        Returns
        -------
        JSON string with keys ``cls`` (the python location of this class),
        protocol (text name of this class's protocol, first one in case of
        multiple), ``args`` (positional args, usually empty), and all other
        keyword arguments as their own keys.

        Warnings
        --------
        Serialized filesystems may contain sensitive information that was
        passed to the constructor, such as passwords and tokens. Make sure you
        store and send them in a secure environment!
        """
        from .json import FilesystemJSONEncoder

        return json.dumps(
            self,
            cls=type(
                "_FilesystemJSONEncoder",
                (FilesystemJSONEncoder,),
                {"include_password": include_password},
            ),
        )

    @staticmethod
    def from_json(blob: str) -> AbstractFileSystem:
        """
        Recreate a filesystem instance from JSON representation.

        See ``.to_json()`` for the expected structure of the input.

        Parameters
        ----------
        blob: str

        Returns
        -------
        file system instance, not necessarily of this particular class.

        Warnings
        --------
        This can import arbitrary modules (as determined by the ``cls`` key).
        Make sure you haven't installed any modules that may execute malicious code
        at import time.
        """
        from .json import FilesystemJSONDecoder

        return json.loads(blob, cls=FilesystemJSONDecoder)

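    # Round-trip sketch (illustrative only): because instances are cached by
    # their constructor arguments, deserializing usually yields the cached
    # instance again. Assumes the "memory" backend.
    #
    #     import fsspec
    #
    #     fs = fsspec.filesystem("memory")
    #     blob = fs.to_json()
    #     fs2 = fsspec.AbstractFileSystem.from_json(blob)
    #     assert fs is fs2
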
    def to_dict(self, *, include_password: bool = True) -> dict[str, Any]:
        """
        JSON-serializable dictionary representation of this filesystem instance.

        Parameters
        ----------
        include_password: bool, default True
            Whether to include the password (if any) in the output.

        Returns
        -------
        Dictionary with keys ``cls`` (the python location of this class),
        protocol (text name of this class's protocol, first one in case of
        multiple), ``args`` (positional args, usually empty), and all other
        keyword arguments as their own keys.

        Warnings
        --------
        Serialized filesystems may contain sensitive information that was
        passed to the constructor, such as passwords and tokens. Make sure you
        store and send them in a secure environment!
        """
        from .json import FilesystemJSONEncoder

        json_encoder = FilesystemJSONEncoder()

        cls = type(self)
        proto = self.protocol

        storage_options = dict(self.storage_options)
        if not include_password:
            storage_options.pop("password", None)

        return dict(
            cls=f"{cls.__module__}:{cls.__name__}",
            protocol=proto[0] if isinstance(proto, (tuple, list)) else proto,
            args=json_encoder.make_serializable(self.storage_args),
            **json_encoder.make_serializable(storage_options),
        )

    @staticmethod
    def from_dict(dct: dict[str, Any]) -> AbstractFileSystem:
        """
        Recreate a filesystem instance from dictionary representation.

        See ``.to_dict()`` for the expected structure of the input.

        Parameters
        ----------
        dct: Dict[str, Any]

        Returns
        -------
        file system instance, not necessarily of this particular class.

        Warnings
        --------
        This can import arbitrary modules (as determined by the ``cls`` key).
        Make sure you haven't installed any modules that may execute malicious code
        at import time.
        """
        from .json import FilesystemJSONDecoder

        json_decoder = FilesystemJSONDecoder()

        dct = dict(dct)  # Defensive copy

        cls = FilesystemJSONDecoder.try_resolve_fs_cls(dct)
        if cls is None:
            raise ValueError("Not a serialized AbstractFileSystem")

        dct.pop("cls", None)
        dct.pop("protocol", None)

        return cls(
            *json_decoder.unmake_serializable(dct.pop("args", ())),
            **json_decoder.unmake_serializable(dct),
        )

    def _get_pyarrow_filesystem(self):
        """
        Make a version of the FS instance which will be acceptable to pyarrow
        """
        # all instances already also derive from pyarrow
        return self

    def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
        """Create key/value store based on this file-system

        Makes a MutableMapping interface to the FS at the given root path.
        See ``fsspec.mapping.FSMap`` for further details.
        """
        from .mapping import FSMap

        return FSMap(
            root,
            self,
            check=check,
            create=create,
            missing_exceptions=missing_exceptions,
        )

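    # Usage sketch (illustrative only): the mapper behaves like a mutable
    # dict of bytes keyed by path, which is what zarr-style consumers expect.
    #
    #     m = fs.get_mapper("/store")
    #     m["key"] = b"value"  # writes /store/key
    #     m["key"]             # b'value'
    #     list(m)              # ['key']
    #     del m["key"]
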
    @classmethod
    def clear_instance_cache(cls):
        """
        Clear the cache of filesystem instances.

        Notes
        -----
        Unless overridden by setting the ``cachable`` class attribute to False,
        the filesystem class stores a reference to newly created instances. This
        prevents Python's normal rules around garbage collection from working,
        since the instance's refcount will not drop to zero until
        ``clear_instance_cache`` is called.
        """
        cls._cache.clear()

    def created(self, path):
        """Return the created timestamp of a file as a datetime.datetime"""
        raise NotImplementedError

    def modified(self, path):
        """Return the modified timestamp of a file as a datetime.datetime"""
        raise NotImplementedError

    def tree(
        self,
        path: str = "/",
        recursion_limit: int = 2,
        max_display: int = 25,
        display_size: bool = False,
        prefix: str = "",
        is_last: bool = True,
        first: bool = True,
        indent_size: int = 4,
    ) -> str:
        """
        Return a tree-like structure of the filesystem starting from the given path as a string.

        Parameters
        ----------
        path: Root path to start traversal from
        recursion_limit: Maximum depth of directory traversal
        max_display: Maximum number of items to display per directory
        display_size: Whether to display file sizes
        prefix: Current line prefix for visual tree structure
        is_last: Whether current item is last in its level
        first: Whether this is the first call (displays root path)
        indent_size: Number of spaces per indent level

        Returns
        -------
        str: A string representing the tree structure.

        Example
        -------
        >>> from fsspec import filesystem

        >>> fs = filesystem('ftp', host='test.rebex.net', user='demo', password='password')
        >>> tree = fs.tree(display_size=True, recursion_limit=3, indent_size=8, max_display=10)
        >>> print(tree)
        """

        def format_bytes(n: int) -> str:
            """Format bytes as text."""
            for prefix, k in (
                ("P", 2**50),
                ("T", 2**40),
                ("G", 2**30),
                ("M", 2**20),
                ("k", 2**10),
            ):
                if n >= 0.9 * k:
                    return f"{n / k:.2f} {prefix}b"
            return f"{n}B"

        result = []

        if first:
            result.append(path)

        if recursion_limit:
            indent = " " * indent_size
            contents = self.ls(path, detail=True)
            contents.sort(
                key=lambda x: (x.get("type") != "directory", x.get("name", ""))
            )

            if max_display is not None and len(contents) > max_display:
                displayed_contents = contents[:max_display]
                remaining_count = len(contents) - max_display
            else:
                displayed_contents = contents
                remaining_count = 0

            for i, item in enumerate(displayed_contents):
                is_last_item = (i == len(displayed_contents) - 1) and (
                    remaining_count == 0
                )

                branch = (
                    "└" + ("─" * (indent_size - 2))
                    if is_last_item
                    else "├" + ("─" * (indent_size - 2))
                )
                branch += " "
                new_prefix = prefix + (
                    indent if is_last_item else "│" + " " * (indent_size - 1)
                )

                name = os.path.basename(item.get("name", ""))

                if display_size and item.get("type") == "directory":
                    sub_contents = self.ls(item.get("name", ""), detail=True)
                    num_files = sum(
                        1 for sub_item in sub_contents if sub_item.get("type") == "file"
                    )
                    num_folders = sum(
                        1
                        for sub_item in sub_contents
                        if sub_item.get("type") == "directory"
                    )

                    if num_files == 0 and num_folders == 0:
                        size = " (empty folder)"
                    elif num_files == 0:
                        size = f" ({num_folders} subfolder{'s' if num_folders > 1 else ''})"
                    elif num_folders == 0:
                        size = f" ({num_files} file{'s' if num_files > 1 else ''})"
                    else:
                        size = f" ({num_files} file{'s' if num_files > 1 else ''}, {num_folders} subfolder{'s' if num_folders > 1 else ''})"
                elif display_size and item.get("type") == "file":
                    size = f" ({format_bytes(item.get('size', 0))})"
                else:
                    size = ""

                result.append(f"{prefix}{branch}{name}{size}")

                if item.get("type") == "directory" and recursion_limit > 0:
                    result.append(
                        self.tree(
                            path=item.get("name", ""),
                            recursion_limit=recursion_limit - 1,
                            max_display=max_display,
                            display_size=display_size,
                            prefix=new_prefix,
                            is_last=is_last_item,
                            first=False,
                            indent_size=indent_size,
                        )
                    )

            if remaining_count > 0:
                more_message = f"{remaining_count} more item(s) not displayed."
                result.append(
                    f"{prefix}{'└' + ('─' * (indent_size - 2))} {more_message}"
                )

        return "\n".join(_ for _ in result if _)

    # ------------------------------------------------------------------------
    # Aliases

    def read_bytes(self, path, start=None, end=None, **kwargs):
        """Alias of `AbstractFileSystem.cat_file`."""
        return self.cat_file(path, start=start, end=end, **kwargs)

    def write_bytes(self, path, value, **kwargs):
        """Alias of `AbstractFileSystem.pipe_file`."""
        self.pipe_file(path, value, **kwargs)

    def makedir(self, path, create_parents=True, **kwargs):
        """Alias of `AbstractFileSystem.mkdir`."""
        return self.mkdir(path, create_parents=create_parents, **kwargs)

    def mkdirs(self, path, exist_ok=False):
        """Alias of `AbstractFileSystem.makedirs`."""
        return self.makedirs(path, exist_ok=exist_ok)

    def listdir(self, path, detail=True, **kwargs):
        """Alias of `AbstractFileSystem.ls`."""
        return self.ls(path, detail=detail, **kwargs)

    def cp(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.copy`."""
        return self.copy(path1, path2, **kwargs)

    def move(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def stat(self, path, **kwargs):
        """Alias of `AbstractFileSystem.info`."""
        return self.info(path, **kwargs)

    def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
        """Alias of `AbstractFileSystem.du`."""
        return self.du(path, total=total, maxdepth=maxdepth, **kwargs)

    def rename(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def delete(self, path, recursive=False, maxdepth=None):
        """Alias of `AbstractFileSystem.rm`."""
        return self.rm(path, recursive=recursive, maxdepth=maxdepth)

    def upload(self, lpath, rpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.put`."""
        return self.put(lpath, rpath, recursive=recursive, **kwargs)

    def download(self, rpath, lpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.get`."""
        return self.get(rpath, lpath, recursive=recursive, **kwargs)

    def sign(self, path, expiration=100, **kwargs):
        """Create a signed URL representing the given path

        Some implementations allow temporary URLs to be generated, as a
        way of delegating credentials.

        Parameters
        ----------
        path : str
            The path on the filesystem
        expiration : int
            Number of seconds to enable the URL for (if supported)

        Returns
        -------
        URL : str
            The signed URL

        Raises
        ------
        NotImplementedError : if method is not implemented for a filesystem
        """
        raise NotImplementedError("Sign is not implemented for this filesystem")

    def _isfilestore(self):
        # Originally inherited from pyarrow DaskFileSystem. Keeping this
        # here for backwards compatibility as long as pyarrow uses its
        # legacy fsspec-compatible filesystems and thus accepts fsspec
        # filesystems as well
        return False

1840class AbstractBufferedFile(io.IOBase):
1841 """Convenient class to derive from to provide buffering
1843 In the case that the backend does not provide a pythonic file-like object
1844 already, this class contains much of the logic to build one. The only
1845 methods that need to be overridden are ``_upload_chunk``,
1846 ``_initiate_upload`` and ``_fetch_range``.
1847 """
1849 DEFAULT_BLOCK_SIZE = 5 * 2**20
1850 _details = None
1852 def __init__(
1853 self,
1854 fs,
1855 path,
1856 mode="rb",
1857 block_size="default",
1858 autocommit=True,
1859 cache_type="readahead",
1860 cache_options=None,
1861 size=None,
1862 **kwargs,
1863 ):
1864 """
1865 Template for files with buffered reading and writing
1867 Parameters
1868 ----------
1869 fs: instance of FileSystem
1870 path: str
1871 location in file-system
1872 mode: str
1873 Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file
1874 systems may be read-only, and some may not support append.
1875 block_size: int
1876 Buffer size for reading or writing, 'default' for class default
1877 autocommit: bool
1878 Whether to write to final destination; may only impact what
1879 happens when file is being closed.
1880 cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
1881 Caching policy in read mode. See the definitions in ``core``.
1882 cache_options : dict
1883 Additional options passed to the constructor for the cache specified
1884 by `cache_type`.
1885 size: int
1886 If given and in read mode, suppresses having to look up the file size
1887 kwargs:
1888 Gets stored as self.kwargs
1889 """
1890 from .core import caches
1892 self.path = path
1893 self.fs = fs
1894 self.mode = mode
1895 self.blocksize = (
1896 self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
1897 )
1898 self.loc = 0
1899 self.autocommit = autocommit
1900 self.end = None
1901 self.start = None
1902 self.closed = False
1904 if cache_options is None:
1905 cache_options = {}
1907 if "trim" in kwargs:
1908 warnings.warn(
1909 "Passing 'trim' to control the cache behavior has been deprecated. "
1910 "Specify it within the 'cache_options' argument instead.",
1911 FutureWarning,
1912 )
1913 cache_options["trim"] = kwargs.pop("trim")
1915 self.kwargs = kwargs
1917 if mode not in {"ab", "rb", "wb", "xb"}:
1918 raise NotImplementedError("File mode not supported")
1919 if mode == "rb":
1920 if size is not None:
1921 self.size = size
1922 else:
1923 self.size = self.details["size"]
1924 self.cache = caches[cache_type](
1925 self.blocksize, self._fetch_range, self.size, **cache_options
1926 )
1927 else:
1928 self.buffer = io.BytesIO()
1929 self.offset = None
1930 self.forced = False
1931 self.location = None
1933 @property
1934 def details(self):
1935 if self._details is None:
1936 self._details = self.fs.info(self.path)
1937 return self._details
1939 @details.setter
1940 def details(self, value):
1941 self._details = value
1942 self.size = value["size"]
1944 @property
1945 def full_name(self):
1946 return _unstrip_protocol(self.path, self.fs)
1948 @property
1949 def closed(self):
1950 # get around this attr being read-only in IOBase
1951 # use getattr here, since this can be called during del
1952 return getattr(self, "_closed", True)
1954 @closed.setter
1955 def closed(self, c):
1956 self._closed = c
1958 def __hash__(self):
1959 if "w" in self.mode:
1960 return id(self)
1961 else:
1962 return int(tokenize(self.details), 16)
1964 def __eq__(self, other):
1965 """Files are equal if they have the same checksum, only in read mode"""
1966 if self is other:
1967 return True
1968 return (
1969 isinstance(other, type(self))
1970 and self.mode == "rb"
1971 and other.mode == "rb"
1972 and hash(self) == hash(other)
1973 )
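    # Consequently, two independent read-mode handles on the same remote
    # object hash and compare equal (path hypothetical):
    #
    #     f1 = fs.open("bucket/data.bin")
    #     f2 = fs.open("bucket/data.bin")
    #     assert f1 == f2  # identical details -> identical token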
1975 def commit(self):
1976 """Move from temp to final destination"""
1978 def discard(self):
1979 """Throw away temporary file"""
1981 def info(self):
1982 """File information about this path"""
1983 if self.readable():
1984 return self.details
1985 else:
1986 raise ValueError("Info not available while writing")
1988 def tell(self):
1989 """Current file location"""
1990 return self.loc
1992 def seek(self, loc, whence=0):
1993 """Set current file location
1995 Parameters
1996 ----------
1997 loc: int
1998 byte location
1999 whence: {0, 1, 2}
2000 from start of file, current location or end of file, resp.
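
For example (assuming the file is at least 16 bytes long)::

    f.seek(-16, 2)   # position 16 bytes before the end
    tail = f.read()  # returns exactly those final 16 bytes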
2001 """
2002 loc = int(loc)
2003 if self.mode != "rb":
2004 raise OSError(ESPIPE, "Seek only available in read mode")
2005 if whence == 0:
2006 nloc = loc
2007 elif whence == 1:
2008 nloc = self.loc + loc
2009 elif whence == 2:
2010 nloc = self.size + loc
2011 else:
2012 raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
2013 if nloc < 0:
2014 raise ValueError("Seek before start of file")
2015 self.loc = nloc
2016 return self.loc
2018 def write(self, data):
2019 """
2020 Write data to buffer.
2022 Buffer only sent on flush() or if buffer is greater than
2023 or equal to blocksize.
2025 Parameters
2026 ----------
2027 data: bytes
2028 Set of bytes to be written.
2029 """
2030 if not self.writable():
2031 raise ValueError("File not in write mode")
2032 if self.closed:
2033 raise ValueError("I/O operation on closed file.")
2034 if self.forced:
2035 raise ValueError("This file has been force-flushed, can only close")
2036 out = self.buffer.write(data)
2037 self.loc += out
2038 if self.buffer.tell() >= self.blocksize:
2039 self.flush()
2040 return out
2042 def flush(self, force=False):
2043 """
2044 Write buffered data to backend store.
2046 Writes the current buffer, if it is larger than the block-size, or if
2047 the file is being closed.
2049 Parameters
2050 ----------
2051 force: bool
2052 When closing, write the last block even if it is smaller than
2053 blocks are allowed to be. Disallows further writing to this file.
2054 """
2056 if self.closed:
2057 raise ValueError("Flush on closed file")
2058 if force and self.forced:
2059 raise ValueError("Force flush cannot be called more than once")
2060 if force:
2061 self.forced = True
2063 if self.readable():
2064 # flush is a no-op in read mode
2065 return
2067 if not force and self.buffer.tell() < self.blocksize:
2068 # Defer write on small block
2069 return
2071 if self.offset is None:
2072 # Initialize a multipart upload
2073 self.offset = 0
2074 try:
2075 self._initiate_upload()
2076 except BaseException:
2077 self.closed = True
2078 raise
2080 if self._upload_chunk(final=force) is not False:
2081 self.offset += self.buffer.seek(0, 2)
2082 self.buffer = io.BytesIO()
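    # Write-path sketch (illustrative; assumes a buffered backend such as
    # s3fs, with a hypothetical path):
    #
    #     with fs.open("bucket/out.bin", "wb", block_size=5 * 2**20) as f:
    #         f.write(b"x" * (6 * 2**20))  # crosses blocksize -> one chunk uploaded
    #         f.write(b"tail")             # small remainder stays buffered
    #     # exiting the block calls close(), which force-flushes the remainder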
2084 def _upload_chunk(self, final=False):
2085 """Write one part of a multi-block file upload
2087 Parameters
2088 ----------
2089 final: bool
2090 This is the last block, so it should complete the file if
2091 self.autocommit is True.
2092 """
2093 # may not yet have been initialized; may need to call _initiate_upload
2095 def _initiate_upload(self):
2096 """Create remote file/upload"""
2097 pass
2099 def _fetch_range(self, start, end):
2100 """Get the specified set of bytes from remote"""
2101 return self.fs.cat_file(self.path, start=start, end=end)
2103 def read(self, length=-1):
2104 """
2105 Return data from cache, or fetch pieces as necessary
2107 Parameters
2108 ----------
2109 length: int (-1)
2110 Number of bytes to read; if <0, all remaining bytes.
2111 """
2112 length = -1 if length is None else int(length)
2113 if self.mode != "rb":
2114 raise ValueError("File not in read mode")
2115 if length < 0:
2116 length = self.size - self.loc
2117 if self.closed:
2118 raise ValueError("I/O operation on closed file.")
2119 if length == 0:
2120 # don't even bother calling fetch
2121 return b""
2122 out = self.cache._fetch(self.loc, self.loc + length)
2124 logger.debug(
2125 "%s read: %i - %i %s",
2126 self,
2127 self.loc,
2128 self.loc + length,
2129 self.cache._log_stats(),
2130 )
2131 self.loc += len(out)
2132 return out
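    # For example (read mode, sizes hypothetical):
    #
    #     f.seek(0)
    #     header = f.read(16)  # first 16 bytes, served via the cache
    #     rest = f.read()      # a negative length reads everything remaining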
2134 def readinto(self, b):
2135 """mirrors builtin file's readinto method
2137 https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
2138 """
2139 out = memoryview(b).cast("B")
2140 data = self.read(out.nbytes)
2141 out[: len(data)] = data
2142 return len(data)
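    # For example, filling a caller-supplied buffer (size hypothetical):
    #
    #     buf = bytearray(4096)
    #     n = f.readinto(buf)  # buf[:n] now holds the bytes read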
2144 def readuntil(self, char=b"\n", blocks=None):
2145 """Return data between current position and first occurrence of char
2147 char is included in the output, except if the end of the file is
2148 encountered first.
2150 Parameters
2151 ----------
2152 char: bytes
2153 Thing to find
2154 blocks: None or int
2155 How much to read in each go. Defaults to the file blocksize, which may
2156 mean a new read on every call.
2157 """
2158 out = []
2159 while True:
2160 start = self.tell()
2161 part = self.read(blocks or self.blocksize)
2162 if len(part) == 0:
2163 break
2164 found = part.find(char)
2165 if found > -1:
2166 out.append(part[: found + len(char)])
2167 self.seek(start + found + len(char))
2168 break
2169 out.append(part)
2170 return b"".join(out)
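    # For example, scanning NUL-delimited records (delimiter hypothetical):
    #
    #     record = f.readuntil(b"\x00")  # bytes up to and including the NUL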
2172 def readline(self):
2173 """Read until and including the first occurrence of newline character
2175 Note that, because of character encoding, this is not necessarily a
2176 true line ending.
2177 """
2178 return self.readuntil(b"\n")
2180 def __next__(self):
2181 out = self.readline()
2182 if out:
2183 return out
2184 raise StopIteration
2186 def __iter__(self):
2187 return self
2189 def readlines(self):
2190 """Return all data, split by the newline character, including the newline character"""
2191 data = self.read()
2192 lines = data.split(b"\n")
2193 out = [l + b"\n" for l in lines[:-1]]
2194 if data.endswith(b"\n"):
2195 return out
2196 else:
2197 return out + [lines[-1]]
2198 # return list(self) ???
2200 def readinto1(self, b):
2201 return self.readinto(b)
2203 def close(self):
2204 """Close file
2206 Finalizes writes, discards cache
2207 """
2208 if getattr(self, "_unclosable", False):
2209 return
2210 if self.closed:
2211 return
2212 try:
2213 if self.mode == "rb":
2214 self.cache = None
2215 else:
2216 if not self.forced:
2217 self.flush(force=True)
2219 if self.fs is not None:
2220 self.fs.invalidate_cache(self.path)
2221 self.fs.invalidate_cache(self.fs._parent(self.path))
2222 finally:
2223 self.closed = True
2225 def readable(self):
2226 """Whether opened for reading"""
2227 return "r" in self.mode and not self.closed
2229 def seekable(self):
2230 """Whether is seekable (only in read mode)"""
2231 return self.readable()
2233 def writable(self):
2234 """Whether opened for writing"""
2235 return self.mode in {"wb", "ab", "xb"} and not self.closed
2237 def __reduce__(self):
2238 if self.mode != "rb":
2239 raise RuntimeError("Pickling a writeable file is not supported")
2241 return reopen, (
2242 self.fs,
2243 self.path,
2244 self.mode,
2245 self.blocksize,
2246 self.loc,
2247 self.size,
2248 self.autocommit,
2249 self.cache.name if self.cache else "none",
2250 self.kwargs,
2251 )
2253 def __del__(self):
2254 if not self.closed:
2255 self.close()
2257 def __str__(self):
2258 return f"<File-like object {type(self.fs).__name__}, {self.path}>"
2260 __repr__ = __str__
2262 def __enter__(self):
2263 return self
2265 def __exit__(self, *args):
2266 self.close()
2269def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
2270 file = fs.open(
2271 path,
2272 mode=mode,
2273 block_size=blocksize,
2274 autocommit=autocommit,
2275 cache_type=cache_type,
2276 size=size,
2277 **kwargs,
2278 )
2279 if loc > 0:
2280 file.seek(loc)
2281 return file
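
# Pickle round-trip sketch (illustrative; `f` is assumed to be an existing
# read-mode AbstractBufferedFile):
#
#     import pickle
#     f2 = pickle.loads(pickle.dumps(f))  # reconstructed via `reopen`
#     assert f2.tell() == f.tell()        # file position is restored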