import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from collections.abc import Iterable
from glob import has_magic
from typing import TYPE_CHECKING

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop

def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only when initializing a forked process, to reset the
    lock to None so that the new process can allocate its own lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None

async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()

def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make the loop run the coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result

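# Illustrative usage of ``sync`` (a sketch, not part of the original module):
# run a coroutine to completion from synchronous code via the dedicated fsspec
# IO thread. The ``demo`` coroutine below is made up for the example.
#
#     async def demo():
#         await asyncio.sleep(0.1)
#         return "done"
#
#     assert sync(get_loop(), demo) == "done"
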
def sync_wrapper(func, obj=None):
    """Given an async function, return a wrapper that can be called in blocking contexts.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper

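# Illustrative usage of ``sync_wrapper`` (a sketch; ``MyAsyncFS`` is a made-up
# class): the blocking ``cat_file`` below delegates to the coroutine
# ``_cat_file`` on the instance's event loop.
#
#     class MyAsyncFS(AsyncFileSystem):
#         async def _cat_file(self, path, start=None, end=None, **kwargs):
#             return b"data"
#
#         cat_file = sync_wrapper(_cat_file)
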
def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]

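# Illustrative pattern (a sketch, not part of the original module): work can
# also be submitted to the shared loop directly with asyncio's own API, which
# is what ``sync`` does under the hood. ``demo`` is a made-up coroutine.
#
#     async def demo():
#         return 42
#
#     future = asyncio.run_coroutine_threadsafe(demo(), get_loop())
#     assert future.result(timeout=5) == 42
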
def reset_after_fork():
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)

if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8

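# Illustrative override (a sketch): the defaults above can be replaced
# globally through fsspec's config dictionary, which this function consults
# before falling back to the RLIMIT_NOFILE-derived value.
#
#     import fsspec.config
#     fsspec.config.conf["gather_batch_size"] = 32
#     assert _get_batch_size() == 32
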
def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False

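# Illustrative behaviour (a sketch):
#
#     assert running_async() is False  # no loop in a plain synchronous context
#
#     async def demo():
#         return running_async()  # True when awaited inside a running loop
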
async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling. If None, it will be inferred
        from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results

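# Illustrative usage (a sketch; the coroutines are made up): run ten sleeps,
# but never more than three at a time, collecting results in submission order.
#
#     async def demo():
#         coros = [asyncio.sleep(0.01, result=i) for i in range(10)]
#         return await _run_coros_in_chunks(coros, batch_size=3)
#
#     assert sync(get_loop(), demo) == list(range(10))
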
# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]

class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

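    # Illustrative results (a sketch of the mapping to HTTP Range headers,
    # assuming a 100-byte object):
    #
    #     start=0,   end=10   ->  "bytes=0-9"    (end is exclusive; Range is inclusive)
    #     start=-10, end=None ->  "bytes=-10"    (suffix: the last 10 bytes)
    #     start=90,  end=None ->  "bytes=90-"    (from offset to the end)
    #     start=-20, end=-10  ->  "bytes=80-89"  (negative offsets resolved via size)
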
    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

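    # Illustrative call (a sketch; the paths are made up): fetch the first and
    # second kilobyte of two files in one concurrent batch.
    #
    #     data = await fs._cat_ranges(
    #         ["bucket/a", "bucket/b"], starts=[0, 1024], ends=[1024, 2048]
    #     )
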
    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        for lp in lpaths:
            os.makedirs(os.path.dirname(lp), exist_ok=True)
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as the given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError

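# Illustrative minimal backend (a sketch, not a real fsspec implementation):
# inheriting from AsyncFileSystem and implementing a few coroutines is enough
# for the bulk helpers above (_cat, _pipe, _expand_path, ...) to work, and
# mirror_sync_methods (below) generates the blocking counterparts.
#
#     class MemFS(AsyncFileSystem):
#         store = {}
#
#         async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
#             self.store[self._strip_protocol(path)] = value
#
#         async def _cat_file(self, path, start=None, end=None, **kwargs):
#             return self.store[self._strip_protocol(path)][start:end]
#
#         async def _info(self, path, **kwargs):
#             path = self._strip_protocol(path)
#             if path not in self.store:
#                 raise FileNotFoundError(path)
#             return {"name": path, "size": len(self.store[path]), "type": "file"}
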
def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, this will create a sync version if the name refers to an
    async method (coroutine) and there is no override in the child class; it will
    create an async method for the corresponding sync method if there is no
    implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )

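# Illustrative effect (a sketch; ``MemFS`` is the made-up class above, and
# fsspec normally applies this mirroring automatically during filesystem
# construction — the explicit call is just for demonstration): the coroutine
# ``_cat_file`` gains a blocking twin ``cat_file`` that dispatches onto the
# instance's IO loop.
#
#     fs = MemFS()
#     mirror_sync_methods(fs)
#     fs.pipe_file("x", b"abc")
#     assert fs.cat_file("x") == b"abc"
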
class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out

class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError

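# Illustrative usage (a sketch; ``afs.open_async`` must be provided by a real
# async backend): the class above is designed to be used as an async context
# manager, so close/flush are awaited rather than run blocking.
#
#     async def demo(afs):
#         async with await afs.open_async("bucket/key", "rb") as f:
#             return await f.read(1024)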