Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/spec.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import json
5import logging
6import os
7import threading
8import warnings
9import weakref
10from errno import ESPIPE
11from glob import has_magic
12from hashlib import sha256
13from typing import Any, ClassVar
15from .callbacks import DEFAULT_CALLBACK
16from .config import apply_config, conf
17from .dircache import DirCache
18from .transaction import Transaction
19from .utils import (
20 _unstrip_protocol,
21 glob_translate,
22 isfilelike,
23 other_paths,
24 read_block,
25 stringify_path,
26 tokenize,
27)
29logger = logging.getLogger("fsspec")
32def make_instance(cls, args, kwargs):
33 return cls(*args, **kwargs)
36class _Cached(type):
37 """
38 Metaclass for caching file system instances.
40 Notes
41 -----
42 Instances are cached according to
44 * The values of the class attributes listed in `_extra_tokenize_attributes`
45 * The arguments passed to ``__init__``.
47 This creates an additional reference to the filesystem, which prevents the
48 filesystem from being garbage collected when all *user* references go away.
49 A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
50 be made for a filesystem instance to be garbage collected.
51 """
53 def __init__(cls, *args, **kwargs):
54 super().__init__(*args, **kwargs)
55 # Note: we intentionally create a reference here, to avoid garbage
56 # collecting instances when all other references are gone. To really
57 # delete a FileSystem, the cache must be cleared.
58 if conf.get("weakref_instance_cache"): # pragma: no cover
59 # debug option for analysing fork/spawn conditions
60 cls._cache = weakref.WeakValueDictionary()
61 else:
62 cls._cache = {}
63 cls._pid = os.getpid()
65 def __call__(cls, *args, **kwargs):
66 kwargs = apply_config(cls, kwargs)
67 extra_tokens = tuple(
68 getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
69 )
70 strip_tokenize_options = {
71 k: kwargs.pop(k) for k in cls._strip_tokenize_options if k in kwargs
72 }
73 token = tokenize(
74 cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
75 )
76 skip = kwargs.pop("skip_instance_cache", False)
77 if os.getpid() != cls._pid:
78 cls._cache.clear()
79 cls._pid = os.getpid()
80 if not skip and cls.cachable and token in cls._cache:
81 cls._latest = token
82 return cls._cache[token]
83 else:
84 obj = super().__call__(*args, **kwargs, **strip_tokenize_options)
85 # Setting _fs_token here causes some static linters to complain.
86 obj._fs_token_ = token
87 obj.storage_args = args
88 obj.storage_options = kwargs
89 if obj.async_impl and obj.mirror_sync_methods:
90 from .asyn import mirror_sync_methods
92 mirror_sync_methods(obj)
94 if cls.cachable and not skip:
95 cls._latest = token
96 cls._cache[token] = obj
97 return obj
100class AbstractFileSystem(metaclass=_Cached):
101 """
102 An abstract super-class for pythonic file-systems
104 Implementations are expected to be compatible with or, better, subclass
105 from here.
106 """
108 cachable = True # this class can be cached, instances reused
109 _cached = False
110 blocksize = 2**22
111 sep = "/"
112 protocol: ClassVar[str | tuple[str, ...]] = "abstract"
113 _latest = None
114 async_impl = False
115 mirror_sync_methods = False
116 root_marker = "" # For some FSs, may require leading '/' or other character
117 transaction_type = Transaction
119 #: Extra *class attributes* that should be considered when hashing.
120 _extra_tokenize_attributes = ()
121 #: *storage options* that should not be considered when hashing.
122 _strip_tokenize_options = ()
124 # Set by _Cached metaclass
125 storage_args: tuple[Any, ...]
126 storage_options: dict[str, Any]
128 def __init__(self, *args, **storage_options):
129 """Create and configure file-system instance
131 Instances may be cachable, so if similar enough arguments are seen
132 a new instance is not required. The token attribute exists to allow
133 implementations to cache instances if they wish.
135 A reasonable default should be provided if there are no arguments.
137 Subclasses should call this method.
139 Parameters
140 ----------
141 use_listings_cache, listings_expiry_time, max_paths:
142 passed to ``DirCache``, if the implementation supports
143 directory listing caching. Pass use_listings_cache=False
144 to disable such caching.
145 skip_instance_cache: bool
146 If this is a cachable implementation, pass True here to force
147 creating a new instance even if a matching instance exists, and prevent
148 storing this instance.
149 asynchronous: bool
150 loop: asyncio-compatible IOLoop or None
151 """
152 if self._cached:
153 # reusing instance, don't change
154 return
155 self._cached = True
156 self._intrans = False
157 self._transaction = None
158 self._invalidated_caches_in_transaction = []
159 self.dircache = DirCache(**storage_options)
161 if storage_options.pop("add_docs", None):
162 warnings.warn("add_docs is no longer supported.", FutureWarning)
164 if storage_options.pop("add_aliases", None):
165 warnings.warn("add_aliases has been removed.", FutureWarning)
166 # This is set in _Cached
167 self._fs_token_ = None
169 @property
170 def fsid(self):
171 """Persistent filesystem id that can be used to compare filesystems
172 across sessions.
173 """
174 raise NotImplementedError
176 @property
177 def _fs_token(self):
178 return self._fs_token_
180 def __dask_tokenize__(self):
181 return self._fs_token
183 def __hash__(self):
184 return int(self._fs_token, 16)
186 def __eq__(self, other):
187 return isinstance(other, type(self)) and self._fs_token == other._fs_token
189 def __reduce__(self):
190 return make_instance, (type(self), self.storage_args, self.storage_options)
192 @classmethod
193 def _strip_protocol(cls, path):
194 """Turn path from fully-qualified to file-system-specific
196 May require FS-specific handling, e.g., for relative paths or links.
197 """
198 if isinstance(path, list):
199 return [cls._strip_protocol(p) for p in path]
200 path = stringify_path(path)
201 protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
202 for protocol in protos:
203 if path.startswith(protocol + "://"):
204 path = path[len(protocol) + 3 :]
205 elif path.startswith(protocol + "::"):
206 path = path[len(protocol) + 2 :]
207 path = path.rstrip("/")
208 # use of root_marker to make minimum required path, e.g., "/"
209 return path or cls.root_marker
211 def unstrip_protocol(self, name: str) -> str:
212 """Format FS-specific path to generic, including protocol"""
213 protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
214 for protocol in protos:
215 if name.startswith(f"{protocol}://"):
216 return name
217 return f"{protos[0]}://{name}"
219 @staticmethod
220 def _get_kwargs_from_urls(path):
221 """If kwargs can be encoded in the paths, extract them here
223 This should happen before instantiation of the class; incoming paths
224 then should be amended to strip the options in methods.
226 Examples may look like an sftp path "sftp://user@host:/my/path", where
227 the user and host should become kwargs and later get stripped.
228 """
229 # by default, nothing happens
230 return {}
232 @classmethod
233 def current(cls):
234 """Return the most recently instantiated FileSystem
236 If no instance has been created, then create one with defaults
237 """
238 if cls._latest in cls._cache:
239 return cls._cache[cls._latest]
240 return cls()
242 @property
243 def transaction(self):
244 """A context within which files are committed together upon exit
246 Requires the file class to implement `.commit()` and `.discard()`
247 for the normal and exception cases.
248 """
249 if self._transaction is None:
250 self._transaction = self.transaction_type(self)
251 return self._transaction
253 def start_transaction(self):
254 """Begin write transaction for deferring files, non-context version"""
255 self._intrans = True
256 self._transaction = self.transaction_type(self)
257 return self.transaction
259 def end_transaction(self):
260 """Finish write transaction, non-context version"""
261 self.transaction.complete()
262 self._transaction = None
263 # The invalid cache must be cleared after the transaction is completed.
264 for path in self._invalidated_caches_in_transaction:
265 self.invalidate_cache(path)
266 self._invalidated_caches_in_transaction.clear()
268 def invalidate_cache(self, path=None):
269 """
270 Discard any cached directory information
272 Parameters
273 ----------
274 path: string or None
275 If None, clear all listings cached else listings at or under given
276 path.
277 """
278 # Not necessary to implement invalidation mechanism, may have no cache.
279 # But if have, you should call this method of parent class from your
280 # subclass to ensure expiring caches after transacations correctly.
281 # See the implementation of FTPFileSystem in ftp.py
282 if self._intrans:
283 self._invalidated_caches_in_transaction.append(path)
285 def mkdir(self, path, create_parents=True, **kwargs):
286 """
287 Create directory entry at path
289 For systems that don't have true directories, may create an for
290 this instance only and not touch the real filesystem
292 Parameters
293 ----------
294 path: str
295 location
296 create_parents: bool
297 if True, this is equivalent to ``makedirs``
298 kwargs:
299 may be permissions, etc.
300 """
301 pass # not necessary to implement, may not have directories
303 def makedirs(self, path, exist_ok=False):
304 """Recursively make directories
306 Creates directory at path and any intervening required directories.
307 Raises exception if, for instance, the path already exists but is a
308 file.
310 Parameters
311 ----------
312 path: str
313 leaf directory name
314 exist_ok: bool (False)
315 If False, will error if the target already exists
316 """
317 pass # not necessary to implement, may not have directories
319 def rmdir(self, path):
320 """Remove a directory, if empty"""
321 pass # not necessary to implement, may not have directories
323 def ls(self, path, detail=True, **kwargs):
324 """List objects at path.
326 This should include subdirectories and files at that location. The
327 difference between a file and a directory must be clear when details
328 are requested.
330 The specific keys, or perhaps a FileInfo class, or similar, is TBD,
331 but must be consistent across implementations.
332 Must include:
334 - full path to the entry (without protocol)
335 - size of the entry, in bytes. If the value cannot be determined, will
336 be ``None``.
337 - type of entry, "file", "directory" or other
339 Additional information
340 may be present, appropriate to the file-system, e.g., generation,
341 checksum, etc.
343 May use refresh=True|False to allow use of self._ls_from_cache to
344 check for a saved listing and avoid calling the backend. This would be
345 common where listing may be expensive.
347 Parameters
348 ----------
349 path: str
350 detail: bool
351 if True, gives a list of dictionaries, where each is the same as
352 the result of ``info(path)``. If False, gives a list of paths
353 (str).
354 kwargs: may have additional backend-specific options, such as version
355 information
357 Returns
358 -------
359 List of strings if detail is False, or list of directory information
360 dicts if detail is True.
361 """
362 raise NotImplementedError
364 def _ls_from_cache(self, path):
365 """Check cache for listing
367 Returns listing, if found (may be empty list for a directly that exists
368 but contains nothing), None if not in cache.
369 """
370 parent = self._parent(path)
371 try:
372 return self.dircache[path.rstrip("/")]
373 except KeyError:
374 pass
375 try:
376 files = [
377 f
378 for f in self.dircache[parent]
379 if f["name"] == path
380 or (f["name"] == path.rstrip("/") and f["type"] == "directory")
381 ]
382 if len(files) == 0:
383 # parent dir was listed but did not contain this file
384 raise FileNotFoundError(path)
385 return files
386 except KeyError:
387 pass
389 def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
390 """Return all files under the given path.
392 List all files, recursing into subdirectories; output is iterator-style,
393 like ``os.walk()``. For a simple list of files, ``find()`` is available.
395 When topdown is True, the caller can modify the dirnames list in-place (perhaps
396 using del or slice assignment), and walk() will
397 only recurse into the subdirectories whose names remain in dirnames;
398 this can be used to prune the search, impose a specific order of visiting,
399 or even to inform walk() about directories the caller creates or renames before
400 it resumes walk() again.
401 Modifying dirnames when topdown is False has no effect. (see os.walk)
403 Note that the "files" outputted will include anything that is not
404 a directory, such as links.
406 Parameters
407 ----------
408 path: str
409 Root to recurse into
410 maxdepth: int
411 Maximum recursion depth. None means limitless, but not recommended
412 on link-based file-systems.
413 topdown: bool (True)
414 Whether to walk the directory tree from the top downwards or from
415 the bottom upwards.
416 on_error: "omit", "raise", a callable
417 if omit (default), path with exception will simply be empty;
418 If raise, an underlying exception will be raised;
419 if callable, it will be called with a single OSError instance as argument
420 kwargs: passed to ``ls``
421 """
422 if maxdepth is not None and maxdepth < 1:
423 raise ValueError("maxdepth must be at least 1")
425 path = self._strip_protocol(path)
426 full_dirs = {}
427 dirs = {}
428 files = {}
430 detail = kwargs.pop("detail", False)
431 try:
432 listing = self.ls(path, detail=True, **kwargs)
433 except (FileNotFoundError, OSError) as e:
434 if on_error == "raise":
435 raise
436 if callable(on_error):
437 on_error(e)
438 return
440 for info in listing:
441 # each info name must be at least [path]/part , but here
442 # we check also for names like [path]/part/
443 pathname = info["name"].rstrip("/")
444 name = pathname.rsplit("/", 1)[-1]
445 if info["type"] == "directory" and pathname != path:
446 # do not include "self" path
447 full_dirs[name] = pathname
448 dirs[name] = info
449 elif pathname == path:
450 # file-like with same name as give path
451 files[""] = info
452 else:
453 files[name] = info
455 if not detail:
456 dirs = list(dirs)
457 files = list(files)
459 if topdown:
460 # Yield before recursion if walking top down
461 yield path, dirs, files
463 if maxdepth is not None:
464 maxdepth -= 1
465 if maxdepth < 1:
466 if not topdown:
467 yield path, dirs, files
468 return
470 for d in dirs:
471 yield from self.walk(
472 full_dirs[d],
473 maxdepth=maxdepth,
474 detail=detail,
475 topdown=topdown,
476 **kwargs,
477 )
479 if not topdown:
480 # Yield after recursion if walking bottom up
481 yield path, dirs, files
483 def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
484 """List all files below path.
486 Like posix ``find`` command without conditions
488 Parameters
489 ----------
490 path : str
491 maxdepth: int or None
492 If not None, the maximum number of levels to descend
493 withdirs: bool
494 Whether to include directory paths in the output. This is True
495 when used by glob, but users usually only want files.
496 kwargs are passed to ``ls``.
497 """
498 # TODO: allow equivalent of -name parameter
499 path = self._strip_protocol(path)
500 out = {}
502 # Add the root directory if withdirs is requested
503 # This is needed for posix glob compliance
504 if withdirs and path != "" and self.isdir(path):
505 out[path] = self.info(path)
507 for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
508 if withdirs:
509 files.update(dirs)
510 out.update({info["name"]: info for name, info in files.items()})
511 if not out and self.isfile(path):
512 # walk works on directories, but find should also return [path]
513 # when path happens to be a file
514 out[path] = {}
515 names = sorted(out)
516 if not detail:
517 return names
518 else:
519 return {name: out[name] for name in names}
521 def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
522 """Space used by files and optionally directories within a path
524 Directory size does not include the size of its contents.
526 Parameters
527 ----------
528 path: str
529 total: bool
530 Whether to sum all the file sizes
531 maxdepth: int or None
532 Maximum number of directory levels to descend, None for unlimited.
533 withdirs: bool
534 Whether to include directory paths in the output.
535 kwargs: passed to ``find``
537 Returns
538 -------
539 Dict of {path: size} if total=False, or int otherwise, where numbers
540 refer to bytes used.
541 """
542 sizes = {}
543 if withdirs and self.isdir(path):
544 # Include top-level directory in output
545 info = self.info(path)
546 sizes[info["name"]] = info["size"]
547 for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
548 info = self.info(f)
549 sizes[info["name"]] = info["size"]
550 if total:
551 return sum(sizes.values())
552 else:
553 return sizes
555 def glob(self, path, maxdepth=None, **kwargs):
556 """Find files by glob-matching.
558 Pattern matching capabilities for finding files that match the given pattern.
560 Parameters
561 ----------
562 path: str
563 The glob pattern to match against
564 maxdepth: int or None
565 Maximum depth for ``'**'`` patterns. Applied on the first ``'**'`` found.
566 Must be at least 1 if provided.
567 kwargs:
568 Additional arguments passed to ``find`` (e.g., detail=True)
570 Returns
571 -------
572 List of matched paths, or dict of paths and their info if detail=True
574 Notes
575 -----
576 Supported patterns:
577 - '*': Matches any sequence of characters within a single directory level
578 - ``'**'``: Matches any number of directory levels (must be an entire path component)
579 - '?': Matches exactly one character
580 - '[abc]': Matches any character in the set
581 - '[a-z]': Matches any character in the range
582 - '[!abc]': Matches any character NOT in the set
584 Special behaviors:
585 - If the path ends with '/', only folders are returned
586 - Consecutive '*' characters are compressed into a single '*'
587 - Empty brackets '[]' never match anything
588 - Negated empty brackets '[!]' match any single character
589 - Special characters in character classes are escaped properly
591 Limitations:
592 - ``'**'`` must be a complete path component (e.g., ``'a/**/b'``, not ``'a**b'``)
593 - No brace expansion ('{a,b}.txt')
594 - No extended glob patterns ('+(pattern)', '!(pattern)')
595 """
596 if maxdepth is not None and maxdepth < 1:
597 raise ValueError("maxdepth must be at least 1")
599 import re
601 seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
602 ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash
603 path = self._strip_protocol(path)
604 append_slash_to_dirname = ends_with_sep or path.endswith(
605 tuple(sep + "**" for sep in seps)
606 )
607 idx_star = path.find("*") if path.find("*") >= 0 else len(path)
608 idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
609 idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
611 min_idx = min(idx_star, idx_qmark, idx_brace)
613 detail = kwargs.pop("detail", False)
614 withdirs = kwargs.pop("withdirs", True)
616 if not has_magic(path):
617 if self.exists(path, **kwargs):
618 if not detail:
619 return [path]
620 else:
621 return {path: self.info(path, **kwargs)}
622 else:
623 if not detail:
624 return [] # glob of non-existent returns empty
625 else:
626 return {}
627 elif "/" in path[:min_idx]:
628 min_idx = path[:min_idx].rindex("/")
629 root = path[: min_idx + 1]
630 depth = path[min_idx + 1 :].count("/") + 1
631 else:
632 root = ""
633 depth = path[min_idx + 1 :].count("/") + 1
635 if "**" in path:
636 if maxdepth is not None:
637 idx_double_stars = path.find("**")
638 depth_double_stars = path[idx_double_stars:].count("/") + 1
639 depth = depth - depth_double_stars + maxdepth
640 else:
641 depth = None
643 allpaths = self.find(
644 root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
645 )
647 pattern = glob_translate(path + ("/" if ends_with_sep else ""))
648 pattern = re.compile(pattern)
650 out = {
651 p: info
652 for p, info in sorted(allpaths.items())
653 if pattern.match(
654 p + "/"
655 if append_slash_to_dirname and info["type"] == "directory"
656 else p
657 )
658 }
660 if detail:
661 return out
662 else:
663 return list(out)
665 def exists(self, path, **kwargs):
666 """Is there a file at the given path"""
667 try:
668 self.info(path, **kwargs)
669 return True
670 except: # noqa: E722
671 # any exception allowed bar FileNotFoundError?
672 return False
674 def lexists(self, path, **kwargs):
675 """If there is a file at the given path (including
676 broken links)"""
677 return self.exists(path)
679 def info(self, path, **kwargs):
680 """Give details of entry at path
682 Returns a single dictionary, with exactly the same information as ``ls``
683 would with ``detail=True``.
685 The default implementation calls ls and could be overridden by a
686 shortcut. kwargs are passed on to ```ls()``.
688 Some file systems might not be able to measure the file's size, in
689 which case, the returned dict will include ``'size': None``.
691 Returns
692 -------
693 dict with keys: name (full path in the FS), size (in bytes), type (file,
694 directory, or something else) and other FS-specific keys.
695 """
696 path = self._strip_protocol(path)
697 out = self.ls(self._parent(path), detail=True, **kwargs)
698 out = [o for o in out if o["name"].rstrip("/") == path]
699 if out:
700 return out[0]
701 out = self.ls(path, detail=True, **kwargs)
702 path = path.rstrip("/")
703 out1 = [o for o in out if o["name"].rstrip("/") == path]
704 if len(out1) == 1:
705 if "size" not in out1[0]:
706 out1[0]["size"] = None
707 return out1[0]
708 elif len(out1) > 1 or out:
709 return {"name": path, "size": 0, "type": "directory"}
710 else:
711 raise FileNotFoundError(path)
713 def checksum(self, path):
714 """Unique value for current version of file
716 If the checksum is the same from one moment to another, the contents
717 are guaranteed to be the same. If the checksum changes, the contents
718 *might* have changed.
720 This should normally be overridden; default will probably capture
721 creation/modification timestamp (which would be good) or maybe
722 access timestamp (which would be bad)
723 """
724 return int(tokenize(self.info(path)), 16)
726 def size(self, path):
727 """Size in bytes of file"""
728 return self.info(path).get("size", None)
730 def sizes(self, paths):
731 """Size in bytes of each file in a list of paths"""
732 return [self.size(p) for p in paths]
734 def isdir(self, path):
735 """Is this entry directory-like?"""
736 try:
737 return self.info(path)["type"] == "directory"
738 except OSError:
739 return False
741 def isfile(self, path):
742 """Is this entry file-like?"""
743 try:
744 return self.info(path)["type"] == "file"
745 except: # noqa: E722
746 return False
748 def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
749 """Get the contents of the file as a string.
751 Parameters
752 ----------
753 path: str
754 URL of file on this filesystems
755 encoding, errors, newline: same as `open`.
756 """
757 with self.open(
758 path,
759 mode="r",
760 encoding=encoding,
761 errors=errors,
762 newline=newline,
763 **kwargs,
764 ) as f:
765 return f.read()
767 def write_text(
768 self, path, value, encoding=None, errors=None, newline=None, **kwargs
769 ):
770 """Write the text to the given file.
772 An existing file will be overwritten.
774 Parameters
775 ----------
776 path: str
777 URL of file on this filesystems
778 value: str
779 Text to write.
780 encoding, errors, newline: same as `open`.
781 """
782 with self.open(
783 path,
784 mode="w",
785 encoding=encoding,
786 errors=errors,
787 newline=newline,
788 **kwargs,
789 ) as f:
790 return f.write(value)
792 def cat_file(self, path, start=None, end=None, **kwargs):
793 """Get the content of a file
795 Parameters
796 ----------
797 path: URL of file on this filesystems
798 start, end: int
799 Bytes limits of the read. If negative, backwards from end,
800 like usual python slices. Either can be None for start or
801 end of file, respectively
802 kwargs: passed to ``open()``.
803 """
804 # explicitly set buffering off?
805 with self.open(path, "rb", **kwargs) as f:
806 if start is not None:
807 if start >= 0:
808 f.seek(start)
809 else:
810 f.seek(max(0, f.size + start))
811 if end is not None:
812 if end < 0:
813 end = f.size + end
814 return f.read(end - f.tell())
815 return f.read()
817 def pipe_file(self, path, value, mode="overwrite", **kwargs):
818 """Set the bytes of given file"""
819 if mode == "create" and self.exists(path):
820 # non-atomic but simple way; or could use "xb" in open(), which is likely
821 # not as well supported
822 raise FileExistsError
823 with self.open(path, "wb", **kwargs) as f:
824 f.write(value)
826 def pipe(self, path, value=None, **kwargs):
827 """Put value into path
829 (counterpart to ``cat``)
831 Parameters
832 ----------
833 path: string or dict(str, bytes)
834 If a string, a single remote location to put ``value`` bytes; if a dict,
835 a mapping of {path: bytesvalue}.
836 value: bytes, optional
837 If using a single path, these are the bytes to put there. Ignored if
838 ``path`` is a dict
839 """
840 if isinstance(path, str):
841 self.pipe_file(self._strip_protocol(path), value, **kwargs)
842 elif isinstance(path, dict):
843 for k, v in path.items():
844 self.pipe_file(self._strip_protocol(k), v, **kwargs)
845 else:
846 raise ValueError("path must be str or dict")
848 def cat_ranges(
849 self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
850 ):
851 """Get the contents of byte ranges from one or more files
853 Parameters
854 ----------
855 paths: list
856 A list of of filepaths on this filesystems
857 starts, ends: int or list
858 Bytes limits of the read. If using a single int, the same value will be
859 used to read all the specified files.
860 """
861 if max_gap is not None:
862 raise NotImplementedError
863 if not isinstance(paths, list):
864 raise TypeError
865 if not isinstance(starts, list):
866 starts = [starts] * len(paths)
867 if not isinstance(ends, list):
868 ends = [ends] * len(paths)
869 if len(starts) != len(paths) or len(ends) != len(paths):
870 raise ValueError
871 out = []
872 for p, s, e in zip(paths, starts, ends):
873 try:
874 out.append(self.cat_file(p, s, e))
875 except Exception as e:
876 if on_error == "return":
877 out.append(e)
878 else:
879 raise
880 return out
882 def cat(self, path, recursive=False, on_error="raise", **kwargs):
883 """Fetch (potentially multiple) paths' contents
885 Parameters
886 ----------
887 recursive: bool
888 If True, assume the path(s) are directories, and get all the
889 contained files
890 on_error : "raise", "omit", "return"
891 If raise, an underlying exception will be raised (converted to KeyError
892 if the type is in self.missing_exceptions); if omit, keys with exception
893 will simply not be included in the output; if "return", all keys are
894 included in the output, but the value will be bytes or an exception
895 instance.
896 kwargs: passed to cat_file
898 Returns
899 -------
900 dict of {path: contents} if there are multiple paths
901 or the path has been otherwise expanded
902 """
903 paths = self.expand_path(path, recursive=recursive, **kwargs)
904 if (
905 len(paths) > 1
906 or isinstance(path, list)
907 or paths[0] != self._strip_protocol(path)
908 ):
909 out = {}
910 for path in paths:
911 try:
912 out[path] = self.cat_file(path, **kwargs)
913 except Exception as e:
914 if on_error == "raise":
915 raise
916 if on_error == "return":
917 out[path] = e
918 return out
919 else:
920 return self.cat_file(paths[0], **kwargs)
922 def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs):
923 """Copy single remote file to local"""
924 from .implementations.local import LocalFileSystem
926 if isfilelike(lpath):
927 outfile = lpath
928 elif self.isdir(rpath):
929 os.makedirs(lpath, exist_ok=True)
930 return None
932 fs = LocalFileSystem(auto_mkdir=True)
933 fs.makedirs(fs._parent(lpath), exist_ok=True)
935 with self.open(rpath, "rb", **kwargs) as f1:
936 if outfile is None:
937 outfile = open(lpath, "wb")
939 try:
940 callback.set_size(getattr(f1, "size", None))
941 data = True
942 while data:
943 data = f1.read(self.blocksize)
944 segment_len = outfile.write(data)
945 if segment_len is None:
946 segment_len = len(data)
947 callback.relative_update(segment_len)
948 finally:
949 if not isfilelike(lpath):
950 outfile.close()
952 def get(
953 self,
954 rpath,
955 lpath,
956 recursive=False,
957 callback=DEFAULT_CALLBACK,
958 maxdepth=None,
959 **kwargs,
960 ):
961 """Copy file(s) to local.
963 Copies a specific file or tree of files (if recursive=True). If lpath
964 ends with a "/", it will be assumed to be a directory, and target files
965 will go within. Can submit a list of paths, which may be glob-patterns
966 and will be expanded.
968 Calls get_file for each source.
969 """
970 if isinstance(lpath, list) and isinstance(rpath, list):
971 # No need to expand paths when both source and destination
972 # are provided as lists
973 rpaths = rpath
974 lpaths = lpath
975 else:
976 from .implementations.local import (
977 LocalFileSystem,
978 make_path_posix,
979 trailing_sep,
980 )
982 source_is_str = isinstance(rpath, str)
983 rpaths = self.expand_path(
984 rpath, recursive=recursive, maxdepth=maxdepth, **kwargs
985 )
986 if source_is_str and (not recursive or maxdepth is not None):
987 # Non-recursive glob does not copy directories
988 rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
989 if not rpaths:
990 return
992 if isinstance(lpath, str):
993 lpath = make_path_posix(lpath)
995 source_is_file = len(rpaths) == 1
996 dest_is_dir = isinstance(lpath, str) and (
997 trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
998 )
1000 exists = source_is_str and (
1001 (has_magic(rpath) and source_is_file)
1002 or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
1003 )
1004 lpaths = other_paths(
1005 rpaths,
1006 lpath,
1007 exists=exists,
1008 flatten=not source_is_str,
1009 )
1011 callback.set_size(len(lpaths))
1012 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
1013 with callback.branched(rpath, lpath) as child:
1014 self.get_file(rpath, lpath, callback=child, **kwargs)
1016 def put_file(
1017 self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
1018 ):
1019 """Copy single file to remote"""
1020 if mode == "create" and self.exists(rpath):
1021 raise FileExistsError
1022 if os.path.isdir(lpath):
1023 self.makedirs(rpath, exist_ok=True)
1024 return None
1026 with open(lpath, "rb") as f1:
1027 size = f1.seek(0, 2)
1028 callback.set_size(size)
1029 f1.seek(0)
1031 self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
1032 with self.open(rpath, "wb", **kwargs) as f2:
1033 while f1.tell() < size:
1034 data = f1.read(self.blocksize)
1035 segment_len = f2.write(data)
1036 if segment_len is None:
1037 segment_len = len(data)
1038 callback.relative_update(segment_len)
1040 def put(
1041 self,
1042 lpath,
1043 rpath,
1044 recursive=False,
1045 callback=DEFAULT_CALLBACK,
1046 maxdepth=None,
1047 **kwargs,
1048 ):
1049 """Copy file(s) from local.
1051 Copies a specific file or tree of files (if recursive=True). If rpath
1052 ends with a "/", it will be assumed to be a directory, and target files
1053 will go within.
1055 Calls put_file for each source.
1056 """
1057 if isinstance(lpath, list) and isinstance(rpath, list):
1058 # No need to expand paths when both source and destination
1059 # are provided as lists
1060 rpaths = rpath
1061 lpaths = lpath
1062 else:
1063 from .implementations.local import (
1064 LocalFileSystem,
1065 make_path_posix,
1066 trailing_sep,
1067 )
1069 source_is_str = isinstance(lpath, str)
1070 if source_is_str:
1071 lpath = make_path_posix(lpath)
1072 fs = LocalFileSystem()
1073 lpaths = fs.expand_path(
1074 lpath, recursive=recursive, maxdepth=maxdepth, **kwargs
1075 )
1076 if source_is_str and (not recursive or maxdepth is not None):
1077 # Non-recursive glob does not copy directories
1078 lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
1079 if not lpaths:
1080 return
1082 source_is_file = len(lpaths) == 1
1083 dest_is_dir = isinstance(rpath, str) and (
1084 trailing_sep(rpath) or self.isdir(rpath)
1085 )
1087 rpath = (
1088 self._strip_protocol(rpath)
1089 if isinstance(rpath, str)
1090 else [self._strip_protocol(p) for p in rpath]
1091 )
1092 exists = source_is_str and (
1093 (has_magic(lpath) and source_is_file)
1094 or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
1095 )
1096 rpaths = other_paths(
1097 lpaths,
1098 rpath,
1099 exists=exists,
1100 flatten=not source_is_str,
1101 )
1103 callback.set_size(len(rpaths))
1104 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
1105 with callback.branched(lpath, rpath) as child:
1106 self.put_file(lpath, rpath, callback=child, **kwargs)
1108 def head(self, path, size=1024):
1109 """Get the first ``size`` bytes from file"""
1110 with self.open(path, "rb") as f:
1111 return f.read(size)
1113 def tail(self, path, size=1024):
1114 """Get the last ``size`` bytes from file"""
1115 with self.open(path, "rb") as f:
1116 f.seek(max(-size, -f.size), 2)
1117 return f.read()
1119 def cp_file(self, path1, path2, **kwargs):
1120 raise NotImplementedError
1122 def copy(
1123 self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
1124 ):
1125 """Copy within two locations in the filesystem
1127 on_error : "raise", "ignore"
1128 If raise, any not-found exceptions will be raised; if ignore any
1129 not-found exceptions will cause the path to be skipped; defaults to
1130 raise unless recursive is true, where the default is ignore
1131 """
1132 if on_error is None and recursive:
1133 on_error = "ignore"
1134 elif on_error is None:
1135 on_error = "raise"
1137 if isinstance(path1, list) and isinstance(path2, list):
1138 # No need to expand paths when both source and destination
1139 # are provided as lists
1140 paths1 = path1
1141 paths2 = path2
1142 else:
1143 from .implementations.local import trailing_sep
1145 source_is_str = isinstance(path1, str)
1146 paths1 = self.expand_path(
1147 path1, recursive=recursive, maxdepth=maxdepth, **kwargs
1148 )
1149 if source_is_str and (not recursive or maxdepth is not None):
1150 # Non-recursive glob does not copy directories
1151 paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))]
1152 if not paths1:
1153 return
1155 source_is_file = len(paths1) == 1
1156 dest_is_dir = isinstance(path2, str) and (
1157 trailing_sep(path2) or self.isdir(path2)
1158 )
1160 exists = source_is_str and (
1161 (has_magic(path1) and source_is_file)
1162 or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
1163 )
1164 paths2 = other_paths(
1165 paths1,
1166 path2,
1167 exists=exists,
1168 flatten=not source_is_str,
1169 )
1171 for p1, p2 in zip(paths1, paths2):
1172 try:
1173 self.cp_file(p1, p2, **kwargs)
1174 except FileNotFoundError:
1175 if on_error == "raise":
1176 raise
1178 def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
1179 """Turn one or more globs or directories into a list of all matching paths
1180 to files or directories.
1182 kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
1183 """
1185 if maxdepth is not None and maxdepth < 1:
1186 raise ValueError("maxdepth must be at least 1")
1188 if isinstance(path, (str, os.PathLike)):
1189 out = self.expand_path([path], recursive, maxdepth, **kwargs)
1190 else:
1191 out = set()
1192 path = [self._strip_protocol(p) for p in path]
1193 for p in path:
1194 if has_magic(p):
1195 bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
1196 out |= bit
1197 if recursive:
1198 # glob call above expanded one depth so if maxdepth is defined
1199 # then decrement it in expand_path call below. If it is zero
1200 # after decrementing then avoid expand_path call.
1201 if maxdepth is not None and maxdepth <= 1:
1202 continue
1203 out |= set(
1204 self.expand_path(
1205 list(bit),
1206 recursive=recursive,
1207 maxdepth=maxdepth - 1 if maxdepth is not None else None,
1208 **kwargs,
1209 )
1210 )
1211 continue
1212 elif recursive:
1213 rec = set(
1214 self.find(
1215 p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
1216 )
1217 )
1218 out |= rec
1219 if p not in out and (recursive is False or self.exists(p)):
1220 # should only check once, for the root
1221 out.add(p)
1222 if not out:
1223 raise FileNotFoundError(path)
1224 return sorted(out)
1226 def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
1227 """Move file(s) from one location to another"""
1228 if path1 == path2:
1229 logger.debug("%s mv: The paths are the same, so no files were moved.", self)
1230 else:
1231 # explicitly raise exception to prevent data corruption
1232 self.copy(
1233 path1, path2, recursive=recursive, maxdepth=maxdepth, onerror="raise"
1234 )
1235 self.rm(path1, recursive=recursive)
1237 def rm_file(self, path):
1238 """Delete a file"""
1239 self._rm(path)
1241 def _rm(self, path):
1242 """Delete one file"""
1243 # this is the old name for the method, prefer rm_file
1244 raise NotImplementedError
1246 def rm(self, path, recursive=False, maxdepth=None):
1247 """Delete files.
1249 Parameters
1250 ----------
1251 path: str or list of str
1252 File(s) to delete.
1253 recursive: bool
1254 If file(s) are directories, recursively delete contents and then
1255 also remove the directory
1256 maxdepth: int or None
1257 Depth to pass to walk for finding files to delete, if recursive.
1258 If None, there will be no limit and infinite recursion may be
1259 possible.
1260 """
1261 path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
1262 for p in reversed(path):
1263 self.rm_file(p)
1265 @classmethod
1266 def _parent(cls, path):
1267 path = cls._strip_protocol(path)
1268 if "/" in path:
1269 parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
1270 return cls.root_marker + parent
1271 else:
1272 return cls.root_marker
1274 def _open(
1275 self,
1276 path,
1277 mode="rb",
1278 block_size=None,
1279 autocommit=True,
1280 cache_options=None,
1281 **kwargs,
1282 ):
1283 """Return raw bytes-mode file-like from the file-system"""
1284 return AbstractBufferedFile(
1285 self,
1286 path,
1287 mode,
1288 block_size,
1289 autocommit,
1290 cache_options=cache_options,
1291 **kwargs,
1292 )
1294 def open(
1295 self,
1296 path,
1297 mode="rb",
1298 block_size=None,
1299 cache_options=None,
1300 compression=None,
1301 **kwargs,
1302 ):
1303 """
1304 Return a file-like object from the filesystem
1306 The resultant instance must function correctly in a context ``with``
1307 block.
1309 Parameters
1310 ----------
1311 path: str
1312 Target file
1313 mode: str like 'rb', 'w'
1314 See builtin ``open()``
1315 Mode "x" (exclusive write) may be implemented by the backend. Even if
1316 it is, whether it is checked up front or on commit, and whether it is
1317 atomic is implementation-dependent.
1318 block_size: int
1319 Some indication of buffering - this is a value in bytes
1320 cache_options : dict, optional
1321 Extra arguments to pass through to the cache.
1322 compression: string or None
1323 If given, open file using compression codec. Can either be a compression
1324 name (a key in ``fsspec.compression.compr``) or "infer" to guess the
1325 compression from the filename suffix.
1326 encoding, errors, newline: passed on to TextIOWrapper for text mode
1327 """
1328 import io
1330 path = self._strip_protocol(path)
1331 if "b" not in mode:
1332 mode = mode.replace("t", "") + "b"
1334 text_kwargs = {
1335 k: kwargs.pop(k)
1336 for k in ["encoding", "errors", "newline"]
1337 if k in kwargs
1338 }
1339 return io.TextIOWrapper(
1340 self.open(
1341 path,
1342 mode,
1343 block_size=block_size,
1344 cache_options=cache_options,
1345 compression=compression,
1346 **kwargs,
1347 ),
1348 **text_kwargs,
1349 )
1350 else:
1351 ac = kwargs.pop("autocommit", not self._intrans)
1352 f = self._open(
1353 path,
1354 mode=mode,
1355 block_size=block_size,
1356 autocommit=ac,
1357 cache_options=cache_options,
1358 **kwargs,
1359 )
1360 if compression is not None:
1361 from fsspec.compression import compr
1362 from fsspec.core import get_compression
1364 compression = get_compression(path, compression)
1365 compress = compr[compression]
1366 f = compress(f, mode=mode[0])
1368 if not ac and "r" not in mode:
1369 self.transaction.files.append(f)
1370 return f
1372 def touch(self, path, truncate=True, **kwargs):
1373 """Create empty file, or update timestamp
1375 Parameters
1376 ----------
1377 path: str
1378 file location
1379 truncate: bool
1380 If True, always set file size to 0; if False, update timestamp and
1381 leave file unchanged, if backend allows this
1382 """
1383 if truncate or not self.exists(path):
1384 with self.open(path, "wb", **kwargs):
1385 pass
1386 else:
1387 raise NotImplementedError # update timestamp, if possible
1389 def ukey(self, path):
1390 """Hash of file properties, to tell if it has changed"""
1391 return sha256(str(self.info(path)).encode()).hexdigest()
1393 def read_block(self, fn, offset, length, delimiter=None):
1394 """Read a block of bytes from
1396 Starting at ``offset`` of the file, read ``length`` bytes. If
1397 ``delimiter`` is set then we ensure that the read starts and stops at
1398 delimiter boundaries that follow the locations ``offset`` and ``offset
1399 + length``. If ``offset`` is zero then we start at zero. The
1400 bytestring returned WILL include the end delimiter string.
1402 If offset+length is beyond the eof, reads to eof.
1404 Parameters
1405 ----------
1406 fn: string
1407 Path to filename
1408 offset: int
1409 Byte offset to start read
1410 length: int
1411 Number of bytes to read. If None, read to end.
1412 delimiter: bytes (optional)
1413 Ensure reading starts and stops at delimiter bytestring
1415 Examples
1416 --------
1417 >>> fs.read_block('data/file.csv', 0, 13) # doctest: +SKIP
1418 b'Alice, 100\\nBo'
1419 >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP
1420 b'Alice, 100\\nBob, 200\\n'
1422 Use ``length=None`` to read to the end of the file.
1423 >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP
1424 b'Alice, 100\\nBob, 200\\nCharlie, 300'
1426 See Also
1427 --------
1428 :func:`fsspec.utils.read_block`
1429 """
1430 with self.open(fn, "rb") as f:
1431 size = f.size
1432 if length is None:
1433 length = size
1434 if size is not None and offset + length > size:
1435 length = size - offset
1436 return read_block(f, offset, length, delimiter)
1438 def to_json(self, *, include_password: bool = True) -> str:
1439 """
1440 JSON representation of this filesystem instance.
1442 Parameters
1443 ----------
1444 include_password: bool, default True
1445 Whether to include the password (if any) in the output.
1447 Returns
1448 -------
1449 JSON string with keys ``cls`` (the python location of this class),
1450 protocol (text name of this class's protocol, first one in case of
1451 multiple), ``args`` (positional args, usually empty), and all other
1452 keyword arguments as their own keys.
1454 Warnings
1455 --------
1456 Serialized filesystems may contain sensitive information which have been
1457 passed to the constructor, such as passwords and tokens. Make sure you
1458 store and send them in a secure environment!
1459 """
1460 from .json import FilesystemJSONEncoder
1462 return json.dumps(
1463 self,
1464 cls=type(
1465 "_FilesystemJSONEncoder",
1466 (FilesystemJSONEncoder,),
1467 {"include_password": include_password},
1468 ),
1469 )
1471 @staticmethod
1472 def from_json(blob: str) -> AbstractFileSystem:
1473 """
1474 Recreate a filesystem instance from JSON representation.
1476 See ``.to_json()`` for the expected structure of the input.
1478 Parameters
1479 ----------
1480 blob: str
1482 Returns
1483 -------
1484 file system instance, not necessarily of this particular class.
1486 Warnings
1487 --------
1488 This can import arbitrary modules (as determined by the ``cls`` key).
1489 Make sure you haven't installed any modules that may execute malicious code
1490 at import time.
1491 """
1492 from .json import FilesystemJSONDecoder
1494 return json.loads(blob, cls=FilesystemJSONDecoder)
1496 def to_dict(self, *, include_password: bool = True) -> dict[str, Any]:
1497 """
1498 JSON-serializable dictionary representation of this filesystem instance.
1500 Parameters
1501 ----------
1502 include_password: bool, default True
1503 Whether to include the password (if any) in the output.
1505 Returns
1506 -------
1507 Dictionary with keys ``cls`` (the python location of this class),
1508 protocol (text name of this class's protocol, first one in case of
1509 multiple), ``args`` (positional args, usually empty), and all other
1510 keyword arguments as their own keys.
1512 Warnings
1513 --------
1514 Serialized filesystems may contain sensitive information which have been
1515 passed to the constructor, such as passwords and tokens. Make sure you
1516 store and send them in a secure environment!
1517 """
1518 from .json import FilesystemJSONEncoder
1520 json_encoder = FilesystemJSONEncoder()
1522 cls = type(self)
1523 proto = self.protocol
1525 storage_options = dict(self.storage_options)
1526 if not include_password:
1527 storage_options.pop("password", None)
1529 return dict(
1530 cls=f"{cls.__module__}:{cls.__name__}",
1531 protocol=proto[0] if isinstance(proto, (tuple, list)) else proto,
1532 args=json_encoder.make_serializable(self.storage_args),
1533 **json_encoder.make_serializable(storage_options),
1534 )
1536 @staticmethod
1537 def from_dict(dct: dict[str, Any]) -> AbstractFileSystem:
1538 """
1539 Recreate a filesystem instance from dictionary representation.
1541 See ``.to_dict()`` for the expected structure of the input.
1543 Parameters
1544 ----------
1545 dct: Dict[str, Any]
1547 Returns
1548 -------
1549 file system instance, not necessarily of this particular class.
1551 Warnings
1552 --------
1553 This can import arbitrary modules (as determined by the ``cls`` key).
1554 Make sure you haven't installed any modules that may execute malicious code
1555 at import time.
1556 """
1557 from .json import FilesystemJSONDecoder
1559 json_decoder = FilesystemJSONDecoder()
1561 dct = dict(dct) # Defensive copy
1563 cls = FilesystemJSONDecoder.try_resolve_fs_cls(dct)
1564 if cls is None:
1565 raise ValueError("Not a serialized AbstractFileSystem")
1567 dct.pop("cls", None)
1568 dct.pop("protocol", None)
1570 return cls(
1571 *json_decoder.unmake_serializable(dct.pop("args", ())),
1572 **json_decoder.unmake_serializable(dct),
1573 )
1575 def _get_pyarrow_filesystem(self):
1576 """
1577 Make a version of the FS instance which will be acceptable to pyarrow
1578 """
1579 # all instances already also derive from pyarrow
1580 return self
1582 def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
1583 """Create key/value store based on this file-system
1585 Makes a MutableMapping interface to the FS at the given root path.
1586 See ``fsspec.mapping.FSMap`` for further details.
1587 """
1588 from .mapping import FSMap
1590 return FSMap(
1591 root,
1592 self,
1593 check=check,
1594 create=create,
1595 missing_exceptions=missing_exceptions,
1596 )
1598 @classmethod
1599 def clear_instance_cache(cls):
1600 """
1601 Clear the cache of filesystem instances.
1603 Notes
1604 -----
1605 Unless overridden by setting the ``cachable`` class attribute to False,
1606 the filesystem class stores a reference to newly created instances. This
1607 prevents Python's normal rules around garbage collection from working,
1608 since the instances refcount will not drop to zero until
1609 ``clear_instance_cache`` is called.
1610 """
1611 cls._cache.clear()
1613 def created(self, path):
1614 """Return the created timestamp of a file as a datetime.datetime"""
1615 raise NotImplementedError
1617 def modified(self, path):
1618 """Return the modified timestamp of a file as a datetime.datetime"""
1619 raise NotImplementedError
1621 def tree(
1622 self,
1623 path: str = "/",
1624 recursion_limit: int = 2,
1625 max_display: int = 25,
1626 display_size: bool = False,
1627 prefix: str = "",
1628 is_last: bool = True,
1629 first: bool = True,
1630 indent_size: int = 4,
1631 ) -> str:
1632 """
1633 Return a tree-like structure of the filesystem starting from the given path as a string.
1635 Parameters
1636 ----------
1637 path: Root path to start traversal from
1638 recursion_limit: Maximum depth of directory traversal
1639 max_display: Maximum number of items to display per directory
1640 display_size: Whether to display file sizes
1641 prefix: Current line prefix for visual tree structure
1642 is_last: Whether current item is last in its level
1643 first: Whether this is the first call (displays root path)
1644 indent_size: Number of spaces by indent
1646 Returns
1647 -------
1648 str: A string representing the tree structure.
1650 Example
1651 -------
1652 >>> from fsspec import filesystem
1654 >>> fs = filesystem('ftp', host='test.rebex.net', user='demo', password='password')
1655 >>> tree = fs.tree(display_size=True, recursion_limit=3, indent_size=8, max_display=10)
1656 >>> print(tree)
1657 """
1659 def format_bytes(n: int) -> str:
1660 """Format bytes as text."""
1661 for prefix, k in (
1662 ("P", 2**50),
1663 ("T", 2**40),
1664 ("G", 2**30),
1665 ("M", 2**20),
1666 ("k", 2**10),
1667 ):
1668 if n >= 0.9 * k:
1669 return f"{n / k:.2f} {prefix}b"
1670 return f"{n}B"
1672 result = []
1674 if first:
1675 result.append(path)
1677 if recursion_limit:
1678 indent = " " * indent_size
1679 contents = self.ls(path, detail=True)
1680 contents.sort(
1681 key=lambda x: (x.get("type") != "directory", x.get("name", ""))
1682 )
1684 if max_display is not None and len(contents) > max_display:
1685 displayed_contents = contents[:max_display]
1686 remaining_count = len(contents) - max_display
1687 else:
1688 displayed_contents = contents
1689 remaining_count = 0
1691 for i, item in enumerate(displayed_contents):
1692 is_last_item = (i == len(displayed_contents) - 1) and (
1693 remaining_count == 0
1694 )
1696 branch = (
1697 "└" + ("─" * (indent_size - 2))
1698 if is_last_item
1699 else "├" + ("─" * (indent_size - 2))
1700 )
1701 branch += " "
1702 new_prefix = prefix + (
1703 indent if is_last_item else "│" + " " * (indent_size - 1)
1704 )
1706 name = os.path.basename(item.get("name", ""))
1708 if display_size and item.get("type") == "directory":
1709 sub_contents = self.ls(item.get("name", ""), detail=True)
1710 num_files = sum(
1711 1 for sub_item in sub_contents if sub_item.get("type") == "file"
1712 )
1713 num_folders = sum(
1714 1
1715 for sub_item in sub_contents
1716 if sub_item.get("type") == "directory"
1717 )
1719 if num_files == 0 and num_folders == 0:
1720 size = " (empty folder)"
1721 elif num_files == 0:
1722 size = f" ({num_folders} subfolder{'s' if num_folders > 1 else ''})"
1723 elif num_folders == 0:
1724 size = f" ({num_files} file{'s' if num_files > 1 else ''})"
1725 else:
1726 size = f" ({num_files} file{'s' if num_files > 1 else ''}, {num_folders} subfolder{'s' if num_folders > 1 else ''})"
1727 elif display_size and item.get("type") == "file":
1728 size = f" ({format_bytes(item.get('size', 0))})"
1729 else:
1730 size = ""
1732 result.append(f"{prefix}{branch}{name}{size}")
1734 if item.get("type") == "directory" and recursion_limit > 0:
1735 result.append(
1736 self.tree(
1737 path=item.get("name", ""),
1738 recursion_limit=recursion_limit - 1,
1739 max_display=max_display,
1740 display_size=display_size,
1741 prefix=new_prefix,
1742 is_last=is_last_item,
1743 first=False,
1744 indent_size=indent_size,
1745 )
1746 )
1748 if remaining_count > 0:
1749 more_message = f"{remaining_count} more item(s) not displayed."
1750 result.append(
1751 f"{prefix}{'└' + ('─' * (indent_size - 2))} {more_message}"
1752 )
1754 return "\n".join(_ for _ in result if _)
1756 # ------------------------------------------------------------------------
1757 # Aliases
1759 def read_bytes(self, path, start=None, end=None, **kwargs):
1760 """Alias of `AbstractFileSystem.cat_file`."""
1761 return self.cat_file(path, start=start, end=end, **kwargs)
1763 def write_bytes(self, path, value, **kwargs):
1764 """Alias of `AbstractFileSystem.pipe_file`."""
1765 self.pipe_file(path, value, **kwargs)
1767 def makedir(self, path, create_parents=True, **kwargs):
1768 """Alias of `AbstractFileSystem.mkdir`."""
1769 return self.mkdir(path, create_parents=create_parents, **kwargs)
1771 def mkdirs(self, path, exist_ok=False):
1772 """Alias of `AbstractFileSystem.makedirs`."""
1773 return self.makedirs(path, exist_ok=exist_ok)
1775 def listdir(self, path, detail=True, **kwargs):
1776 """Alias of `AbstractFileSystem.ls`."""
1777 return self.ls(path, detail=detail, **kwargs)
1779 def cp(self, path1, path2, **kwargs):
1780 """Alias of `AbstractFileSystem.copy`."""
1781 return self.copy(path1, path2, **kwargs)
1783 def move(self, path1, path2, **kwargs):
1784 """Alias of `AbstractFileSystem.mv`."""
1785 return self.mv(path1, path2, **kwargs)
1787 def stat(self, path, **kwargs):
1788 """Alias of `AbstractFileSystem.info`."""
1789 return self.info(path, **kwargs)
1791 def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
1792 """Alias of `AbstractFileSystem.du`."""
1793 return self.du(path, total=total, maxdepth=maxdepth, **kwargs)
1795 def rename(self, path1, path2, **kwargs):
1796 """Alias of `AbstractFileSystem.mv`."""
1797 return self.mv(path1, path2, **kwargs)
1799 def delete(self, path, recursive=False, maxdepth=None):
1800 """Alias of `AbstractFileSystem.rm`."""
1801 return self.rm(path, recursive=recursive, maxdepth=maxdepth)
1803 def upload(self, lpath, rpath, recursive=False, **kwargs):
1804 """Alias of `AbstractFileSystem.put`."""
1805 return self.put(lpath, rpath, recursive=recursive, **kwargs)
1807 def download(self, rpath, lpath, recursive=False, **kwargs):
1808 """Alias of `AbstractFileSystem.get`."""
1809 return self.get(rpath, lpath, recursive=recursive, **kwargs)
1811 def sign(self, path, expiration=100, **kwargs):
1812 """Create a signed URL representing the given path
1814 Some implementations allow temporary URLs to be generated, as a
1815 way of delegating credentials.
1817 Parameters
1818 ----------
1819 path : str
1820 The path on the filesystem
1821 expiration : int
1822 Number of seconds to enable the URL for (if supported)
1824 Returns
1825 -------
1826 URL : str
1827 The signed URL
1829 Raises
1830 ------
1831 NotImplementedError : if method is not implemented for a filesystem
1832 """
1833 raise NotImplementedError("Sign is not implemented for this filesystem")
1835 def _isfilestore(self):
1836 # Originally inherited from pyarrow DaskFileSystem. Keeping this
1837 # here for backwards compatibility as long as pyarrow uses its
1838 # legacy fsspec-compatible filesystems and thus accepts fsspec
1839 # filesystems as well
1840 return False
1843class AbstractBufferedFile(io.IOBase):
1844 """Convenient class to derive from to provide buffering
1846 In the case that the backend does not provide a pythonic file-like object
1847 already, this class contains much of the logic to build one. The only
1848 methods that need to be overridden are ``_upload_chunk``,
1849 ``_initiate_upload`` and ``_fetch_range``.
1850 """
1852 DEFAULT_BLOCK_SIZE = 5 * 2**20
1853 _details = None
1855 def __init__(
1856 self,
1857 fs,
1858 path,
1859 mode="rb",
1860 block_size="default",
1861 autocommit=True,
1862 cache_type="readahead",
1863 cache_options=None,
1864 size=None,
1865 **kwargs,
1866 ):
1867 """
1868 Template for files with buffered reading and writing
1870 Parameters
1871 ----------
1872 fs: instance of FileSystem
1873 path: str
1874 location in file-system
1875 mode: str
1876 Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file
1877 systems may be read-only, and some may not support append.
1878 block_size: int
1879 Buffer size for reading or writing, 'default' for class default
1880 autocommit: bool
1881 Whether to write to final destination; may only impact what
1882 happens when file is being closed.
1883 cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
1884 Caching policy in read mode. See the definitions in ``core``.
1885 cache_options : dict
1886 Additional options passed to the constructor for the cache specified
1887 by `cache_type`.
1888 size: int
1889 If given and in read mode, suppressed having to look up the file size
1890 kwargs:
1891 Gets stored as self.kwargs
1892 """
1893 from .core import caches
1895 self.path = path
1896 self.fs = fs
1897 self.mode = mode
1898 self.blocksize = (
1899 self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
1900 )
1901 self.loc = 0
1902 self.autocommit = autocommit
1903 self.end = None
1904 self.start = None
1905 self.closed = False
1907 if cache_options is None:
1908 cache_options = {}
1910 if "trim" in kwargs:
1911 warnings.warn(
1912 "Passing 'trim' to control the cache behavior has been deprecated. "
1913 "Specify it within the 'cache_options' argument instead.",
1914 FutureWarning,
1915 )
1916 cache_options["trim"] = kwargs.pop("trim")
1918 self.kwargs = kwargs
1920 if mode not in {"ab", "rb", "wb", "xb"}:
1921 raise NotImplementedError("File mode not supported")
1922 if mode == "rb":
1923 if size is not None:
1924 self.size = size
1925 else:
1926 self.size = self.details["size"]
1927 self.cache = caches[cache_type](
1928 self.blocksize, self._fetch_range, self.size, **cache_options
1929 )
1930 else:
1931 self.buffer = io.BytesIO()
1932 self.offset = None
1933 self.forced = False
1934 self.location = None
1936 @property
1937 def details(self):
1938 if self._details is None:
1939 self._details = self.fs.info(self.path)
1940 return self._details
1942 @details.setter
1943 def details(self, value):
1944 self._details = value
1945 self.size = value["size"]
1947 @property
1948 def full_name(self):
1949 return _unstrip_protocol(self.path, self.fs)
1951 @property
1952 def closed(self):
1953 # get around this attr being read-only in IOBase
1954 # use getattr here, since this can be called during del
1955 return getattr(self, "_closed", True)
1957 @closed.setter
1958 def closed(self, c):
1959 self._closed = c
1961 def __hash__(self):
1962 if "w" in self.mode:
1963 return id(self)
1964 else:
1965 return int(tokenize(self.details), 16)
1967 def __eq__(self, other):
1968 """Files are equal if they have the same checksum, only in read mode"""
1969 if self is other:
1970 return True
1971 return (
1972 isinstance(other, type(self))
1973 and self.mode == "rb"
1974 and other.mode == "rb"
1975 and hash(self) == hash(other)
1976 )
1978 def commit(self):
1979 """Move from temp to final destination"""
1981 def discard(self):
1982 """Throw away temporary file"""
1984 def info(self):
1985 """File information about this path"""
1986 if self.readable():
1987 return self.details
1988 else:
1989 raise ValueError("Info not available while writing")
1991 def tell(self):
1992 """Current file location"""
1993 return self.loc
1995 def seek(self, loc, whence=0):
1996 """Set current file location
1998 Parameters
1999 ----------
2000 loc: int
2001 byte location
2002 whence: {0, 1, 2}
2003 from start of file, current location or end of file, resp.
2004 """
2005 loc = int(loc)
2006 if not self.mode == "rb":
2007 raise OSError(ESPIPE, "Seek only available in read mode")
2008 if whence == 0:
2009 nloc = loc
2010 elif whence == 1:
2011 nloc = self.loc + loc
2012 elif whence == 2:
2013 nloc = self.size + loc
2014 else:
2015 raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
2016 if nloc < 0:
2017 raise ValueError("Seek before start of file")
2018 self.loc = nloc
2019 return self.loc
2021 def write(self, data):
2022 """
2023 Write data to buffer.
2025 Buffer only sent on flush() or if buffer is greater than
2026 or equal to blocksize.
2028 Parameters
2029 ----------
2030 data: bytes
2031 Set of bytes to be written.
2032 """
2033 if not self.writable():
2034 raise ValueError("File not in write mode")
2035 if self.closed:
2036 raise ValueError("I/O operation on closed file.")
2037 if self.forced:
2038 raise ValueError("This file has been force-flushed, can only close")
2039 out = self.buffer.write(data)
2040 self.loc += out
2041 if self.buffer.tell() >= self.blocksize:
2042 self.flush()
2043 return out
2045 def flush(self, force=False):
2046 """
2047 Write buffered data to backend store.
2049 Writes the current buffer, if it is larger than the block-size, or if
2050 the file is being closed.
2052 Parameters
2053 ----------
2054 force: bool
2055 When closing, write the last block even if it is smaller than
2056 blocks are allowed to be. Disallows further writing to this file.
2057 """
2059 if self.closed:
2060 raise ValueError("Flush on closed file")
2061 if force and self.forced:
2062 raise ValueError("Force flush cannot be called more than once")
2063 if force:
2064 self.forced = True
2066 if self.readable():
2067 # no-op to flush on read-mode
2068 return
2070 if not force and self.buffer.tell() < self.blocksize:
2071 # Defer write on small block
2072 return
2074 if self.offset is None:
2075 # Initialize a multipart upload
2076 self.offset = 0
2077 try:
2078 self._initiate_upload()
2079 except:
2080 self.closed = True
2081 raise
2083 if self._upload_chunk(final=force) is not False:
2084 self.offset += self.buffer.seek(0, 2)
2085 self.buffer = io.BytesIO()
2087 def _upload_chunk(self, final=False):
2088 """Write one part of a multi-block file upload
2090 Parameters
2091 ==========
2092 final: bool
2093 This is the last block, so should complete file, if
2094 self.autocommit is True.
2095 """
2096 # may not yet have been initialized, may need to call _initialize_upload
2098 def _initiate_upload(self):
2099 """Create remote file/upload"""
2100 pass
2102 def _fetch_range(self, start, end):
2103 """Get the specified set of bytes from remote"""
2104 return self.fs.cat_file(self.path, start=start, end=end)
2106 def read(self, length=-1):
2107 """
2108 Return data from cache, or fetch pieces as necessary
2110 Parameters
2111 ----------
2112 length: int (-1)
2113 Number of bytes to read; if <0, all remaining bytes.
2114 """
2115 length = -1 if length is None else int(length)
2116 if self.mode != "rb":
2117 raise ValueError("File not in read mode")
2118 if length < 0:
2119 length = self.size - self.loc
2120 if self.closed:
2121 raise ValueError("I/O operation on closed file.")
2122 if length == 0:
2123 # don't even bother calling fetch
2124 return b""
2125 out = self.cache._fetch(self.loc, self.loc + length)
2127 logger.debug(
2128 "%s read: %i - %i %s",
2129 self,
2130 self.loc,
2131 self.loc + length,
2132 self.cache._log_stats(),
2133 )
2134 self.loc += len(out)
2135 return out
2137 def readinto(self, b):
2138 """mirrors builtin file's readinto method
2140 https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
2141 """
2142 out = memoryview(b).cast("B")
2143 data = self.read(out.nbytes)
2144 out[: len(data)] = data
2145 return len(data)
2147 def readuntil(self, char=b"\n", blocks=None):
2148 """Return data between current position and first occurrence of char
2150 char is included in the output, except if the end of the tile is
2151 encountered first.
2153 Parameters
2154 ----------
2155 char: bytes
2156 Thing to find
2157 blocks: None or int
2158 How much to read in each go. Defaults to file blocksize - which may
2159 mean a new read on every call.
2160 """
2161 out = []
2162 while True:
2163 start = self.tell()
2164 part = self.read(blocks or self.blocksize)
2165 if len(part) == 0:
2166 break
2167 found = part.find(char)
2168 if found > -1:
2169 out.append(part[: found + len(char)])
2170 self.seek(start + found + len(char))
2171 break
2172 out.append(part)
2173 return b"".join(out)
2175 def readline(self):
2176 """Read until and including the first occurrence of newline character
2178 Note that, because of character encoding, this is not necessarily a
2179 true line ending.
2180 """
2181 return self.readuntil(b"\n")
2183 def __next__(self):
2184 out = self.readline()
2185 if out:
2186 return out
2187 raise StopIteration
2189 def __iter__(self):
2190 return self
2192 def readlines(self):
2193 """Return all data, split by the newline character, including the newline character"""
2194 data = self.read()
2195 lines = data.split(b"\n")
2196 out = [l + b"\n" for l in lines[:-1]]
2197 if data.endswith(b"\n"):
2198 return out
2199 else:
2200 return out + [lines[-1]]
2201 # return list(self) ???
2203 def readinto1(self, b):
2204 return self.readinto(b)
2206 def close(self):
2207 """Close file
2209 Finalizes writes, discards cache
2210 """
2211 if getattr(self, "_unclosable", False):
2212 return
2213 if self.closed:
2214 return
2215 try:
2216 if self.mode == "rb":
2217 self.cache = None
2218 else:
2219 if not self.forced:
2220 self.flush(force=True)
2222 if self.fs is not None:
2223 self.fs.invalidate_cache(self.path)
2224 self.fs.invalidate_cache(self.fs._parent(self.path))
2225 finally:
2226 self.closed = True
2228 def readable(self):
2229 """Whether opened for reading"""
2230 return "r" in self.mode and not self.closed
2232 def seekable(self):
2233 """Whether is seekable (only in read mode)"""
2234 return self.readable()
2236 def writable(self):
2237 """Whether opened for writing"""
2238 return self.mode in {"wb", "ab", "xb"} and not self.closed
2240 def __reduce__(self):
2241 if self.mode != "rb":
2242 raise RuntimeError("Pickling a writeable file is not supported")
2244 return reopen, (
2245 self.fs,
2246 self.path,
2247 self.mode,
2248 self.blocksize,
2249 self.loc,
2250 self.size,
2251 self.autocommit,
2252 self.cache.name if self.cache else "none",
2253 self.kwargs,
2254 )
2256 def __del__(self):
2257 if not self.closed:
2258 self.close()
2260 def __str__(self):
2261 return f"<File-like object {type(self.fs).__name__}, {self.path}>"
2263 __repr__ = __str__
2265 def __enter__(self):
2266 return self
2268 def __exit__(self, *args):
2269 self.close()
2272def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
2273 file = fs.open(
2274 path,
2275 mode=mode,
2276 block_size=blocksize,
2277 autocommit=autocommit,
2278 cache_type=cache_type,
2279 size=size,
2280 **kwargs,
2281 )
2282 if loc > 0:
2283 file.seek(loc)
2284 return file