import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from glob import has_magic
from typing import TYPE_CHECKING, Iterable

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. Runs in other thread

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
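

# A usage sketch (``fs`` standing in for any AsyncFileSystem instance, and a
# hypothetical remote path): the coroutine is shipped to the dedicated IO
# thread's loop, and the calling thread blocks until it completes or times out.
#
#     data = sync(get_loop(), fs._cat_file, "bucket/key", timeout=10)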


def sync_wrapper(func, obj=None):
    """Given a function, make it callable from blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
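

# How sync_wrapper is typically applied (a sketch; ``MyFS`` is hypothetical):
# the class need only define the coroutine, and the blocking twin routes
# through sync() on the instance's ``.loop``.
#
#     class MyFS(AsyncFileSystem):
#         async def _info(self, path, **kwargs): ...
#
#     MyFS.info = sync_wrapper(MyFS._info)
#     MyFS().info("some/path")  # blocks until _info completes on the IO loop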


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]
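

# Any thread may submit work to this shared loop, e.g.
#
#     fut = asyncio.run_coroutine_threadsafe(some_coro(), get_loop())
#     fut.result()
#
# which is essentially what sync() does above, via _runner.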


def reset_after_fork():
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)


_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
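

# The inferred sizes can be overridden through fsspec's config mapping, using
# the two keys consulted above, e.g.
#
#     import fsspec.config
#     fsspec.config.conf["gather_batch_size"] = 64
#     fsspec.config.conf["nofiles_gather_batch_size"] = 512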


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling. If None, it will be inferred from
        _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """
    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
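

# A minimal self-contained sketch of the throttling contract: results come
# back in submission order even though at most batch_size tasks run at once.
#
#     async def main():
#         coros = [asyncio.sleep(0.01, result=i) for i in range(10)]
#         return await _run_coros_in_chunks(coros, batch_size=3)
#
#     assert asyncio.run(main()) == list(range(10))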


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"
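
    # Worked examples of the header built above (derived from the logic in
    # _process_limits; the end bound is converted to HTTP's inclusive form):
    #     start=0,  end=100  -> "bytes=0-99"
    #     start=5,  end=None -> "bytes=5-"    (to end of file)
    #     start=-5, end=None -> "bytes=-5"    (suffix: final 5 bytes)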

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Byte limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
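
    # Usage sketch (hypothetical paths): single ints are broadcast, so this
    # fetches the first 100 bytes of each object concurrently.
    #
    #     chunks = await fs._cat_ranges(["a", "b"], starts=0, ends=100)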

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded
        concurrently. The default can be set for this instance by passing
        "batch_size" in the constructor, or for all instances by setting the
        "gather_batch_size" key in ``fsspec.config.conf``, falling back to 1/8th
        of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part, but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
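

# A sketch of the intended asynchronous usage of the class above, assuming a
# hypothetical concrete subclass ``SomeAsyncFS``: with asynchronous=True, no
# IO thread is created and the caller's own event loop drives the coroutines.
#
#     async def main():
#         fs = SomeAsyncFS(asynchronous=True)
#         return await fs._cat("path/to/file")
#
#     data = asyncio.run(main())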


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, a sync version will be created if the name refers to an
    async method (coroutine) and there is no override in the child class; an
    async method will be created for the corresponding sync method if there is
    no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
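

# Effect sketch: for a subclass that only defines ``async def _cat_file``,
# calling mirror_sync_methods(instance) attaches a blocking ``cat_file``
# built by sync_wrapper, so
#
#     fs.cat_file("path")   # runs fs._cat_file on the IO loop and blocks
#
# works from ordinary synchronous code.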


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
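

# A streaming-copy sketch for a backend that actually implements open_async
# (the default above raises NotImplementedError); both files are used as
# async context managers, so close() and the final flush happen automatically.
#
#     async def copy_stream(fs, src, dst):
#         async with await fs.open_async(src, "rb") as f_in:
#             async with await fs.open_async(dst, "wb") as f_out:
#                 while chunk := await f_in.read(2**20):
#                     await f_out.write(chunk)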