import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from collections.abc import Iterable
from glob import has_magic
from typing import TYPE_CHECKING

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
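

# A minimal sketch (not part of the module's API) of driving a coroutine from
# blocking code on the dedicated fsspec IO loop; ``_double`` is a hypothetical
# stand-in for an async filesystem method such as ``_cat_file``.
def _example_sync_usage():
    async def _double(x):
        await asyncio.sleep(0)
        return 2 * x

    return sync(get_loop(), _double, 21)  # -> 42; raises FSTimeoutError on timeout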


def sync_wrapper(func, obj=None):
    """Given an async function, return a blocking version of it that can be
    called from synchronous contexts.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
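

# Illustrative sketch of the intended pattern: a class that exposes coroutine
# methods and a ``loop`` attribute gains blocking counterparts through
# sync_wrapper. ``_ExampleClient`` and ``_ping`` are hypothetical.
class _ExampleClient:
    def __init__(self):
        self.loop = get_loop()

    async def _ping(self):
        return "pong"

    ping = sync_wrapper(_ping)  # blocking; ``self`` is taken from args[0]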


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]


def reset_after_fork():
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)


_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
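

# Sketch of pinning the batch sizes through fsspec's global config mapping
# instead of relying on the RLIMIT_NOFILE-derived default; the values chosen
# here are arbitrary examples.
def _example_configure_batch_size():
    from fsspec.config import conf

    conf["gather_batch_size"] = 64  # operations that touch local files
    conf["nofiles_gather_batch_size"] = 512  # metadata-only operations
    assert _get_batch_size() == 64
    assert _get_batch_size(nofiles=True) == 512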


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False
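

# Quick sketch of the two answers running_async() can give: False from a
# plain synchronous call, True when invoked from inside a coroutine.
def _example_running_async():
    async def _check():
        return running_async()

    assert running_async() is False  # no loop running in this thread
    assert asyncio.run(_check()) is True  # executed by an event loop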


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, no throttling is applied. If None, it will be inferred from
        _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
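

# Minimal sketch of driving the chunked runner directly: at most
# ``batch_size`` coroutines are in flight at a time, and results come back in
# input order even though batches complete out of order. ``work`` is a
# hypothetical coroutine used only for illustration.
async def _example_run_chunked():
    async def work(i):
        await asyncio.sleep(0)
        return i * i

    return await _run_coros_in_chunks(
        [work(i) for i in range(10)], batch_size=3
    )  # -> [0, 1, 4, ..., 81]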


# these methods should be implemented as async by any async-able backend;
# see the example subclass sketch after AsyncFileSystem below
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        if (
            inspect.iscoroutinefunction(self._rm)
            and type(self)._rm is not AsyncFileSystem._rm
        ):
            return await self._rm(path, recursive=False, batch_size=1, **kwargs)
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)
        withdirs = kwargs.pop("withdirs", True)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
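

# A minimal, hypothetical sketch (not part of fsspec) of how a backend can
# subclass AsyncFileSystem: implement a few of the underscored coroutines
# from ``async_methods`` and inherit the concurrent bulk operations
# (_cat, _pipe, _rm, ...) defined above. ``_DictAsyncFileSystem`` simply
# stores bytes in a plain dict.
class _DictAsyncFileSystem(AsyncFileSystem):
    protocol = "memdemo"  # hypothetical protocol name
    cachable = False  # skip fsspec's instance cache for this demo

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.store = {}  # path -> bytes

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        self.store[self._strip_protocol(path)] = value

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        return self.store[self._strip_protocol(path)][start:end]

    async def _info(self, path, **kwargs):
        path = self._strip_protocol(path)
        if path in self.store:
            return {"name": path, "size": len(self.store[path]), "type": "file"}
        raise FileNotFoundError(path)


async def _example_async_backend():
    # With asynchronous=True, everything runs on the caller's event loop.
    fs = _DictAsyncFileSystem(asynchronous=True)
    await fs._pipe({"a/x": b"1", "a/y": b"22"})  # concurrent batch write
    return await fs._cat("a/x")  # -> b"1"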


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, will create a sync version if the name refers to an async
    method (coroutine) and there is no override in the child class; will create
    an async method for the corresponding sync method if there is no
    implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
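

# Hypothetical sketch: applying mirror_sync_methods to an instance of the
# demo backend above gives each ``_method`` coroutine a blocking counterpart
# that dispatches to the instance's IO loop via sync(). fsspec applies this
# mirroring itself when a filesystem with mirror_sync_methods enabled is
# constructed, so the explicit call here only makes the mechanism visible;
# a repeated application is a no-op.
def _example_mirroring():
    fs = _DictAsyncFileSystem()  # asynchronous=False: uses get_loop()
    mirror_sync_methods(fs)
    fs.pipe_file("b", b"data")  # blocking wrapper around _pipe_file
    return fs.cat_file("b")  # -> b"data"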


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
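

# Hypothetical sketch of a streamed writer built on the class above, reusing
# the demo filesystem from the earlier sketch: buffered ``write`` calls feed
# ``flush``, which hands completed blocks to ``_upload_chunk``. Assumes the
# AbstractBufferedFile constructor accepting (fs, path, mode).
class _ListStreamedFile(AbstractAsyncStreamedFile):
    def __init__(self, fs, path, mode="wb", **kwargs):
        super().__init__(fs, path, mode=mode, **kwargs)
        self.chunks = []  # collected uploads, in lieu of a remote store

    async def _initiate_upload(self):
        self.chunks = []

    async def _upload_chunk(self, final=False):
        self.chunks.append(self.buffer.getvalue())
        return True


async def _example_streamed_write():
    f = _ListStreamedFile(_DictAsyncFileSystem(), "demo", mode="wb")
    async with f:
        await f.write(b"payload")  # buffered; below blocksize, so not flushed yet
    return f.chunks  # -> [b"payload"], uploaded once on close()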
1103 raise NotImplementedError