Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/spec.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import json
5import logging
6import os
7import threading
8import warnings
9import weakref
10from errno import ESPIPE
11from glob import has_magic
12from hashlib import sha256
13from typing import Any, ClassVar
15from .callbacks import DEFAULT_CALLBACK
16from .config import apply_config, conf
17from .dircache import DirCache
18from .transaction import Transaction
19from .utils import (
20 _unstrip_protocol,
21 glob_translate,
22 isfilelike,
23 other_paths,
24 read_block,
25 stringify_path,
26 tokenize,
27)
29logger = logging.getLogger("fsspec")
def make_instance(cls, args, kwargs):
    """Construct ``cls`` from a tuple of positional and a dict of keyword arguments.

    Parameters
    ----------
    cls: callable
        The class (or any callable) to invoke.
    args: tuple
        Positional arguments, unpacked into the call.
    kwargs: dict
        Keyword arguments, unpacked into the call.

    Returns
    -------
    Whatever ``cls(*args, **kwargs)`` returns.
    """
    instance = cls(*args, **kwargs)
    return instance
class _Cached(type):
    """
    Metaclass that memoises file system instances on the class.

    Notes
    -----
    The cache key (token) is built from

    * The values of the class attributes listed in `_extra_tokenize_attributes`
    * The arguments passed to ``__init__``.

    Because the cache holds its own reference to every instance it stores, a
    filesystem is not garbage collected merely because all *user* references
    are gone. :meth:`AbstractFileSystem.clear_instance_cache` must *also* be
    called before an instance can be collected.
    """

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The strong reference held by the default cache is intentional: it
        # keeps instances alive after all other references are dropped. To
        # really delete a FileSystem, the cache must be cleared.
        use_weak_cache = conf.get("weakref_instance_cache")
        if use_weak_cache:  # pragma: no cover
            # debug option for analysing fork/spawn conditions
            cls._cache = weakref.WeakValueDictionary()
        else:
            cls._cache = {}
        cls._pid = os.getpid()

    def __call__(cls, *args, **kwargs):
        kwargs = apply_config(cls, kwargs)
        extra_tokens = tuple(
            getattr(cls, name, None) for name in cls._extra_tokenize_attributes
        )
        # Options listed in _strip_tokenize_options are removed before
        # tokenizing but are still handed to the constructor further down.
        strip_tokenize_options = {}
        for option in cls._strip_tokenize_options:
            if option in kwargs:
                strip_tokenize_options[option] = kwargs.pop(option)

        # The calling thread's ident is part of the token unless the class is
        # an async implementation being used without ``asynchronous=True``.
        token_head = [cls, cls._pid]
        sync_use_of_async_impl = getattr(
            cls, "async_impl", False
        ) and not kwargs.get("asynchronous", False)
        if not sync_use_of_async_impl:
            token_head.append(threading.get_ident())
        token = tokenize(*token_head, *args, *extra_tokens, **kwargs)

        # NOTE: popped only after tokenizing, so the flag is part of the token.
        skip = kwargs.pop("skip_instance_cache", False)

        current_pid = os.getpid()
        if current_pid != cls._pid:
            # running in a different process than the one that filled the cache
            cls._cache.clear()
            cls._pid = current_pid

        if not skip and cls.cachable and token in cls._cache:
            cls._latest = token
            return cls._cache[token]

        obj = super().__call__(*args, **kwargs, **strip_tokenize_options)
        # Setting _fs_token here causes some static linters to complain.
        obj._fs_token_ = token
        obj.storage_args = args
        obj.storage_options = kwargs
        if obj.async_impl and obj.mirror_sync_methods:
            from .asyn import mirror_sync_methods

            mirror_sync_methods(obj)

        if cls.cachable and not skip:
            cls._latest = token
            cls._cache[token] = obj
        return obj
class AbstractFileSystem(metaclass=_Cached):
    """
    An abstract super-class for pythonic file-systems

    Implementations are expected to be compatible with or, better, subclass
    from here.
    """

    cachable = True  # this class can be cached, instances reused
    # Set to True on the instance by ``__init__``; ``__init__`` returns early
    # when it is already True, so a reused instance is not re-initialised.
    _cached = False
    # Chunk size in bytes (4 MiB) used when copying data in get_file/put_file.
    blocksize = 2**22
    sep = "/"
    # Protocol name, or tuple of names, recognised when stripping or adding
    # the "<protocol>://" prefix of a path.
    protocol: ClassVar[str | tuple[str, ...]] = "abstract"
    # Token of the instance most recently created or fetched from the cache;
    # maintained by the _Cached metaclass and read by ``current()``.
    _latest = None
    # When both of these are truthy on an instance, the _Cached metaclass
    # applies ``asyn.mirror_sync_methods`` to it after construction.
    async_impl = False
    mirror_sync_methods = False
    root_marker = ""  # For some FSs, may require leading '/' or other character
    # Called with the filesystem as its only argument to build the object
    # returned by ``transaction`` / ``start_transaction``.
    transaction_type = Transaction

    #: Extra *class attributes* that should be considered when hashing.
    _extra_tokenize_attributes = ()
    #: *storage options* that should not be considered when hashing.
    _strip_tokenize_options = ()

    # Set by _Cached metaclass
    storage_args: tuple[Any, ...]
    storage_options: dict[str, Any]
def __init__(self, *args, **storage_options):
    """Create and configure file-system instance

    Instances may be cachable, so when sufficiently similar arguments are
    seen again no new instance is needed. The token attribute exists so
    that implementations can cache instances if they wish.

    A reasonable default should be provided if there are no arguments.

    Subclasses should call this method.

    Parameters
    ----------
    use_listings_cache, listings_expiry_time, max_paths:
        passed to ``DirCache``, if the implementation supports
        directory listing caching. Pass use_listings_cache=False
        to disable such caching.
    skip_instance_cache: bool
        If this is a cachable implementation, pass True here to force
        creating a new instance even if a matching instance exists, and prevent
        storing this instance.
    asynchronous: bool
    loop: asyncio-compatible IOLoop or None
    """
    if self._cached:
        # reusing instance, don't change
        return
    self._cached = True
    self._intrans = False
    self._transaction = None
    self._invalidated_caches_in_transaction = []
    # DirCache receives the options before the retired ones are popped below
    self.dircache = DirCache(**storage_options)

    # Retired options: still accepted, but only trigger a warning.
    retired_options = (
        ("add_docs", "add_docs is no longer supported."),
        ("add_aliases", "add_aliases has been removed."),
    )
    for option, message in retired_options:
        if storage_options.pop(option, None):
            warnings.warn(message, FutureWarning)
    # This is set in _Cached
    self._fs_token_ = None
@property
def fsid(self):
    """Persistent filesystem id, usable for comparing filesystems across
    sessions.

    Raises
    ------
    NotImplementedError
        Always, in this base implementation.
    """
    raise NotImplementedError
179 @property
180 def _fs_token(self):
181 return self._fs_token_
183 def __dask_tokenize__(self):
184 return self._fs_token
186 def __hash__(self):
187 return int(self._fs_token, 16)
189 def __eq__(self, other):
190 return isinstance(other, type(self)) and self._fs_token == other._fs_token
def __reduce__(self):
    """Pickle support: rebuild through ``make_instance`` from the instance's
    type, ``storage_args`` and ``storage_options``."""
    rebuild_args = (type(self), self.storage_args, self.storage_options)
    return make_instance, rebuild_args
@classmethod
def _strip_protocol(cls, path):
    """Turn path from fully-qualified to file-system-specific

    May require FS-specific handling, e.g., for relative paths or links.

    A list input is processed element-wise and a list is returned. For a
    single path, a leading ``<protocol>://`` or ``<protocol>::`` is removed
    for each of the class's protocols, trailing "/" characters are stripped,
    and ``root_marker`` is returned if nothing is left.
    """
    if isinstance(path, list):
        return [cls._strip_protocol(item) for item in path]
    path = stringify_path(path)
    if isinstance(cls.protocol, str):
        protos = (cls.protocol,)
    else:
        protos = cls.protocol
    for protocol in protos:
        url_prefix = protocol + "://"
        chain_prefix = protocol + "::"
        if path.startswith(url_prefix):
            path = path[len(url_prefix) :]
        elif path.startswith(chain_prefix):
            path = path[len(chain_prefix) :]
    path = path.rstrip("/")
    # use of root_marker to make minimum required path, e.g., "/"
    return path or cls.root_marker
def unstrip_protocol(self, name: str) -> str:
    """Format FS-specific path to generic, including protocol

    ``name`` is returned unchanged if it already starts with
    ``<protocol>://`` for any of this filesystem's protocols; otherwise the
    first protocol is prepended.
    """
    declared = self.protocol
    known = (declared,) if isinstance(declared, str) else declared
    if any(name.startswith(f"{proto}://") for proto in known):
        return name
    return f"{known[0]}://{name}"
222 @staticmethod
223 def _get_kwargs_from_urls(path):
224 """If kwargs can be encoded in the paths, extract them here
226 This should happen before instantiation of the class; incoming paths
227 then should be amended to strip the options in methods.
229 Examples may look like an sftp path "sftp://user@host:/my/path", where
230 the user and host should become kwargs and later get stripped.
231 """
232 # by default, nothing happens
233 return {}
@classmethod
def current(cls):
    """Return the most recently instantiated FileSystem

    Falls back to creating an instance with default arguments when the
    latest token is not present in the instance cache.
    """
    latest = cls._latest
    cache = cls._cache
    return cache[latest] if latest in cache else cls()
@property
def transaction(self):
    """A context within which files are committed together upon exit

    Requires the file class to implement `.commit()` and `.discard()`
    for the normal and exception cases.

    The transaction object is created on first access, by calling
    ``transaction_type`` with this filesystem, and reused afterwards.
    """
    existing = self._transaction
    if existing is not None:
        return existing
    self._transaction = self.transaction_type(self)
    return self._transaction
def start_transaction(self):
    """Begin write transaction for deferring files, non-context version

    Marks the filesystem as being inside a transaction, replaces any
    existing transaction object with a new one and returns it.
    """
    self._intrans = True
    fresh = self.transaction_type(self)
    self._transaction = fresh
    return self.transaction
def end_transaction(self):
    """Finish write transaction, non-context version

    Completes the current transaction, drops it, and then replays every
    cache invalidation that was recorded while the transaction was open.
    """
    active = self.transaction
    active.complete()
    self._transaction = None
    # The invalid cache must be cleared after the transaction is completed.
    for stale_path in self._invalidated_caches_in_transaction:
        self.invalidate_cache(stale_path)
    self._invalidated_caches_in_transaction.clear()
def invalidate_cache(self, path=None):
    """
    Discard any cached directory information

    Parameters
    ----------
    path: string or None
        If None, clear all listings cached else listings at or under given
        path.
    """
    # Implementing an invalidation mechanism is optional, since there may be
    # no cache. If a subclass does have one, it should call this parent
    # method so that caches are expired correctly after transactions.
    # See the implementation of FTPFileSystem in ftp.py
    if not self._intrans:
        return
    # inside a transaction: only record the path for later
    self._invalidated_caches_in_transaction.append(path)
def mkdir(self, path, create_parents=True, **kwargs):
    """
    Create directory entry at path

    For systems that don't have true directories, may create an entry for
    this instance only and not touch the real filesystem

    The base implementation does nothing.

    Parameters
    ----------
    path: str
        location
    create_parents: bool
        if True, this is equivalent to ``makedirs``
    kwargs:
        may be permissions, etc.
    """
    # not necessary to implement, may not have directories
    return None
def makedirs(self, path, exist_ok=False):
    """Recursively make directories

    Creates directory at path and any intervening required directories.
    Raises exception if, for instance, the path already exists but is a
    file.

    The base implementation does nothing.

    Parameters
    ----------
    path: str
        leaf directory name
    exist_ok: bool (False)
        If False, will error if the target already exists
    """
    # not necessary to implement, may not have directories
    return None
def rmdir(self, path):
    """Remove a directory, if empty

    The base implementation does nothing.
    """
    # not necessary to implement, may not have directories
    return None
def ls(self, path, detail=True, **kwargs):
    """List objects at path.

    The result should cover both subdirectories and files at that location,
    and when details are requested it must be clear which entries are files
    and which are directories.

    The specific keys, or perhaps a FileInfo class, or similar, is TBD,
    but must be consistent across implementations.
    Must include:

    - full path to the entry (without protocol)
    - size of the entry, in bytes. If the value cannot be determined, will
      be ``None``.
    - type of entry, "file", "directory" or other

    Additional information
    may be present, appropriate to the file-system, e.g., generation,
    checksum, etc.

    May use refresh=True|False to allow use of self._ls_from_cache to
    check for a saved listing and avoid calling the backend. This would be
    common where listing may be expensive.

    Parameters
    ----------
    path: str
    detail: bool
        if True, gives a list of dictionaries, where each is the same as
        the result of ``info(path)``. If False, gives a list of paths
        (str).
    kwargs: may have additional backend-specific options, such as version
        information

    Returns
    -------
    List of strings if detail is False, or list of directory information
    dicts if detail is True.

    Raises
    ------
    NotImplementedError
        Always, in this base implementation.
    """
    raise NotImplementedError
367 def _ls_from_cache(self, path):
368 """Check cache for listing
370 Returns listing, if found (may be empty list for a directly that exists
371 but contains nothing), None if not in cache.
372 """
373 parent = self._parent(path)
374 try:
375 return self.dircache[path.rstrip("/")]
376 except KeyError:
377 pass
378 try:
379 files = [
380 f
381 for f in self.dircache[parent]
382 if f["name"] == path
383 or (f["name"] == path.rstrip("/") and f["type"] == "directory")
384 ]
385 if len(files) == 0:
386 # parent dir was listed but did not contain this file
387 raise FileNotFoundError(path)
388 return files
389 except KeyError:
390 pass
def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
    """Return all files under the given path.

    List all files, recursing into subdirectories; output is iterator-style,
    like ``os.walk()``. For a simple list of files, ``find()`` is available.

    When topdown is True, the caller can modify the dirnames list in-place (perhaps
    using del or slice assignment), and walk() will
    only recurse into the subdirectories whose names remain in dirnames;
    this can be used to prune the search, impose a specific order of visiting,
    or even to inform walk() about directories the caller creates or renames before
    it resumes walk() again.
    Modifying dirnames when topdown is False has no effect. (see os.walk)

    Note that the "files" outputted will include anything that is not
    a directory, such as links.

    Parameters
    ----------
    path: str
        Root to recurse into
    maxdepth: int
        Maximum recursion depth. None means limitless, but not recommended
        on link-based file-systems.
    topdown: bool (True)
        Whether to walk the directory tree from the top downwards or from
        the bottom upwards.
    on_error: "omit", "raise", a callable
        if omit (default), path with exception will simply be empty;
        If raise, an underlying exception will be raised;
        if callable, it will be called with a single OSError instance as argument.
        The same policy is applied at every level of the recursion.
    kwargs: passed to ``ls``. ``detail`` (default False) is consumed here:
        if True, dirs and files are yielded as ``{name: info}`` dicts
        instead of lists of names.

    Yields
    ------
    (path, dirs, files) tuples, one per directory visited.

    Raises
    ------
    ValueError
        If ``maxdepth`` is given and is less than 1 (raised when iteration
        starts, since this is a generator).
    """
    if maxdepth is not None and maxdepth < 1:
        raise ValueError("maxdepth must be at least 1")

    path = self._strip_protocol(path)
    full_dirs = {}
    dirs = {}
    files = {}

    detail = kwargs.pop("detail", False)
    try:
        listing = self.ls(path, detail=True, **kwargs)
    except OSError as e:  # includes FileNotFoundError
        if on_error == "raise":
            raise
        if callable(on_error):
            on_error(e)
        return

    for info in listing:
        # each info name must be at least [path]/part , but here
        # we check also for names like [path]/part/
        pathname = info["name"].rstrip("/")
        name = pathname.rsplit("/", 1)[-1]
        if info["type"] == "directory" and pathname != path:
            # do not include "self" path
            full_dirs[name] = pathname
            dirs[name] = info
        elif pathname == path:
            # file-like with same name as given path
            files[""] = info
        else:
            files[name] = info

    if not detail:
        dirs = list(dirs)
        files = list(files)

    if topdown:
        # Yield before recursion if walking top down
        yield path, dirs, files

    if maxdepth is not None:
        maxdepth -= 1
        if maxdepth < 1:
            if not topdown:
                yield path, dirs, files
            return

    for d in dirs:
        # on_error must be forwarded explicitly; otherwise sub-directories
        # would silently fall back to the default "omit" policy.
        yield from self.walk(
            full_dirs[d],
            maxdepth=maxdepth,
            detail=detail,
            topdown=topdown,
            on_error=on_error,
            **kwargs,
        )

    if not topdown:
        # Yield after recursion if walking bottom up
        yield path, dirs, files
def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
    """List all files below path.

    Like posix ``find`` command without conditions

    Parameters
    ----------
    path : str
    maxdepth: int or None
        If not None, the maximum number of levels to descend
    withdirs: bool
        Whether to include directory paths in the output. This is True
        when used by glob, but users usually only want files.
    detail: bool
        If True, return a ``{name: info}`` dict ordered by name instead of
        a sorted list of names.
    kwargs are passed to ``ls``.
    """
    # TODO: allow equivalent of -name parameter
    root = self._strip_protocol(path)
    found = {}

    # Add the root directory if withdirs is requested
    # This is needed for posix glob compliance
    if withdirs and root != "" and self.isdir(root):
        found[root] = self.info(root)

    for _, subdirs, entries in self.walk(root, maxdepth, detail=True, **kwargs):
        if withdirs:
            entries.update(subdirs)
        for entry in entries.values():
            found[entry["name"]] = entry

    if not found and self.isfile(root):
        # walk works on directories, but find should also return [path]
        # when path happens to be a file
        found[root] = {}

    ordered = sorted(found)
    if detail:
        return {name: found[name] for name in ordered}
    return ordered
def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
    """Space used by files and optionally directories within a path

    Directory size does not include the size of its contents.

    Parameters
    ----------
    path: str
    total: bool
        Whether to sum all the file sizes
    maxdepth: int or None
        Maximum number of directory levels to descend, None for unlimited.
    withdirs: bool
        Whether to include directory paths in the output.
    kwargs: passed to ``find``

    Returns
    -------
    Dict of {path: size} if total=False, or int otherwise, where numbers
    refer to bytes used.
    """
    usage = {}

    def record(target):
        # store the size reported by info() under the name info() reports
        details = self.info(target)
        usage[details["name"]] = details["size"]

    if withdirs and self.isdir(path):
        # Include top-level directory in output
        record(path)
    for found in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
        record(found)
    return sum(usage.values()) if total else usage
def glob(self, path, maxdepth=None, **kwargs):
    """Find files by glob-matching.

    Returns the entries below the non-wildcard prefix of ``path`` whose
    names match the pattern.

    Parameters
    ----------
    path: str
        The glob pattern to match against
    maxdepth: int or None
        Maximum depth for ``'**'`` patterns. Applied on the first ``'**'`` found.
        Must be at least 1 if provided.
    kwargs:
        Additional arguments passed to ``find`` (e.g., detail=True)

    Returns
    -------
    List of matched paths, or dict of paths and their info if detail=True

    Notes
    -----
    Supported patterns:
    - '*': Matches any sequence of characters within a single directory level
    - ``'**'``: Matches any number of directory levels (must be an entire path component)
    - '?': Matches exactly one character
    - '[abc]': Matches any character in the set
    - '[a-z]': Matches any character in the range
    - '[!abc]': Matches any character NOT in the set

    Special behaviors:
    - If the path ends with '/', only folders are returned
    - Consecutive '*' characters are compressed into a single '*'
    - Empty brackets '[]' never match anything
    - Negated empty brackets '[!]' match any single character
    - Special characters in character classes are escaped properly

    Limitations:
    - ``'**'`` must be a complete path component (e.g., ``'a/**/b'``, not ``'a**b'``)
    - No brace expansion ('{a,b}.txt')
    - No extended glob patterns ('+(pattern)', '!(pattern)')
    """
    if maxdepth is not None and maxdepth < 1:
        raise ValueError("maxdepth must be at least 1")

    import re

    if os.path.altsep:
        seps = (os.path.sep, os.path.altsep)
    else:
        seps = (os.path.sep,)
    # captured first, because _strip_protocol strips trailing slash
    ends_with_sep = path.endswith(seps)
    path = self._strip_protocol(path)
    recursive_tails = tuple(sep + "**" for sep in seps)
    append_slash_to_dirname = ends_with_sep or path.endswith(recursive_tails)

    # index of the earliest wildcard character, or len(path) if there is none
    min_idx = min(
        pos if pos >= 0 else len(path) for pos in map(path.find, ("*", "?", "["))
    )

    detail = kwargs.pop("detail", False)
    withdirs = kwargs.pop("withdirs", True)

    if not has_magic(path):
        # plain path: answer from exists()/info() without listing anything
        if not self.exists(path, **kwargs):
            # glob of non-existent returns empty
            return {} if detail else []
        if detail:
            return {path: self.info(path, **kwargs)}
        return [path]

    if "/" in path[:min_idx]:
        # search from the last directory that precedes the first wildcard
        min_idx = path[:min_idx].rindex("/")
        root = path[: min_idx + 1]
    else:
        root = ""
    depth = path[min_idx + 1 :].count("/") + 1

    if "**" in path:
        if maxdepth is None:
            depth = None
        else:
            first_double_star = path.find("**")
            levels_from_double_star = path[first_double_star:].count("/") + 1
            depth = depth - levels_from_double_star + maxdepth

    allpaths = self.find(
        root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
    )

    matcher = re.compile(glob_translate(path + ("/" if ends_with_sep else "")))

    matched = {}
    for candidate, info in sorted(allpaths.items()):
        subject = candidate
        if append_slash_to_dirname and info["type"] == "directory":
            subject = candidate + "/"
        if matcher.match(subject):
            matched[candidate] = info

    return matched if detail else list(matched)
def exists(self, path, **kwargs):
    """Is there a file at the given path

    Parameters
    ----------
    path: str
    kwargs: passed to ``info``

    Returns
    -------
    True if ``info(path)`` succeeds, False if it raises any ordinary
    exception.
    """
    try:
        self.info(path, **kwargs)
        return True
    except Exception:
        # Any ordinary failure counts as "does not exist" (any exception
        # allowed bar FileNotFoundError?). BaseException subclasses such as
        # KeyboardInterrupt and SystemExit are deliberately not swallowed.
        return False
def lexists(self, path, **kwargs):
    """If there is a file at the given path (including
    broken links)

    The base implementation delegates to ``exists``.

    Parameters
    ----------
    path: str
    kwargs: passed to ``exists``
    """
    # forward kwargs rather than silently dropping them
    return self.exists(path, **kwargs)
def info(self, path, **kwargs):
    """Give details of entry at path

    Returns a single dictionary, with exactly the same information as ``ls``
    would with ``detail=True``.

    This default implementation is built on ``ls`` and may be overridden
    with a shortcut. kwargs are passed on to ``ls()``.

    Some file systems might not be able to measure the file's size, in
    which case, the returned dict will include ``'size': None``.

    Returns
    -------
    dict with keys: name (full path in the FS), size (in bytes), type (file,
    directory, or something else) and other FS-specific keys.

    Raises
    ------
    FileNotFoundError
        If neither the parent's listing nor the path's own listing yields
        anything for ``path``.
    """
    path = self._strip_protocol(path)

    # First choice: the entry as it appears in its parent's listing.
    siblings = self.ls(self._parent(path), detail=True, **kwargs)
    hits = [entry for entry in siblings if entry["name"].rstrip("/") == path]
    if hits:
        return hits[0]

    # Otherwise list the path itself.
    listing = self.ls(path, detail=True, **kwargs)
    path = path.rstrip("/")
    exact = [entry for entry in listing if entry["name"].rstrip("/") == path]
    if len(exact) == 1:
        (entry,) = exact
        entry.setdefault("size", None)
        return entry
    if exact or listing:
        # the listing has contents (or several exact matches): a directory
        return {"name": path, "size": 0, "type": "directory"}
    raise FileNotFoundError(path)
def checksum(self, path):
    """Unique value for current version of file

    If the checksum is the same from one moment to another, the contents
    are guaranteed to be the same. If the checksum changes, the contents
    *might* have changed.

    This should normally be overridden; the default tokenizes the result of
    ``info(path)``, so it will probably capture creation/modification
    timestamp (which would be good) or maybe access timestamp (which would
    be bad)
    """
    details = self.info(path)
    hex_token = tokenize(details)
    return int(hex_token, 16)
def size(self, path):
    """Size in bytes of file

    Returns None when ``info(path)`` has no "size" entry.
    """
    details = self.info(path)
    return details.get("size")
def sizes(self, paths):
    """Size in bytes of each file in a list of paths

    Returns a list with one ``size(path)`` result per input path, in order.
    """
    return list(map(self.size, paths))
def isdir(self, path):
    """Is this entry directory-like?

    Returns False when ``info(path)`` raises ``OSError``.
    """
    try:
        details = self.info(path)
        return details["type"] == "directory"
    except OSError:
        return False
def isfile(self, path):
    """Is this entry file-like?

    Returns
    -------
    True if ``info(path)`` reports type "file"; False otherwise, including
    when looking up the info raises any ordinary exception.
    """
    try:
        return self.info(path)["type"] == "file"
    except Exception:
        # Best-effort: ordinary failures mean "not a file". BaseException
        # subclasses (KeyboardInterrupt, SystemExit) are not swallowed.
        return False
def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
    """Get the contents of the file as a string.

    Parameters
    ----------
    path: str
        URL of file on this filesystems
    encoding, errors, newline: same as `open`.
    kwargs: passed to ``open``
    """
    text_options = dict(mode="r", encoding=encoding, errors=errors, newline=newline)
    with self.open(path, **text_options, **kwargs) as stream:
        return stream.read()
def write_text(
    self, path, value, encoding=None, errors=None, newline=None, **kwargs
):
    """Write the text to the given file.

    An existing file will be overwritten.

    Parameters
    ----------
    path: str
        URL of file on this filesystems
    value: str
        Text to write.
    encoding, errors, newline: same as `open`.
    kwargs: passed to ``open``

    Returns
    -------
    Whatever the opened file's ``write`` returns.
    """
    text_options = dict(mode="w", encoding=encoding, errors=errors, newline=newline)
    with self.open(path, **text_options, **kwargs) as stream:
        return stream.write(value)
def cat_file(self, path, start=None, end=None, **kwargs):
    """Get the content of a file

    Parameters
    ----------
    path: URL of file on this filesystems
    start, end: int
        Bytes limits of the read. If negative, backwards from end,
        like usual python slices. Either can be None for start or
        end of file, respectively
    kwargs: passed to ``open()``.

    Returns
    -------
    bytes read from the file; empty if ``end`` falls at or before the
    starting position.
    """
    # explicitly set buffering off?
    with self.open(path, "rb", **kwargs) as f:
        if start is not None:
            if start >= 0:
                f.seek(start)
            else:
                f.seek(max(0, f.size + start))
        if end is not None:
            if end < 0:
                end = f.size + end
            # Clamp at zero: a negative length would be taken by read() as
            # "read to end of file" instead of giving an empty slice.
            return f.read(max(0, end - f.tell()))
        return f.read()
def pipe_file(self, path, value, mode="overwrite", **kwargs):
    """Set the bytes of given file

    With ``mode="create"``, ``FileExistsError`` is raised if the path
    already exists. kwargs are passed to ``open``.
    """
    must_be_new = mode == "create"
    if must_be_new and self.exists(path):
        # non-atomic but simple way; or could use "xb" in open(), which is likely
        # not as well supported
        raise FileExistsError
    with self.open(path, "wb", **kwargs) as sink:
        sink.write(value)
def pipe(self, path, value=None, **kwargs):
    """Put value into path

    (counterpart to ``cat``)

    Parameters
    ----------
    path: string or dict(str, bytes)
        If a string, a single remote location to put ``value`` bytes; if a dict,
        a mapping of {path: bytesvalue}.
    value: bytes, optional
        If using a single path, these are the bytes to put there. Ignored if
        ``path`` is a dict
    kwargs: passed to ``pipe_file``

    Raises
    ------
    ValueError
        If ``path`` is neither a str nor a dict.
    """
    if isinstance(path, str):
        payloads = {path: value}
    elif isinstance(path, dict):
        payloads = path
    else:
        raise ValueError("path must be str or dict")
    for target, data in payloads.items():
        self.pipe_file(self._strip_protocol(target), data, **kwargs)
def cat_ranges(
    self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
):
    """Get the contents of byte ranges from one or more files

    Parameters
    ----------
    paths: list
        A list of filepaths on this filesystem
    starts, ends: int or list
        Bytes limits of the read. If using a single int, the same value will be
        used to read all the specified files.
    max_gap: None
        Not supported by this implementation; must be left as None.
    on_error: str
        If "return" (default), an exception raised while reading a range is
        put in the output at that range's position; any other value causes
        the exception to be re-raised.
    kwargs: passed to ``cat_file``

    Returns
    -------
    list with one item per path, in order: the bytes read, or the exception
    instance when ``on_error="return"``.

    Raises
    ------
    NotImplementedError
        If ``max_gap`` is not None.
    TypeError
        If ``paths`` is not a list.
    ValueError
        If ``starts`` or ``ends`` is a list whose length differs from ``paths``.
    """
    if max_gap is not None:
        raise NotImplementedError("max_gap is not supported by this implementation")
    if not isinstance(paths, list):
        raise TypeError("paths must be a list")
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError("starts and ends must have the same length as paths")
    out = []
    for path, start, end in zip(paths, starts, ends):
        try:
            out.append(self.cat_file(path, start, end, **kwargs))
        except Exception as exc:
            if on_error == "return":
                out.append(exc)
            else:
                raise
    return out
def cat(self, path, recursive=False, on_error="raise", **kwargs):
    """Fetch (potentially multiple) paths' contents

    Parameters
    ----------
    recursive: bool
        If True, assume the path(s) are directories, and get all the
        contained files
    on_error : "raise", "omit", "return"
        If raise, an underlying exception will be raised (converted to KeyError
        if the type is in self.missing_exceptions); if omit, keys with exception
        will simply not be included in the output; if "return", all keys are
        included in the output, but the value will be bytes or an exception
        instance.
    kwargs: passed to cat_file

    Returns
    -------
    dict of {path: contents} if there are multiple paths
    or the path has been otherwise expanded
    """
    expanded = self.expand_path(path, recursive=recursive, **kwargs)
    wants_mapping = (
        len(expanded) > 1
        or isinstance(path, list)
        or expanded[0] != self._strip_protocol(path)
    )
    if not wants_mapping:
        # a single, literal path: return its bytes directly
        return self.cat_file(expanded[0], **kwargs)

    contents = {}
    for target in expanded:
        try:
            contents[target] = self.cat_file(target, **kwargs)
        except Exception as exc:
            if on_error == "raise":
                raise
            if on_error == "return":
                contents[target] = exc
            # any other value ("omit"): leave the key out
    return contents
def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs):
    """Copy single remote file to local

    If ``lpath`` is file-like, data is written to it and it is left open.
    Otherwise, if ``rpath`` is a directory, ``lpath`` is created as a local
    directory and nothing is copied. In the remaining case the remote file
    is read in ``blocksize`` chunks and written to ``outfile`` if given, or
    else to a newly opened ``lpath``; the output file is closed afterwards.
    Progress is reported through ``callback``; kwargs are passed to ``open``.
    """
    from .implementations.local import LocalFileSystem

    if isfilelike(lpath):
        outfile = lpath
    elif self.isdir(rpath):
        os.makedirs(lpath, exist_ok=True)
        return None

    local_fs = LocalFileSystem(auto_mkdir=True)
    local_fs.makedirs(local_fs._parent(lpath), exist_ok=True)

    with self.open(rpath, "rb", **kwargs) as remote:
        if outfile is None:
            outfile = open(lpath, "wb")

        try:
            callback.set_size(getattr(remote, "size", None))
            while True:
                chunk = remote.read(self.blocksize)
                written = outfile.write(chunk)
                if written is None:
                    # write() gave no count; fall back to the chunk length
                    written = len(chunk)
                callback.relative_update(written)
                if not chunk:
                    break
        finally:
            if not isfilelike(lpath):
                outfile.close()
def get(
    self,
    rpath,
    lpath,
    recursive=False,
    callback=DEFAULT_CALLBACK,
    maxdepth=None,
    **kwargs,
):
    """Copy file(s) to local.

    Copies a specific file or tree of files (if recursive=True). If lpath
    ends with a "/", it will be assumed to be a directory, and target files
    will go within. Can submit a list of paths, which may be glob-patterns
    and will be expanded.

    Calls get_file for each source.

    Parameters
    ----------
    rpath: str or list
        Remote source path(s).
    lpath: str or list
        Local destination path(s).
    recursive: bool
        Passed to ``expand_path`` when expanding ``rpath``.
    callback:
        Progress callback; its size is set to the number of files and a
        branched child is handed to each ``get_file`` call.
    maxdepth: int or None
        Passed to ``expand_path`` when expanding ``rpath``.
    kwargs: passed to ``expand_path`` and ``get_file``
    """
    if isinstance(lpath, list) and isinstance(rpath, list):
        # No need to expand paths when both source and destination
        # are provided as lists
        rpaths = rpath
        lpaths = lpath
    else:
        from .implementations.local import (
            LocalFileSystem,
            make_path_posix,
            trailing_sep,
        )

        source_is_str = isinstance(rpath, str)
        rpaths = self.expand_path(
            rpath, recursive=recursive, maxdepth=maxdepth, **kwargs
        )
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
            if not rpaths:
                # nothing left to copy after dropping directories
                return

        if isinstance(lpath, str):
            lpath = make_path_posix(lpath)

        source_is_file = len(rpaths) == 1
        dest_is_dir = isinstance(lpath, str) and (
            trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
        )

        # ``exists`` is forwarded to other_paths; NOTE(review): its exact
        # effect on the generated local paths is defined in utils.other_paths.
        exists = source_is_str and (
            (has_magic(rpath) and source_is_file)
            or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
        )
        lpaths = other_paths(
            rpaths,
            lpath,
            exists=exists,
            flatten=not source_is_str,
        )

    callback.set_size(len(lpaths))
    for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
        with callback.branched(rpath, lpath) as child:
            self.get_file(rpath, lpath, callback=child, **kwargs)
def put_file(
    self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
):
    """Copy single file to remote

    With ``mode="create"``, ``FileExistsError`` is raised if ``rpath``
    already exists. If ``lpath`` is a local directory, the remote directory
    is created and nothing is copied. Otherwise the local file is sent in
    ``blocksize`` chunks, with progress reported through ``callback``;
    kwargs are passed to ``open``.
    """
    if mode == "create" and self.exists(rpath):
        raise FileExistsError
    if os.path.isdir(lpath):
        self.makedirs(rpath, exist_ok=True)
        return None

    with open(lpath, "rb") as local:
        # seek to the end to learn the size, then rewind
        size = local.seek(0, 2)
        callback.set_size(size)
        local.seek(0)

        remote_parent = self._parent(os.fspath(rpath))
        self.mkdirs(remote_parent, exist_ok=True)
        with self.open(rpath, "wb", **kwargs) as remote:
            while local.tell() < size:
                chunk = local.read(self.blocksize)
                written = remote.write(chunk)
                if written is None:
                    # write() gave no count; fall back to the chunk length
                    written = len(chunk)
                callback.relative_update(written)
def put(
    self,
    lpath,
    rpath,
    recursive=False,
    callback=DEFAULT_CALLBACK,
    maxdepth=None,
    **kwargs,
):
    """Copy file(s) from local.

    Copies a specific file or tree of files (if recursive=True). If rpath
    ends with a "/", it will be assumed to be a directory, and target files
    will go within.

    Calls put_file for each source.

    Parameters
    ----------
    lpath: str or list
        Local source path(s).
    rpath: str or list
        Remote destination path(s).
    recursive: bool
        Passed to the local filesystem's ``expand_path``.
    callback:
        Progress callback; its size is set to the number of files and a
        branched child is handed to each ``put_file`` call.
    maxdepth: int or None
        Passed to the local filesystem's ``expand_path``.
    kwargs: passed to ``expand_path`` and ``put_file``
    """
    if isinstance(lpath, list) and isinstance(rpath, list):
        # No need to expand paths when both source and destination
        # are provided as lists
        rpaths = rpath
        lpaths = lpath
    else:
        from .implementations.local import (
            LocalFileSystem,
            make_path_posix,
            trailing_sep,
        )

        source_is_str = isinstance(lpath, str)
        if source_is_str:
            lpath = make_path_posix(lpath)
        fs = LocalFileSystem()
        lpaths = fs.expand_path(
            lpath, recursive=recursive, maxdepth=maxdepth, **kwargs
        )
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
            if not lpaths:
                # nothing left to copy after dropping directories
                return

        source_is_file = len(lpaths) == 1
        # evaluated before rpath has its protocol stripped below
        dest_is_dir = isinstance(rpath, str) and (
            trailing_sep(rpath) or self.isdir(rpath)
        )

        rpath = (
            self._strip_protocol(rpath)
            if isinstance(rpath, str)
            else [self._strip_protocol(p) for p in rpath]
        )
        # ``exists`` is forwarded to other_paths; NOTE(review): its exact
        # effect on the generated remote paths is defined in utils.other_paths.
        exists = source_is_str and (
            (has_magic(lpath) and source_is_file)
            or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
        )
        rpaths = other_paths(
            lpaths,
            rpath,
            exists=exists,
            flatten=not source_is_str,
        )

    callback.set_size(len(rpaths))
    for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
        with callback.branched(lpath, rpath) as child:
            self.put_file(lpath, rpath, callback=child, **kwargs)
def head(self, path, size=1024):
    """Return the leading ``size`` bytes of the file at ``path``"""
    with self.open(path, "rb") as stream:
        leading = stream.read(size)
    return leading
def tail(self, path, size=1024):
    """Return the trailing ``size`` bytes of the file at ``path``"""
    with self.open(path, "rb") as stream:
        # never step back further than the start of the file
        step_back = min(size, stream.size)
        stream.seek(-step_back, 2)
        return stream.read()
def cp_file(self, path1, path2, **kwargs):
    """Copy the single file ``path1`` to ``path2``

    Not implemented in this class; ``copy`` calls this once for every
    source/target pair, forwarding its ``**kwargs``.
    """
    raise NotImplementedError
def copy(
    self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
):
    """Copy between two locations of this filesystem

    ``cp_file`` is called once per resolved (source, target) pair.

    on_error : "raise", "ignore"
        With "raise", a not-found error aborts the copy; with "ignore" the
        offending path is skipped. When left unset it is "ignore" for
        recursive copies and "raise" otherwise.
    """
    if on_error is None:
        on_error = "ignore" if recursive else "raise"

    if isinstance(path1, list) and isinstance(path2, list):
        # Explicit pairs were supplied: use them verbatim, no expansion
        sources, targets = path1, path2
    else:
        from .implementations.local import trailing_sep

        source_is_str = isinstance(path1, str)
        sources = self.expand_path(
            path1, recursive=recursive, maxdepth=maxdepth, **kwargs
        )
        if source_is_str and (not recursive or maxdepth is not None):
            # A non-recursive glob must not pick up directories
            sources = [
                p for p in sources if not trailing_sep(p) and not self.isdir(p)
            ]
            if not sources:
                return

        source_is_file = len(sources) == 1
        dest_is_dir = isinstance(path2, str) and (
            trailing_sep(path2) or self.isdir(path2)
        )

        if not source_is_str:
            exists = False
        elif has_magic(path1):
            exists = source_is_file
        else:
            exists = dest_is_dir and not trailing_sep(path1)

        targets = other_paths(
            sources,
            path2,
            exists=exists,
            flatten=not source_is_str,
        )

    for src, dst in zip(sources, targets):
        try:
            self.cp_file(src, dst, **kwargs)
        except FileNotFoundError:
            if on_error == "raise":
                raise
def expand_path(
    self, path, recursive=False, maxdepth=None, assume_literal=False, **kwargs
):
    """Turn one or more globs or directories into a list of all matching paths
    to files or directories.

    Parameters
    ----------
    path: str, path-like or iterable of str
        Path(s) to expand; a single value is handled as a one-element list.
    recursive: bool
        If True, also include everything found below each path.
    maxdepth: int or None
        Depth limit passed to ``glob``/``find``; must be at least 1 if given.
    assume_literal: bool
        If True, never treat glob characters in ``path`` as a pattern.

    kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``

    Returns
    -------
    Sorted list of paths.

    Raises
    ------
    ValueError
        If ``maxdepth`` is given and smaller than 1.
    FileNotFoundError
        If nothing matched.
    """

    if maxdepth is not None and maxdepth < 1:
        raise ValueError("maxdepth must be at least 1")

    if isinstance(path, (str, os.PathLike)):
        # forward assume_literal too, otherwise it is silently ignored
        # whenever a single path (rather than a list) is given
        out = self.expand_path(
            [path],
            recursive,
            maxdepth,
            assume_literal=assume_literal,
            **kwargs,
        )
    else:
        out = set()
        path = [self._strip_protocol(p) for p in path]
        for p in path:
            if not assume_literal and has_magic(p):
                bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
                out |= bit
                if recursive:
                    # glob call above expanded one depth so if maxdepth is defined
                    # then decrement it in expand_path call below. If it is zero
                    # after decrementing then avoid expand_path call.
                    if maxdepth is not None and maxdepth <= 1:
                        continue
                    out |= set(
                        self.expand_path(
                            list(bit),
                            recursive=recursive,
                            maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            # glob results are concrete paths, not patterns
                            assume_literal=True,
                            **kwargs,
                        )
                    )
                continue
            elif recursive:
                rec = set(
                    self.find(
                        p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
                    )
                )
                out |= rec
            if p not in out and (recursive is False or self.exists(p)):
                # should only check once, for the root
                out.add(p)
    if not out:
        raise FileNotFoundError(path)
    return sorted(out)
def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
    """Move file(s) from ``path1`` to ``path2`` (copy, then delete the source)"""
    if path1 == path2:
        logger.debug("%s mv: The paths are the same, so no files were moved.", self)
        return

    # Insist on errors being raised: the source is deleted afterwards, so a
    # silently skipped copy would lose data.
    self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth, on_error="raise")
    self.rm(path1, recursive=recursive)
def rm_file(self, path):
    """Delete a file

    This implementation simply forwards to ``_rm``, the older name of the
    same operation.
    """
    self._rm(path)
def _rm(self, path):
    """Delete one file

    Not implemented in this class.
    """
    # this is the old name for the method, prefer rm_file
    raise NotImplementedError
def rm(self, path, recursive=False, maxdepth=None):
    """Delete files.

    Parameters
    ----------
    path: str or list of str
        File(s) to delete.
    recursive: bool
        If file(s) are directories, recursively delete contents and then
        also remove the directory
    maxdepth: int or None
        Depth to pass to walk for finding files to delete, if recursive.
        If None, there will be no limit and infinite recursion may be
        possible.
    """
    targets = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
    # expand_path returns sorted paths; walk them backwards so that the
    # contents of a directory are removed before the directory itself
    for target in reversed(targets):
        self.rm_file(target)
@classmethod
def _parent(cls, path):
    """Return the parent of ``path``, prefixed with the class ``root_marker``"""
    stripped = cls._strip_protocol(path)
    if "/" not in stripped:
        # no separator at all: the parent is the root itself
        return cls.root_marker
    leading, _, _ = stripped.rpartition("/")
    return cls.root_marker + leading.lstrip(cls.root_marker)
def _open(
    self,
    path,
    mode="rb",
    block_size=None,
    autocommit=True,
    cache_options=None,
    **kwargs,
):
    """Return raw bytes-mode file-like from the file-system

    This implementation builds an ``AbstractBufferedFile`` bound to this
    filesystem; all arguments are handed to its constructor unchanged.
    """
    return AbstractBufferedFile(
        self,
        path,
        mode,
        block_size,
        autocommit,
        cache_options=cache_options,
        **kwargs,
    )
def open(
    self,
    path,
    mode="rb",
    block_size=None,
    cache_options=None,
    compression=None,
    **kwargs,
):
    """
    Return a file-like object from the filesystem

    The returned object must be usable as a context manager in a ``with``
    block.

    Parameters
    ----------
    path: str
        Target file
    mode: str like 'rb', 'w'
        See builtin ``open()``
        Mode "x" (exclusive write) may be implemented by the backend. Even if
        it is, whether it is checked up front or on commit, and whether it is
        atomic is implementation-dependent.
    block_size: int
        Some indication of buffering - this is a value in bytes
    cache_options : dict, optional
        Extra arguments to pass through to the cache.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding, errors, newline: passed on to TextIOWrapper for text mode
    """
    import io

    path = self._strip_protocol(path)

    if "b" in mode:
        # Binary mode: obtain the raw file from the backend
        autocommit = kwargs.pop("autocommit", not self._intrans)
        raw = self._open(
            path,
            mode=mode,
            block_size=block_size,
            autocommit=autocommit,
            cache_options=cache_options,
            **kwargs,
        )
        if compression is not None:
            from fsspec.compression import compr
            from fsspec.core import get_compression

            compression = get_compression(path, compression)
            codec = compr[compression]
            raw = codec(raw, mode=mode[0])

        if not autocommit and "r" not in mode:
            # deferred writes are tracked by the running transaction
            self.transaction.files.append(raw)
        return raw

    # Text mode: open the binary equivalent and wrap it for decoding
    binary_mode = mode.replace("t", "") + "b"
    text_kwargs = {}
    for key in ["encoding", "errors", "newline"]:
        if key in kwargs:
            text_kwargs[key] = kwargs.pop(key)
    binary_file = self.open(
        path,
        binary_mode,
        block_size=block_size,
        cache_options=cache_options,
        compression=compression,
        **kwargs,
    )
    return io.TextIOWrapper(binary_file, **text_kwargs)
def touch(self, path, truncate=True, **kwargs):
    """Create empty file, or update timestamp

    Parameters
    ----------
    path: str
        file location
    truncate: bool
        If True, always set file size to 0; if False, update timestamp and
        leave file unchanged, if backend allows this
    """
    if not truncate and self.exists(path):
        raise NotImplementedError  # update timestamp, if possible
    # opening for write and closing straight away leaves an empty file
    with self.open(path, "wb", **kwargs):
        pass
def ukey(self, path):
    """SHA-256 hex digest of the stringified ``info`` of ``path``

    Can be used to tell whether the file's properties have changed.
    """
    described = str(self.info(path))
    return sha256(described.encode()).hexdigest()
def read_block(self, fn, offset, length, delimiter=None):
    """Read a block of bytes from a file

    Starting at ``offset`` of the file, read ``length`` bytes. If
    ``delimiter`` is set then we ensure that the read starts and stops at
    delimiter boundaries that follow the locations ``offset`` and ``offset
    + length``. If ``offset`` is zero then we start at zero. The
    bytestring returned WILL include the end delimiter string.

    If offset+length is beyond the eof, reads to eof.

    Parameters
    ----------
    fn: string
        Path to filename
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read. If None, read to end.
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring

    Examples
    --------
    >>> fs.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'
    >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    Use ``length=None`` to read to the end of the file.
    >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\nCharlie, 300'

    See Also
    --------
    :func:`fsspec.utils.read_block`
    """
    with self.open(fn, "rb") as f:
        total = f.size
        if length is None:
            length = total
        # clamp the request so that it never extends past the end of file
        overruns = total is not None and offset + length > total
        if overruns:
            length = total - offset
        return read_block(f, offset, length, delimiter)
def to_json(self, *, include_password: bool = True) -> str:
    """
    JSON representation of this filesystem instance.

    Parameters
    ----------
    include_password: bool, default True
        Whether to include the password (if any) in the output.

    Returns
    -------
    JSON string with keys ``cls`` (the python location of this class),
    protocol (text name of this class's protocol, first one in case of
    multiple), ``args`` (positional args, usually empty), and all other
    keyword arguments as their own keys.

    Warnings
    --------
    Serialized filesystems may contain sensitive information which have been
    passed to the constructor, such as passwords and tokens. Make sure you
    store and send them in a secure environment!
    """
    from .json import FilesystemJSONEncoder

    # ``json.dumps`` takes an encoder *class*, so ``include_password`` is
    # handed over as a class attribute of a throwaway subclass.
    return json.dumps(
        self,
        cls=type(
            "_FilesystemJSONEncoder",
            (FilesystemJSONEncoder,),
            {"include_password": include_password},
        ),
    )
@staticmethod
def from_json(blob: str) -> AbstractFileSystem:
    """
    Recreate a filesystem instance from JSON representation.

    See ``.to_json()`` for the expected structure of the input.

    Parameters
    ----------
    blob: str
        JSON text to decode.

    Returns
    -------
    file system instance, not necessarily of this particular class.

    Warnings
    --------
    This can import arbitrary modules (as determined by the ``cls`` key).
    Make sure you haven't installed any modules that may execute malicious code
    at import time.
    """
    from .json import FilesystemJSONDecoder

    # all of the reconstruction work happens inside the decoder class
    return json.loads(blob, cls=FilesystemJSONDecoder)
def to_dict(self, *, include_password: bool = True) -> dict[str, Any]:
    """
    JSON-serializable dictionary representation of this filesystem instance.

    Parameters
    ----------
    include_password: bool, default True
        Whether to include the password (if any) in the output.

    Returns
    -------
    Dictionary with keys ``cls`` (the python location of this class),
    protocol (text name of this class's protocol, first one in case of
    multiple), ``args`` (positional args, usually empty), and all other
    keyword arguments as their own keys.

    Warnings
    --------
    Serialized filesystems may contain sensitive information which have been
    passed to the constructor, such as passwords and tokens. Make sure you
    store and send them in a secure environment!
    """
    from .json import FilesystemJSONEncoder

    encoder = FilesystemJSONEncoder()

    klass = type(self)
    protocol = self.protocol

    # work on a copy so the instance's own options are never altered
    options = dict(self.storage_options)
    if not include_password:
        options.pop("password", None)

    location = f"{klass.__module__}:{klass.__name__}"
    if isinstance(protocol, (tuple, list)):
        protocol = protocol[0]
    return dict(
        cls=location,
        protocol=protocol,
        args=encoder.make_serializable(self.storage_args),
        **encoder.make_serializable(options),
    )
@staticmethod
def from_dict(dct: dict[str, Any]) -> AbstractFileSystem:
    """
    Recreate a filesystem instance from dictionary representation.

    See ``.to_dict()`` for the expected structure of the input.

    Parameters
    ----------
    dct: Dict[str, Any]

    Returns
    -------
    file system instance, not necessarily of this particular class.

    Warnings
    --------
    This can import arbitrary modules (as determined by the ``cls`` key).
    Make sure you haven't installed any modules that may execute malicious code
    at import time.
    """
    from .json import FilesystemJSONDecoder

    decoder = FilesystemJSONDecoder()

    payload = dict(dct)  # Defensive copy

    fs_cls = FilesystemJSONDecoder.try_resolve_fs_cls(payload)
    if fs_cls is None:
        raise ValueError("Not a serialized AbstractFileSystem")

    # these two keys describe the class, they are not constructor arguments
    for key in ("cls", "protocol"):
        payload.pop(key, None)

    # "args" must be taken out first so that only keyword options remain
    positional = decoder.unmake_serializable(payload.pop("args", ()))
    keywords = decoder.unmake_serializable(payload)
    return fs_cls(*positional, **keywords)
def _get_pyarrow_filesystem(self):
    """
    Make a version of the FS instance which will be acceptable to pyarrow

    This implementation returns the instance itself.
    """
    # all instances already also derive from pyarrow
    return self
def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
    """Create key/value store based on this file-system

    Makes a MutableMapping interface to the FS at the given root path.
    See ``fsspec.mapping.FSMap`` for further details.

    ``check``, ``create`` and ``missing_exceptions`` are passed to ``FSMap``
    unchanged.
    """
    from .mapping import FSMap

    return FSMap(
        root,
        self,
        check=check,
        create=create,
        missing_exceptions=missing_exceptions,
    )
@classmethod
def clear_instance_cache(cls):
    """
    Clear the cache of filesystem instances.

    Notes
    -----
    Unless overridden by setting the ``cachable`` class attribute to False,
    the filesystem class stores a reference to newly created instances. This
    prevents Python's normal rules around garbage collection from working,
    since the instances refcount will not drop to zero until
    ``clear_instance_cache`` is called.
    """
    # empties the class-level ``_cache`` mapping in place
    cls._cache.clear()
def created(self, path):
    """Return the created timestamp of a file as a datetime.datetime

    Not implemented in this class.
    """
    raise NotImplementedError
def modified(self, path):
    """Return the modified timestamp of a file as a datetime.datetime

    Not implemented in this class.
    """
    raise NotImplementedError
def tree(
    self,
    path: str = "/",
    recursion_limit: int = 2,
    max_display: int = 25,
    display_size: bool = False,
    prefix: str = "",
    is_last: bool = True,
    first: bool = True,
    indent_size: int = 4,
) -> str:
    """
    Return a tree-like structure of the filesystem starting from the given path as a string.

    Parameters
    ----------
        path: Root path to start traversal from
        recursion_limit: Maximum depth of directory traversal
        max_display: Maximum number of items to display per directory
        display_size: Whether to display file sizes
        prefix: Current line prefix for visual tree structure
        is_last: Whether current item is last in its level (accepted and
            passed on to recursive calls, but not otherwise used)
        first: Whether this is the first call (displays root path)
        indent_size: Number of spaces by indent

    Returns
    -------
        str: A string representing the tree structure.

    Example
    -------
        >>> from fsspec import filesystem

        >>> fs = filesystem('ftp', host='test.rebex.net', user='demo', password='password')
        >>> tree = fs.tree(display_size=True, recursion_limit=3, indent_size=8, max_display=10)
        >>> print(tree)
    """

    def format_bytes(n: int) -> str:
        """Format bytes as text."""
        for unit, k in (
            ("P", 2**50),
            ("T", 2**40),
            ("G", 2**30),
            ("M", 2**20),
            ("k", 2**10),
        ):
            if n >= 0.9 * k:
                return f"{n / k:.2f} {unit}b"
        return f"{n}B"

    result = []

    if first:
        result.append(path)

    if recursion_limit:
        indent = " " * indent_size
        # Sort a copy (directories first, then by name): the list returned
        # by ``ls`` is not owned by this method and may be shared, so it
        # must not be reordered in place.
        contents = sorted(
            self.ls(path, detail=True),
            key=lambda x: (x.get("type") != "directory", x.get("name", "")),
        )

        if max_display is not None and len(contents) > max_display:
            displayed_contents = contents[:max_display]
            remaining_count = len(contents) - max_display
        else:
            displayed_contents = contents
            remaining_count = 0

        for i, item in enumerate(displayed_contents):
            # the "N more item(s)" line, when present, is the real last entry
            is_last_item = (i == len(displayed_contents) - 1) and (
                remaining_count == 0
            )

            branch = (
                "└" + ("─" * (indent_size - 2))
                if is_last_item
                else "├" + ("─" * (indent_size - 2))
            )
            branch += " "
            new_prefix = prefix + (
                indent if is_last_item else "│" + " " * (indent_size - 1)
            )

            name = os.path.basename(item.get("name", ""))

            if display_size and item.get("type") == "directory":
                # directories are summarised by their direct children
                sub_contents = self.ls(item.get("name", ""), detail=True)
                num_files = sum(
                    1 for sub_item in sub_contents if sub_item.get("type") == "file"
                )
                num_folders = sum(
                    1
                    for sub_item in sub_contents
                    if sub_item.get("type") == "directory"
                )

                if num_files == 0 and num_folders == 0:
                    size = " (empty folder)"
                elif num_files == 0:
                    size = f" ({num_folders} subfolder{'s' if num_folders > 1 else ''})"
                elif num_folders == 0:
                    size = f" ({num_files} file{'s' if num_files > 1 else ''})"
                else:
                    size = f" ({num_files} file{'s' if num_files > 1 else ''}, {num_folders} subfolder{'s' if num_folders > 1 else ''})"
            elif display_size and item.get("type") == "file":
                size = f" ({format_bytes(item.get('size', 0))})"
            else:
                size = ""

            result.append(f"{prefix}{branch}{name}{size}")

            if item.get("type") == "directory" and recursion_limit > 0:
                result.append(
                    self.tree(
                        path=item.get("name", ""),
                        recursion_limit=recursion_limit - 1,
                        max_display=max_display,
                        display_size=display_size,
                        prefix=new_prefix,
                        is_last=is_last_item,
                        first=False,
                        indent_size=indent_size,
                    )
                )

        if remaining_count > 0:
            more_message = f"{remaining_count} more item(s) not displayed."
            result.append(
                f"{prefix}{'└' + ('─' * (indent_size - 2))} {more_message}"
            )

    # recursive calls on empty directories yield "", which is dropped here
    return "\n".join(_ for _ in result if _)
1762 # ------------------------------------------------------------------------
1763 # Aliases
def read_bytes(self, path, start=None, end=None, **kwargs):
    """Alias of `AbstractFileSystem.cat_file`.

    All arguments are forwarded and the result of ``cat_file`` is returned.
    """
    return self.cat_file(path, start=start, end=end, **kwargs)
def write_bytes(self, path, value, **kwargs):
    """Alias of `AbstractFileSystem.pipe_file`.

    Unlike most aliases here, the result of ``pipe_file`` is not returned.
    """
    self.pipe_file(path, value, **kwargs)
def makedir(self, path, create_parents=True, **kwargs):
    """Alias of `AbstractFileSystem.mkdir`.

    ``create_parents`` and any extra keyword arguments are forwarded.
    """
    return self.mkdir(path, create_parents=create_parents, **kwargs)
def mkdirs(self, path, exist_ok=False):
    """Alias of `AbstractFileSystem.makedirs`.

    ``exist_ok`` is forwarded as given.
    """
    return self.makedirs(path, exist_ok=exist_ok)
def listdir(self, path, detail=True, **kwargs):
    """Alias of `AbstractFileSystem.ls`.

    ``detail`` (True by default here) and extra keyword arguments are
    forwarded.
    """
    return self.ls(path, detail=detail, **kwargs)
def cp(self, path1, path2, **kwargs):
    """Alias of `AbstractFileSystem.copy`.

    Options such as ``recursive`` are given via ``**kwargs``.
    """
    return self.copy(path1, path2, **kwargs)
def move(self, path1, path2, **kwargs):
    """Alias of `AbstractFileSystem.mv`.

    Options such as ``recursive`` are given via ``**kwargs``.
    """
    return self.mv(path1, path2, **kwargs)
def stat(self, path, **kwargs):
    """Alias of `AbstractFileSystem.info`.

    Extra keyword arguments are forwarded.
    """
    return self.info(path, **kwargs)
def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
    """Alias of `AbstractFileSystem.du`.

    ``total``, ``maxdepth`` and extra keyword arguments are forwarded.
    """
    return self.du(path, total=total, maxdepth=maxdepth, **kwargs)
def rename(self, path1, path2, **kwargs):
    """Alias of `AbstractFileSystem.mv`.

    Options such as ``recursive`` are given via ``**kwargs``.
    """
    return self.mv(path1, path2, **kwargs)
def delete(self, path, recursive=False, maxdepth=None):
    """Alias of `AbstractFileSystem.rm`.

    ``recursive`` and ``maxdepth`` are forwarded as given.
    """
    return self.rm(path, recursive=recursive, maxdepth=maxdepth)
def upload(self, lpath, rpath, recursive=False, **kwargs):
    """Alias of `AbstractFileSystem.put`.

    Note the argument order: local path first, remote path second.
    """
    return self.put(lpath, rpath, recursive=recursive, **kwargs)
def download(self, rpath, lpath, recursive=False, **kwargs):
    """Alias of `AbstractFileSystem.get`.

    Note the argument order: remote path first, local path second.
    """
    return self.get(rpath, lpath, recursive=recursive, **kwargs)
def sign(self, path, expiration=100, **kwargs):
    """Create a signed URL representing the given path

    Some implementations allow temporary URLs to be generated, as a
    way of delegating credentials.

    This implementation always raises ``NotImplementedError``.

    Parameters
    ----------
    path : str
         The path on the filesystem
    expiration : int
        Number of seconds to enable the URL for (if supported)

    Returns
    -------
    URL : str
        The signed URL

    Raises
    ------
    NotImplementedError : if method is not implemented for a filesystem
    """
    raise NotImplementedError("Sign is not implemented for this filesystem")
def _isfilestore(self):
    """Compatibility hook for pyarrow's legacy filesystems; always False here"""
    # Originally inherited from pyarrow DaskFileSystem. Keeping this
    # here for backwards compatibility as long as pyarrow uses its
    # legacy fsspec-compatible filesystems and thus accepts fsspec
    # filesystems as well
    return False
1849class AbstractBufferedFile(io.IOBase):
1850 """Convenient class to derive from to provide buffering
1852 In the case that the backend does not provide a pythonic file-like object
1853 already, this class contains much of the logic to build one. The only
1854 methods that need to be overridden are ``_upload_chunk``,
1855 ``_initiate_upload`` and ``_fetch_range``.
1856 """
1858 DEFAULT_BLOCK_SIZE = 5 * 2**20
1859 _details = None
def __init__(
    self,
    fs,
    path,
    mode="rb",
    block_size="default",
    autocommit=True,
    cache_type="readahead",
    cache_options=None,
    size=None,
    **kwargs,
):
    """
    Template for files with buffered reading and writing

    Parameters
    ----------
    fs: instance of FileSystem
    path: str
        location in file-system
    mode: str
        Normal file modes. One of 'rb', 'wb', 'ab' or 'xb'. Some file
        systems may be read-only, and some may not support append.
    block_size: int
        Buffer size for reading or writing, 'default' for class default
    autocommit: bool
        Whether to write to final destination; may only impact what
        happens when file is being closed.
    cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
        Caching policy in read mode. See the definitions in ``core``.
    cache_options : dict
        Additional options passed to the constructor for the cache specified
        by `cache_type`.
    size: int
        If given and in read mode, suppresses having to look up the file size
    kwargs:
        Gets stored as self.kwargs

    Raises
    ------
    NotImplementedError
        If ``mode`` is not one of the supported modes.
    """
    from .core import caches

    self.path = path
    self.fs = fs
    self.mode = mode
    self.blocksize = (
        self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
    )
    self.loc = 0
    self.autocommit = autocommit
    self.end = None
    self.start = None
    # goes through the ``closed`` property setter defined on this class
    self.closed = False

    if cache_options is None:
        cache_options = {}

    if "trim" in kwargs:
        warnings.warn(
            "Passing 'trim' to control the cache behavior has been deprecated. "
            "Specify it within the 'cache_options' argument instead.",
            FutureWarning,
        )
        # NOTE: this writes into the dict passed in by the caller, if any
        cache_options["trim"] = kwargs.pop("trim")

    self.kwargs = kwargs

    if mode not in {"ab", "rb", "wb", "xb"}:
        raise NotImplementedError("File mode not supported")
    if mode == "rb":
        # read mode: a known size avoids the ``fs.info`` lookup made by
        # the ``details`` property
        if size is not None:
            self.size = size
        else:
            self.size = self.details["size"]
        self.cache = caches[cache_type](
            self.blocksize, self._fetch_range, self.size, **cache_options
        )
    else:
        # write modes: data is collected in memory until ``flush`` uploads it
        self.buffer = io.BytesIO()
        self.offset = None
        self.forced = False
        self.location = None
@property
def details(self):
    """Info of ``self.path``, fetched from ``self.fs.info`` on first use"""
    if self._details is None:
        self._details = self.fs.info(self.path)
    return self._details

@details.setter
def details(self, value):
    """Store ``value`` as the details and take ``self.size`` from its "size" key"""
    self._details = value
    self.size = value["size"]
@property
def full_name(self):
    """``self.path`` passed through ``_unstrip_protocol`` for ``self.fs``"""
    return _unstrip_protocol(self.path, self.fs)
@property
def closed(self):
    """Whether the file is closed; True if ``_closed`` was never set"""
    # get around this attr being read-only in IOBase
    # use getattr here, since this can be called during del
    return getattr(self, "_closed", True)

@closed.setter
def closed(self, c):
    """Record the closed state in ``_closed``"""
    self._closed = c
def __hash__(self):
    """Hash of the file object

    Files open for reading hash on their details, so that handles onto the
    same unchanged file hash alike. Any other mode ("wb", "ab", "xb") hashes
    on identity: ``__eq__`` only ever compares read-mode files by hash, and
    looking up the details of a file that is still being written (or, for
    "xb", may not exist yet) would trigger a needless ``fs.info`` call that
    can fail.
    """
    if self.mode != "rb":
        return id(self)
    return int(tokenize(self.details), 16)
def __eq__(self, other):
    """Two distinct files compare equal only when both are in read mode and hash alike"""
    if self is other:
        return True
    if not isinstance(other, type(self)):
        return False
    if self.mode != "rb" or other.mode != "rb":
        return False
    return hash(self) == hash(other)
def commit(self):
    """Move from temp to final destination

    Does nothing in this class.
    """
def discard(self):
    """Throw away temporary file

    Does nothing in this class.
    """
def info(self):
    """Return the file's details; only available on readable files

    Raises
    ------
    ValueError
        If the file is not readable.
    """
    if not self.readable():
        raise ValueError("Info not available while writing")
    return self.details
def tell(self):
    """Current file location (the value of ``self.loc``)"""
    return self.loc
def seek(self, loc, whence=0):
    """Move the current file location and return it

    Parameters
    ----------
    loc: int
        byte location
    whence: {0, 1, 2}
        from start of file, current location or end of file, resp.

    Raises
    ------
    OSError
        If the file is not open in read mode.
    ValueError
        For an unknown ``whence`` or a resulting negative position.
    """
    loc = int(loc)
    if self.mode != "rb":
        raise OSError(ESPIPE, "Seek only available in read mode")

    # pick the reference point the offset is relative to
    if whence == 0:
        origin = 0
    elif whence == 1:
        origin = self.loc
    elif whence == 2:
        origin = self.size
    else:
        raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

    target = origin + loc
    if target < 0:
        raise ValueError("Seek before start of file")
    self.loc = target
    return self.loc
def write(self, data):
    """
    Append data to the in-memory buffer.

    The buffer is only handed to ``flush()`` once it holds at least
    ``blocksize`` bytes.

    Parameters
    ----------
    data: bytes
        Set of bytes to be written.

    Returns
    -------
    Number of bytes accepted by the buffer.
    """
    if not self.writable():
        raise ValueError("File not in write mode")
    if self.closed:
        raise ValueError("I/O operation on closed file.")
    if self.forced:
        raise ValueError("This file has been force-flushed, can only close")

    written = self.buffer.write(data)
    self.loc += written
    buffer_full = self.buffer.tell() >= self.blocksize
    if buffer_full:
        self.flush()
    return written
def flush(self, force=False):
    """
    Send buffered data to the backend store.

    The buffer is uploaded when it has reached the block size, or when
    ``force`` is set (as happens when the file is being closed).

    Parameters
    ----------
    force: bool
        When closing, write the last block even if it is smaller than
        blocks are allowed to be. Disallows further writing to this file.
    """

    if self.closed:
        raise ValueError("Flush on closed file")
    if force:
        if self.forced:
            raise ValueError("Force flush cannot be called more than once")
        self.forced = True

    if self.readable():
        # no-op to flush on read-mode
        return

    if not force and self.buffer.tell() < self.blocksize:
        # Defer write on small block
        return

    if self.offset is None:
        # Initialize a multipart upload
        self.offset = 0
        try:
            self._initiate_upload()
        except BaseException:
            # the upload never started: mark the file unusable, then re-raise
            self.closed = True
            raise

    outcome = self._upload_chunk(final=force)
    if outcome is not False:
        # anything but an explicit False counts as "chunk accepted"
        self.offset += self.buffer.seek(0, 2)
        self.buffer = io.BytesIO()
def _upload_chunk(self, final=False):
    """Write one part of a multi-block file upload

    This base implementation does nothing and returns None. ``flush``
    treats any result other than ``False`` as a successful upload.

    Parameters
    ==========
    final: bool
        This is the last block, so should complete file, if
        self.autocommit is True.
    """
    # may not yet have been initialized, may need to call _initiate_upload
def _initiate_upload(self):
    """Create remote file/upload

    Does nothing in this class; ``flush`` calls it once, before the first
    chunk is uploaded.
    """
    pass
def _fetch_range(self, start, end):
    """Get the specified set of bytes from remote

    This implementation asks ``self.fs.cat_file`` for the ``start``/``end``
    range of ``self.path``.
    """
    return self.fs.cat_file(self.path, start=start, end=end)
def read(self, length=-1):
    """
    Return up to ``length`` bytes from the current location, via the cache

    Parameters
    ----------
    length: int (-1)
        Number of bytes to read; if <0, all remaining bytes.
    """
    length = -1 if length is None else int(length)
    if self.mode != "rb":
        raise ValueError("File not in read mode")
    if length < 0:
        length = self.size - self.loc
    if self.closed:
        raise ValueError("I/O operation on closed file.")
    if length == 0:
        # don't even bother calling fetch
        return b""

    begin = self.loc
    out = self.cache._fetch(begin, begin + length)

    logger.debug(
        "%s read: %i - %i %s",
        self,
        self.loc,
        self.loc + length,
        self.cache._log_stats(),
    )
    # advance by what was actually returned, which may be less than asked
    self.loc += len(out)
    return out
def readinto(self, b):
    """Read into the writable buffer ``b``; returns the number of bytes stored

    Mirrors the builtin file's readinto method:
    https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
    """
    # view the target as flat bytes, whatever its item type
    target = memoryview(b).cast("B")
    chunk = self.read(target.nbytes)
    count = len(chunk)
    target[:count] = chunk
    return count
def readuntil(self, char=b"\n", blocks=None):
    """Return data between current position and first occurrence of char

    char is included in the output, except if the end of the file is
    encountered first. On a match, the file position is left immediately
    after the delimiter.

    Parameters
    ----------
    char: bytes
        Thing to find. May be longer than one byte; a delimiter that
        straddles two consecutive reads is still detected.
    blocks: None or int
        How much to read in each go. Defaults to file blocksize - which may
        mean a new read on every call.
    """
    out = []
    # Up to len(char) - 1 trailing bytes of what has been read so far, so a
    # delimiter split across two reads can still be matched.
    overlap = len(char) - 1
    carry = b""
    while True:
        start = self.tell()
        part = self.read(blocks or self.blocksize)
        if len(part) == 0:
            break
        window = carry + part
        found = window.find(char)
        if found > -1:
            # carry is shorter than char, so any match ends inside part;
            # convert the match end into a count of bytes taken from part
            used = found + len(char) - len(carry)
            out.append(part[:used])
            self.seek(start + used)
            break
        out.append(part)
        carry = window[-overlap:] if overlap > 0 else b""
    return b"".join(out)
def readline(self):
    """Read until and including the first occurrence of newline character

    Note that, because of character encoding, this is not necessarily a
    true line ending.
    """
    newline = b"\n"
    return self.readuntil(newline)
def __next__(self):
    """Return the next line from ``readline``; stop once it comes back empty."""
    line = self.readline()
    if not line:
        raise StopIteration
    return line
def __iter__(self):
    """Return this object itself as the iterator."""
    return self
def readlines(self):
    """Return all data, split by the newline character, including the newline character

    Everything remaining is read in one go with ``self.read()``. Each
    returned item ends with ``b"\\n"``, except possibly the last one when the
    data does not end with a newline. When there is nothing left to read, an
    empty list is returned, which matches iterating over the file line by line.
    """
    data = self.read()
    pieces = data.split(b"\n")
    # The final piece is whatever follows the last newline. It is empty both
    # when the data ends with a newline and when there is no data at all.
    tail = pieces.pop()
    out = [piece + b"\n" for piece in pieces]
    if tail:
        out.append(tail)
    return out
def readinto1(self, b):
    """Fill ``b`` by delegating to ``readinto``; returns its result."""
    nbytes = self.readinto(b)
    return nbytes
def close(self):
    """Close file

    Finalizes writes, discards cache

    Does nothing if the object carries a truthy ``_unclosable`` attribute or
    is already closed. ``self.closed`` is set to True even if finalizing
    raises.
    """
    if getattr(self, "_unclosable", False) or self.closed:
        return
    try:
        if self.mode == "rb":
            # read mode: just drop the cache
            self.cache = None
        else:
            # a missing ``forced`` attribute counts as already flushed
            already_forced = getattr(self, "forced", True)
            if not already_forced:
                self.flush(force=True)

            fs = self.fs
            if fs is not None:
                # cached listings for the file and its parent may be stale
                fs.invalidate_cache(self.path)
                fs.invalidate_cache(fs._parent(self.path))
    finally:
        self.closed = True
def readable(self):
    """Whether opened for reading"""
    if "r" not in self.mode:
        return False
    return not self.closed
def seekable(self):
    """Whether is seekable (only in read mode)"""
    can_read = self.readable()
    return can_read
def writable(self):
    """Whether opened for writing"""
    if self.mode not in {"wb", "ab", "xb"}:
        return False
    return not self.closed
def __reduce__(self):
    """Pickle support: describe how to re-create this file via ``reopen``.

    Only files in "rb" mode can be pickled. The returned pair is the
    ``reopen`` callable and its positional arguments.

    Raises
    ------
    RuntimeError
        If the file is not in "rb" mode.
    """
    if self.mode != "rb":
        raise RuntimeError("Pickling a writeable file is not supported")

    # a file without a cache (e.g. after close) is recorded as "none"
    cache_name = self.cache.name if self.cache else "none"
    reopen_args = (
        self.fs,
        self.path,
        self.mode,
        self.blocksize,
        self.loc,
        self.size,
        self.autocommit,
        cache_name,
        self.kwargs,
    )
    return reopen, reopen_args
def __del__(self):
    """Close the file on finalization if it is still open."""
    if self.closed:
        return
    self.close()
def __str__(self):
    """Short description: the filesystem's class name and this file's path."""
    return f"<File-like object {type(self.fs).__name__}, {self.path}>"


# repr() and str() give the same text
__repr__ = __str__
def __enter__(self):
    """Enter a ``with`` block; the file itself is the context value."""
    return self
def __exit__(self, *args):
    """Leave a ``with`` block: close the file.

    The exception details in ``args`` are ignored and nothing is returned,
    so an exception raised inside the block is not suppressed.
    """
    self.close()
    return None
def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
    """Open ``path`` on ``fs`` again and restore the read position.

    The arguments are forwarded to ``fs.open`` (``blocksize`` is passed as
    ``block_size``; ``kwargs`` is expanded into extra keyword arguments).
    If ``loc`` is positive, the new file is moved to that offset before
    being returned.
    """
    open_options = {
        "mode": mode,
        "block_size": blocksize,
        "autocommit": autocommit,
        "cache_type": cache_type,
        "size": size,
    }
    file = fs.open(path, **open_options, **kwargs)
    if loc > 0:
        file.seek(loc)
    return file