Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/asyn.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import asyncio.events
3import functools
4import inspect
5import io
6import numbers
7import os
8import re
9import threading
10from collections.abc import Iterable
11from glob import has_magic
12from typing import TYPE_CHECKING
14from .callbacks import DEFAULT_CALLBACK
15from .exceptions import FSTimeoutError
16from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
17from .spec import AbstractBufferedFile, AbstractFileSystem
18from .utils import glob_translate, is_exception, other_paths
_lock = None  # global lock placeholder; allocated lazily by get_lock()
iothread = [None]  # dedicated fsspec IO thread (one-element list = mutable slot)
loop = [None]  # global event loop shared by every non-async instance
# Matches names with exactly one leading underscore ("_ls"), never dunders.
private = re.compile("_[^_]")
get_running_loop = asyncio.get_running_loop
def get_lock():
    """Return the module-wide threading lock, creating it on first request.

    Creation is deferred so that a forked process (after the lock has been
    reset) ends up with a lock of its own rather than the parent's.
    """
    global _lock
    existing = _lock
    if existing:
        return existing
    _lock = threading.Lock()
    return _lock
def reset_lock():
    """Forget the global lock, IO loop and IO thread.

    Meant to be called only when a forked process starts, so that the child
    allocates its own lock (and loop) instead of reusing the parent's.
    """
    global _lock
    _lock = None
    loop[0] = None
    iothread[0] = None
async def _runner(event, coro, result, timeout=None):
    """Await ``coro`` and hand its outcome back through ``result[0]``.

    A truthy ``timeout`` bounds the await via ``asyncio.wait_for``; ``0``/``0.0``
    or ``None`` mean no limit. The return value, or the ``Exception`` raised,
    is stored in ``result[0]``. ``event`` is set in every case. A
    ``BaseException`` that is not an ``Exception`` leaves ``result`` untouched
    and propagates.
    """
    if timeout:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as err:  # errors are reported to the waiting thread, not raised here
        result[0] = err
    finally:
        event.set()
def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Block the calling thread until ``func(*args, **kwargs)`` finishes on ``loop``.

    The coroutine is submitted to ``loop``, which is expected to run in another
    thread, and the caller waits for it. A truthy ``timeout`` (seconds) bounds the
    wait; ``0``/``0.0``/``None`` mean no limit. Timeouts surface as
    ``FSTimeoutError``. Any other exception raised by the coroutine is re-raised
    here, and otherwise its return value is returned.

    Raises ``RuntimeError`` if ``loop`` is ``None`` or closed, and
    ``NotImplementedError`` if called from inside ``loop`` itself.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout or None  # 0 / 0.0 are treated as "no timeout"
    # NB: a loop that is not running *yet* is acceptable; the submitted work
    # simply waits until it starts.
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        current = asyncio.events.get_running_loop()
    except RuntimeError:
        current = None  # this thread is not inside any running loop
    if current is loop:
        # Blocking on our own loop from within it would deadlock.
        raise NotImplementedError("Calling sync() from within a running loop")

    coro = func(*args, **kwargs)
    result = [None]
    finished = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(finished, coro, result, timeout), loop)
    # Wait in one-second slices so the waiting thread can still be interrupted.
    while not finished.wait(1):
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    outcome = result[0]
    if isinstance(outcome, asyncio.TimeoutError):
        # Present asyncio's timeout as fsspec's own timeout error.
        raise FSTimeoutError from outcome
    if isinstance(outcome, BaseException):
        raise outcome
    return outcome
def sync_wrapper(func, obj=None):
    """Turn coroutine function ``func`` into a blocking callable.

    The returned wrapper runs ``func`` through ``sync`` on the ``.loop`` of its
    owner. Leave ``obj=None`` when the wrapper is defined inside a class; the
    owner is then the first positional argument. Pass the instance explicitly
    when attaching the wrapper as an instance attribute.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        owner = obj if obj else args[0]
        return sync(owner.loop, func, *args, **kwargs)

    return wrapper
def get_loop():
    """Return the shared fsspec IO event loop, starting it on first use.

    The loop runs ``run_forever`` on a daemon thread named "fsspecIO". Both are
    stored in the module-level ``loop``/``iothread`` slots.
    """
    if loop[0] is not None:
        return loop[0]
    with get_lock():
        # Another thread may have created the loop while we waited for the lock.
        if loop[0] is None:
            fresh = asyncio.new_event_loop()
            loop[0] = fresh
            worker = threading.Thread(
                target=fresh.run_forever, name="fsspecIO", daemon=True
            )
            worker.start()
            iothread[0] = worker
    return loop[0]
def reset_after_fork():
    """Forget the parent's IO loop, IO thread and lock in a forked child.

    Registered via ``os.register_at_fork(after_in_child=...)``. The lock lives in
    the module global ``_lock``. The previous code declared ``global lock``
    instead, so the inherited ``_lock`` was never cleared. If another parent
    thread held it at fork time, the child would then deadlock on its first
    ``get_lock()``.
    """
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None
# Only available on POSIX. Per the original note, this does nothing for
# spawn or forkserver subprocesses.
if getattr(os, "register_at_fork", None) is not None:
    os.register_at_fork(after_in_child=reset_after_fork)
if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    # ``resource`` is POSIX-only; fall back to OSError when it is unavailable.
    try:
        import resource
    except ImportError:
        resource = None
    ResourceError = (
        OSError if resource is None else getattr(resource, "error", OSError)
    )
_DEFAULT_BATCH_SIZE = 128  # fallback concurrency when the fd limit is unknown
_NOFILES_DEFAULT_BATCH_SIZE = 10 * _DEFAULT_BATCH_SIZE  # 1280; no local files held
def _get_batch_size(nofiles=False):
    """Return the default number of coroutines to run at once.

    A value configured in ``fsspec.config.conf`` wins:
    ``"nofiles_gather_batch_size"`` when ``nofiles`` is true, otherwise
    ``"gather_batch_size"``. Without one, ``nofiles`` operations get
    ``_NOFILES_DEFAULT_BATCH_SIZE``. Everything else gets one-eighth of the
    soft ``RLIMIT_NOFILE`` limit: ``-1`` (no throttling) if that limit is
    infinite, or ``_DEFAULT_BATCH_SIZE`` when it cannot be read.
    """
    from fsspec.config import conf

    key = "nofiles_gather_batch_size" if nofiles else "gather_batch_size"
    if key in conf:
        return conf[key]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    return -1 if soft_limit == resource.RLIM_INFINITY else soft_limit // 8
def running_async() -> bool:
    """Report whether the caller is executing inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True
async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling (all are submitted at once). If
        None, it will be inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        Only used when inferring batch_size. Pass True when the operation does
        NOT hold local files open; a larger default batch is then used.

    Returns
    -------
    list
        One entry per input coroutine, in input order. With
        ``return_exceptions=True`` a failing coroutine contributes its exception.

    Raises
    ------
    ValueError
        If batch_size is zero or negative (other than -1).
    Exception
        With ``return_exceptions=False``, the first failure observed is re-raised
        after cancelling in-flight tasks and closing never-started coroutines.
    """
    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)
    elif batch_size <= 0:
        raise ValueError(
            f"batch_size must be a positive integer or -1, got {batch_size!r}"
        )

    async def _run_coro(coro, i):
        # Returns (value, index) so results can be placed back in input order.
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0  # index of the next coroutine to schedule
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        # Top the running set back up to batch_size.
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        first_exc = None
        for task in done:
            try:
                result, k = await task
                results[k] = result
            except Exception as exc:
                if first_exc is None:
                    first_exc = exc

        if first_exc is not None:
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
            # Coroutines that were never scheduled would otherwise be garbage
            # collected un-awaited (RuntimeWarning); close them explicitly.
            for leftover in coros[i:]:
                close = getattr(leftover, "close", None)
                if close is not None:
                    close()
            raise first_exc

    return results
# Coroutine methods every async-capable backend is expected to implement.
# mirror_sync_methods scans these (plus AsyncFileSystem's own attributes).
async_methods = """
    _ls _cat_file _get_file _put_file _rm_file _cp_file _pipe_file
    _expand_path _info _isfile _isdir _exists _walk _glob _find _du _size
    _mkdir _makedirs
""".split()
class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Bulk operations run their per-file coroutines concurrently, in batches
    (see ``batch_size``).

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # Note that the ``_*`` methods deliberately have no docstrings here: the
    # sync mirrors copy the AbstractFileSystem docstring only when the async
    # method has none. Explanations therefore live in ``#`` comments.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        # Remember the creating process so ``loop`` can refuse use after fork.
        self._pid = os.getpid()
        if not asynchronous:
            # Blocking use: run coroutines on the given loop or the shared IO loop.
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        # The loop belongs to the creating process; it is unusable after a fork.
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        # Delegate only to a subclass's own async ``_rm``. The default ``_rm``
        # below calls ``_rm_file``, so delegating to it would recurse forever.
        if (
            inspect.iscoroutinefunction(self._rm)
            and type(self)._rm is not AsyncFileSystem._rm
        ):
            return await self._rm(path, recursive=False, batch_size=1, **kwargs)
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        # Reversed sorted order removes children before their parent directories.
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        # Recursive copies ignore missing files by default; others raise.
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        # A single str path is shorthand for the mapping {path: value}.
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(p, **kwargs) for p in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        # Return a mapping unless the request was exactly one literal path.
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Local directories become remote directories; only files are uploaded.
        is_dir = {lp: os.path.isdir(lp) for lp in lpaths}
        rdirs = [r for lp, r in zip(lpaths, rpaths) if is_dir[lp]]
        file_pairs = [(lp, r) for lp, r in zip(lpaths, rpaths) if not is_dir[lp]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        for lp in lpaths:
            parent = os.path.dirname(lp)
            # A bare relative filename has no parent to create, and
            # os.makedirs("") would raise FileNotFoundError.
            if parent:
                os.makedirs(parent, exist_ok=True)
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        # Any ordinary failure of _info means "not a file". Cancellation and
        # interpreter-exit exceptions must still propagate, hence not a bare except.
        try:
            return (await self._info(path))["type"] == "file"
        except Exception:
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        # on_error: "omit" skips unreadable directories, "raise" re-raises, and a
        # callable is called with the exception. It applies at every depth.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as give path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d],
                maxdepth=maxdepth,
                on_error=on_error,
                detail=detail,
                **kwargs,
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)
        withdirs = kwargs.pop("withdirs", True)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            first_wildcard_idx = min_idx
            min_idx = path[:min_idx].rindex("/")
            # everything up to the last / before the first wildcard
            root = path[: min_idx + 1]
            # stem between last "/" and first wildcard
            prefix = path[min_idx + 1 : first_wildcard_idx]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            prefix = path[:min_idx]  # stem up to the first wildcard
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        # Pass the filename stem as prefix= so backends that support it such as
        # gcsfs, s3fs and adlfs can filter server-side up to the first wildcard.
        if prefix:
            kwargs["prefix"] = prefix
        allpaths = await self._find(
            root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
        )

        pattern = re.compile(glob_translate(path + ("/" if ends_with_sep else "")))

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(
        self, path, recursive=False, maxdepth=None, assume_literal=False
    ):
        # assume_literal=True skips glob expansion. It is used when re-expanding
        # paths that a glob has already produced.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if not assume_literal and has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                                assume_literal=True,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        # Only binary, uncompressed modes are supported; backends must override.
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
def mirror_sync_methods(obj):
    """Attach blocking versions of ``obj``'s async methods.

    Candidates are the names in ``async_methods`` plus every attribute of
    ``AsyncFileSystem``; only names with a single leading underscore qualify.
    For ``_name``, a blocking ``name`` (built with ``sync_wrapper``) is set on
    ``obj`` when ``obj._name`` is a coroutine function and ``obj.name`` is still
    the unmodified ``AbstractFileSystem`` implementation. If the async method has
    no docstring, the wrapper takes ``AbstractFileSystem.name``'s docstring.
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        # private.match implies a leading "_" and rejects dunders.
        if not (method.startswith("_") and private.match(method)):
            continue
        public_name = method[1:]
        is_coroutine = inspect.iscoroutinefunction(getattr(obj, method, None))
        current_sync = getattr(getattr(obj, public_name, False), "__func__", None)
        is_default = current_sync is getattr(AbstractFileSystem, public_name, "")
        if not (is_coroutine and is_default):
            continue
        blocking = sync_wrapper(getattr(obj, method), obj=obj)
        setattr(obj, public_name, blocking)
        if not blocking.__doc__:
            blocking.__doc__ = getattr(
                getattr(AbstractFileSystem, public_name, None), "__doc__", ""
            )
class FSSpecCoroutineCancel(Exception):
    """Default exception injected into tasks force-cancelled by _dump_running_tasks."""
def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    """Describe (and optionally force-cancel) unfinished tasks on the fsspec loop.

    Debugging aid operating on the global ``loop[0]``.

    Parameters
    ----------
    printout: bool
        Print each task's stack first.
    cancel: bool
        Cancel each task, mark it failed with ``exc``, run its registered done
        callbacks (so dependent concurrent futures see the cancellation), and
        throw ``exc`` into its coroutine.
    exc: exception class
        Exception injected into cancelled tasks.
    with_task: bool
        Include the task object under the "task" key.

    Returns
    -------
    list of dict
        One dict per unfinished task with keys "locals", "file", "firstline",
        "linelo" (current line), "stack" and "task".
    """
    import traceback

    unfinished = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        for task in unfinished:
            task.print_stack()

    out = []
    for task in unfinished:
        frame = task._coro.cr_frame
        out.append(
            {
                "locals": frame.f_locals,
                "file": frame.f_code.co_filename,
                "firstline": frame.f_code.co_firstlineno,
                "linelo": frame.f_lineno,
                "stack": traceback.format_stack(frame),
                "task": task if with_task else None,
            }
        )

    if cancel:
        for task in unfinished:
            # Snapshot callbacks before cancelling. The C implementation returns
            # None when none are registered; iterating None would raise TypeError
            # and abort the sweep halfway through.
            callbacks = task._callbacks or ()
            task.cancel()
            asyncio.futures.Future.set_exception(task, exc)
            asyncio.futures.Future.cancel(task)
            for entry in callbacks:
                entry[0](task)  # cancels any dependent concurrent.futures
            try:
                task._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out
class AbstractAsyncStreamedFile(AbstractBufferedFile):
    """Async streaming file: no read buffering, writes always auto-commit.

    Subclasses provide ``_fetch_range`` for reads and ``_upload_chunk`` (plus
    optionally ``_initiate_upload``) for writes.
    """

    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Read up to ``length`` bytes from the current position via ``_fetch_range``.

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.

        Returns ``b""`` when nothing remains, including when the position is at or
        beyond the end of the file.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length <= 0:
            # nothing to read (also covers a position past EOF, where
            # size - loc is negative); don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.

        Returns the number of bytes buffered.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes (a final forced flush) and invalidates the filesystem's
        listings cache for the path and its parent. In read mode it only drops
        the cache attribute.
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        """Upload buffered data once it reaches blocksize, or always when ``force``.

        ``force=True`` marks the final flush and may be used only once.
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except BaseException:
                # The upload could not start; the file is unusable from here on.
                self.closed = True
                raise

        # _upload_chunk returning False means "keep the data buffered".
        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        """Return bytes ``start`` to ``end`` of the file; must be implemented."""
        raise NotImplementedError

    async def _initiate_upload(self):
        """Hook called before the first chunk is uploaded; no-op by default."""
        pass

    async def _upload_chunk(self, final=False):
        """Upload the current buffer; must be implemented.

        Return False to keep the buffer for a later flush.
        """
        raise NotImplementedError