import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from contextlib import contextmanager
from glob import has_magic
from typing import TYPE_CHECKING, Iterable

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None
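

# A minimal sketch (illustrative, not part of this module as shipped): a
# forking application can register ``reset_lock`` so a child process starts
# with fresh lock, loop and IO-thread state instead of inheriting the parent's
# handles. ``os.register_at_fork`` is the standard-library hook; the helper
# name below is hypothetical.
def _example_install_fork_handler():  # pragma: no cover - illustrative only
    os.register_at_fork(after_in_child=reset_lock)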


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make the loop run the coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result


def sync_wrapper(func, obj=None):
    """Given a function, make it so that it can be called in blocking contexts.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
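

# A brief usage sketch (illustrative only): given an object with a ``.loop``
# attribute and a coroutine method, ``sync_wrapper`` produces a blocking
# equivalent. The class and attribute names below are hypothetical.
def _example_sync_wrapper():  # pragma: no cover - illustrative only
    class _Example:
        loop = get_loop()

        async def _double(self, x):
            return x * 2

    inst = _Example()
    # attach a blocking version to the instance, per the docstring above
    inst.double = sync_wrapper(inst._double, obj=inst)
    return inst.double(21)  # blocks until the coroutine returns 42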


@contextmanager
def _selector_policy():
    original_policy = asyncio.get_event_loop_policy()
    try:
        if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        yield
    finally:
        asyncio.set_event_loop_policy(original_policy)


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                with _selector_policy():
                    loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]
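

# A short sketch (illustrative only): run a coroutine to completion from
# synchronous code using the shared loop. ``asyncio.sleep`` stands in for any
# backend coroutine.
def _example_get_loop_usage():  # pragma: no cover - illustrative only
    async def _work():
        await asyncio.sleep(0)
        return "done"

    # get_loop() returns the loop already running on the "fsspecIO" thread;
    # sync() submits the coroutine to it and blocks until it finishes.
    return sync(get_loop(), _work)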


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)


_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
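

# A configuration sketch (illustrative): the batch size can be pinned globally
# instead of being derived from RLIMIT_NOFILE. The two keys shown are the ones
# ``_get_batch_size`` reads from ``fsspec.config.conf``.
def _example_configure_batch_size():  # pragma: no cover - illustrative only
    from fsspec.config import conf

    conf["gather_batch_size"] = 64  # file-touching operations
    conf["nofiles_gather_batch_size"] = 512  # metadata-only operations
    assert _get_batch_size() == 64
    assert _get_batch_size(nofiles=True) == 512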


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling. If None, it will be inferred
        from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
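

# A usage sketch (illustrative): throttle a burst of coroutines to small
# batches. With return_exceptions=True, failures are returned in-place, in the
# same positions as their inputs.
async def _example_run_coros_in_chunks():  # pragma: no cover - illustrative only
    async def fetch(i):
        await asyncio.sleep(0)
        if i == 3:
            raise ValueError(i)
        return i * i

    results = await _run_coros_in_chunks(
        [fetch(i) for i in range(5)], batch_size=2, return_exceptions=True
    )
    # results == [0, 1, 4, ValueError(3), 16], ordered like the inputs
    return results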


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstring here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded
        concurrently. The default can be set for this instance by passing
        "batch_size" in the constructor, or for all instances by setting the
        "gather_batch_size" key in ``fsspec.config.conf``, falling back to 1/8th
        of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part, but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                (
                    p + "/"
                    if append_slash_to_dirname and info["type"] == "directory"
                    else p
                )
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
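

# A minimal end-to-end sketch (illustrative): in asynchronous mode the
# underscored coroutines are awaited directly on the caller's running loop.
# ``SomeAsyncFileSystem`` is a hypothetical AsyncFileSystem subclass standing
# in for a real async backend (e.g. those provided by s3fs or gcsfs):
#
#     async def example():
#         fs = SomeAsyncFileSystem(asynchronous=True)
#         await fs._pipe_file("bucket/key", b"data")
#         assert await fs._cat_file("bucket/key") == b"data"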


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, this will create a sync version if the name refers to an
    async method (coroutine) and there is no override in the child class; it
    will create an async method for the corresponding sync method if there is
    no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
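

# A runnable sketch (illustrative, not a shipped backend): a toy dict-backed
# AsyncFileSystem. Because AbstractFileSystem.__init__ invokes
# mirror_sync_methods on async implementations, instances gain blocking
# ``pipe_file``/``cat_file`` wrappers for the coroutines defined below.
class _ExampleDictFileSystem(AsyncFileSystem):  # pragma: no cover - illustrative
    def __init__(self, **kwargs):
        self.store = {}
        super().__init__(**kwargs)

    async def _pipe_file(self, path, value, **kwargs):
        self.store[self._strip_protocol(path)] = value

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        return self.store[self._strip_protocol(path)][start:end]


# Usage (sync, from ordinary blocking code):
#     fs = _ExampleDictFileSystem()
#     fs.pipe_file("a", b"data")  # mirrored from _pipe_file
#     fs.cat_file("a")            # -> b"data"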


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
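

# A skeleton sketch (illustrative): the minimum a backend must supply for a
# working streamed file is ``_fetch_range`` for reads and ``_upload_chunk``
# (plus, optionally, ``_initiate_upload``) for writes; the constructor and
# buffering come from AbstractBufferedFile. The ``self.fs.store`` dict used
# here is a hypothetical stand-in for a remote object store.
class _ExampleStreamedFile(AbstractAsyncStreamedFile):  # pragma: no cover
    async def _fetch_range(self, start, end):
        # read bytes [start, end) from the remote object
        return self.fs.store[self.path][start:end]

    async def _initiate_upload(self):
        # create/reset the remote object before the first chunk is sent
        self.fs.store[self.path] = b""

    async def _upload_chunk(self, final=False):
        # append the buffered bytes; called from flush() and close()
        self.fs.store[self.path] += self.buffer.getvalue()
        return True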