Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/asyn.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import asyncio.events
3import functools
4import inspect
5import io
6import numbers
7import os
8import re
9import threading
10from collections.abc import Iterable
11from glob import has_magic
12from typing import TYPE_CHECKING
14from .callbacks import DEFAULT_CALLBACK
15from .exceptions import FSTimeoutError
16from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
17from .spec import AbstractBufferedFile, AbstractFileSystem
18from .utils import glob_translate, is_exception, other_paths
# Matches names with exactly one leading underscore (e.g. "_ls" but not
# "__init__"); mirror_sync_methods() uses it to select async method names.
private = re.compile("_[^_]")
# Stored in one-element lists, which lets functions such as get_loop() and
# reset_lock() replace them via ``loop[0] = ...`` without a ``global`` statement.
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder; allocated lazily by get_lock()
get_running_loop = asyncio.get_running_loop  # module-level alias
def get_lock():
    """Return the module-wide ``threading.Lock``, creating it on first call.

    Allocation is deferred so that, once the global has been reset in a
    forked child, that process gets a fresh lock of its own.
    """
    global _lock
    if _lock is None:
        _lock = threading.Lock()
    return _lock
def reset_lock():
    """Forget the global lock, event loop and IO thread.

    Meant to run only when a forked process starts up. It clears the inherited
    state so that the next get_lock()/get_loop() call creates fresh objects
    for this process.
    """
    global _lock
    _lock = None
    loop[0] = None
    iothread[0] = None
async def _runner(event, coro, result, timeout=None):
    """Await ``coro`` and report its outcome through ``result[0]``.

    The coroutine's return value, or the ``Exception`` it raised, is stored in
    ``result[0]``. ``event`` is set in every case so a waiting thread wakes up.
    A falsy ``timeout`` (None, 0, 0.0) disables the ``asyncio.wait_for`` limit.
    """
    if timeout:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as exc:
        result[0] = exc
    finally:
        event.set()
def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. Runs in other thread

    ``func(*args, **kwargs)`` must produce a coroutine. That coroutine is
    submitted to ``loop`` with ``asyncio.run_coroutine_threadsafe``, and this
    call blocks until it finishes. It returns the coroutine's result or
    re-raises its exception.

    A falsy ``timeout`` (None, 0, 0.0) means wait indefinitely. A timeout,
    whether it comes from ``asyncio.wait_for`` or from this thread's own
    countdown, is raised as ``FSTimeoutError``.

    Raises
    ------
    RuntimeError
        If ``loop`` is None or closed.
    NotImplementedError
        If called from a coroutine that is itself running on ``loop``.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout or None  # treat 0 / 0.0 as "no timeout"
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")

    try:
        current = asyncio.events.get_running_loop()
    except RuntimeError:
        current = None  # no loop running in this thread: the normal case
    if current is loop:
        # Blocking here would deadlock the very loop we are waiting on.
        raise NotImplementedError("Calling sync() from within a running loop")

    coro = func(*args, **kwargs)
    result = [None]
    done = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(done, coro, result, timeout), loop)

    # Wait in one-second slices so the calling thread stays interruptible.
    remaining = timeout
    while not done.wait(1):
        if remaining is not None:
            remaining -= 1
            if remaining < 0:
                raise FSTimeoutError

    outcome = result[0]
    if isinstance(outcome, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from outcome
    if isinstance(outcome, BaseException):
        raise outcome
    return outcome
def sync_wrapper(func, obj=None):
    """Wrap coroutine function ``func`` so it can be called in blocking code.

    The wrapper runs ``func`` via ``sync`` on ``<owner>.loop``. The owner is
    ``obj`` when given (attaching to an instance). Otherwise it is the first
    positional argument, i.e. ``self`` when the wrapper is defined in a class.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        owner = obj if obj else args[0]
        return sync(owner.loop, func, *args, **kwargs)

    return wrapper
def async_gen_wrapper(func, obj=None):
    """Turn async generator function ``func`` into a blocking generator.

    Each item is fetched by running ``__anext__`` through ``sync`` on the
    owner's loop. The owner is ``obj`` if given, else the first positional
    argument. Iteration stops when the async generator is exhausted.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        owner = obj if obj else args[0]
        agen = func(*args, **kwargs)
        while True:
            try:
                yield sync(owner.loop, agen.__anext__)
            except StopAsyncIteration:
                return

    return wrapper
def get_loop():
    """Create or return the default fsspec IO loop

    On first use a new event loop is started with ``run_forever`` on a daemon
    thread named "fsspecIO", and both are remembered in the module globals.
    """
    if loop[0] is not None:
        return loop[0]
    with get_lock():
        # Another thread may have created the loop while we waited for the lock.
        if loop[0] is None:
            new_loop = asyncio.new_event_loop()
            loop[0] = new_loop
            io_thread = threading.Thread(
                target=new_loop.run_forever, name="fsspecIO", daemon=True
            )
            io_thread.start()
            iothread[0] = io_thread
    return loop[0]
def reset_after_fork():
    """Discard the parent's IO loop, IO thread and lock in a forked child.

    Intended to be registered with ``os.register_at_fork(after_in_child=...)``.
    The next get_loop()/get_lock() call in the child then creates fresh
    objects.
    """
    # The module-level lock is ``_lock``. The previous code declared and reset
    # an unrelated global ``lock``, so the inherited lock was never cleared.
    # Another parent thread may have been holding it at fork time, and that
    # thread does not exist in the child.
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None
if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    # In a forked child, reset_after_fork() drops the inherited IO loop and
    # IO thread references.
    os.register_at_fork(after_in_child=reset_after_fork)

# ``resource`` is a Unix-only stdlib module. When it is missing, ``resource``
# is None and _get_batch_size() falls back to a fixed default. ResourceError
# is the exception type tolerated when querying the open-file limit.
if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

# Fallback concurrency when the open-file limit cannot be determined.
_DEFAULT_BATCH_SIZE = 128
# Default concurrency for operations called with nofiles=True.
_NOFILES_DEFAULT_BATCH_SIZE = 1280
def _get_batch_size(nofiles=False):
    """Return the default concurrency used by ``_run_coros_in_chunks``.

    Parameters
    ----------
    nofiles: bool
        True for operations that do not open local files.

    Returns
    -------
    int
        The value is chosen in this order:

        1. ``fsspec.config.conf["nofiles_gather_batch_size"]`` (when
           ``nofiles``) or ``conf["gather_batch_size"]`` (otherwise), if set.
        2. ``_NOFILES_DEFAULT_BATCH_SIZE`` when ``nofiles``.
        3. 1/8 of the soft RLIMIT_NOFILE, at least 1, or -1 (no throttling)
           when that limit is unlimited.
        4. ``_DEFAULT_BATCH_SIZE`` if the limit cannot be read.
    """
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
        return _NOFILES_DEFAULT_BATCH_SIZE
    if "gather_batch_size" in conf:
        return conf["gather_batch_size"]
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    # With a tiny limit (< 8), soft_limit // 8 would be 0, which
    # _run_coros_in_chunks rejects with ValueError; always allow one at a time.
    return max(1, soft_limit // 8)
def running_async() -> bool:
    """Return True when called from code executing inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True
async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    At most ``batch_size`` coroutines are in flight at once. As soon as any of
    them finishes, the next one is started, so this is a sliding window rather
    than fixed-size rounds.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, then it will not be any throttling. If
        None, it will be inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        Only used when inferring the batch_size. Pass True when the operation
        does not open local files; a larger default batch is then allowed.

    Returns
    -------
    list
        Results in the same order as ``coros``. With ``return_exceptions``,
        failed entries hold the exception instead.

    Raises
    ------
    ValueError
        If ``batch_size`` is neither -1 nor a positive number.
    Exception
        Without ``return_exceptions``, the first failure observed. Any
        coroutines still pending at that point are cancelled first.
    """
    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)
    elif batch_size <= 0:
        raise ValueError(
            f"batch_size must be -1 or a positive integer, got {batch_size!r}"
        )

    async def _run_coro(coro, i):
        # Returns (result, index) so results can be placed back in order.
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        # Top the window back up to batch_size running tasks.
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        first_exc = None
        while done:
            task = done.pop()
            try:
                result, k = await task
                results[k] = result
            except Exception as exc:
                # Await every finished task so no exception goes unretrieved;
                # only the first one is re-raised.
                if first_exc is None:
                    first_exc = exc

        if first_exc is not None:
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
            raise first_exc

    return results
# these methods should be implemented as async by any async-able backend
# mirror_sync_methods() iterates over these names (together with the
# attributes of AsyncFileSystem) when generating the blocking sync wrappers.
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]
class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstring here; they will be copied
    # for _* methods and inferred for overridden methods.
    # A docstring on an ``_x`` method would replace AbstractFileSystem's ``x``
    # docstring on the generated sync wrapper, so implementation notes for
    # those methods are written as ``#`` comments instead.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        # asynchronous=True: the caller awaits the ``_*`` coroutines directly,
        # so no loop is stored. Otherwise use ``loop`` if given, or the shared
        # fsspec IO loop from get_loop().
        # batch_size: default concurrency for bulk operations (None = inferred).
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        # The stored loop was obtained in the process that created this
        # instance; refuse to hand it out from a different (forked) process.
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        # Default single-file delete: delegate to a subclass-provided async
        # ``_rm``. The base ``_rm`` calls back into ``_rm_file``, so using it
        # here would recurse forever.
        if (
            inspect.iscoroutinefunction(self._rm)
            and type(self)._rm is not AsyncFileSystem._rm
        ):
            return await self._rm(path, recursive=False, batch_size=1, **kwargs)
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        # _expand_path returns sorted paths. Reversing them submits children
        # before their parent directories.
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        # Default move: copy, then delete the source.
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        # on_error defaults to "ignore" when recursive (per-entry
        # FileNotFoundError is skipped) and to "raise" otherwise.
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        # ``path`` is either a single path (written with ``value``) or a
        # {path: bytes} mapping.
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file

        Converts Python-style ``start``/``end`` offsets (``end`` exclusive,
        negatives counted from the end of the file) into an HTTP Range value
        ``"bytes=<start>-<end>"`` with an inclusive end. A negative ``start``
        with ``end=None`` becomes a suffix range ``"bytes=-N"``. ``_info`` is
        awaited to learn the file size only when an offset cannot be expressed
        without it.
        """
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        # Returns the bytes of a single concrete path, or a {path: bytes}
        # dict when the input expanded to several paths or was a list/glob.
        # on_error: "raise" re-raises the first failure, "omit" drops failed
        # entries, anything else keeps the exception objects as values.
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(p, **kwargs) for p in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), None)
            if ex is not None:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or iterable
            Byte limits of the read. If using a single value (not an iterable),
            the same value will be used to read all the specified files.
            Iterables must have the same length as ``paths``.
        max_gap: None
            Merging of nearby ranges is not implemented; must be None.
        batch_size: int or None
            Concurrency passed to the chunked runner (None: instance default).
        on_error: "return" or "raise"
            If "return" (default), any per-range exception is placed in the output
            list at the corresponding position. Otherwise the first such exception
            is raised. Matches ``AbstractFileSystem.cat_ranges``.
        """
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        # Materialise iterables (e.g. generators or tuples) so len() works.
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        else:
            starts = list(starts)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        else:
            ends = list(ends)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error != "return":
            ex = next(filter(is_exception, out), None)
            if ex is not None:
                raise ex
        return out

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Local directories become remote directories; only files are uploaded.
        is_dir = {lp: os.path.isdir(lp) for lp in lpaths}
        rdirs = [rp for lp, rp in zip(lpaths, rpaths) if is_dir[lp]]
        file_pairs = [(lp, rp) for lp, rp in zip(lpaths, rpaths) if not is_dir[lp]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        for lp in lpaths:
            parent = os.path.dirname(lp)
            # A bare relative filename has no parent component, and
            # os.makedirs("") raises FileNotFoundError.
            if parent:
                os.makedirs(parent, exist_ok=True)
        # Same fallback rule as the other bulk methods: None -> instance default.
        batch_size = kwargs.pop("batch_size", None) or self.batch_size

        coros = []
        callback.set_size(len(lpaths))
        for lp, rp in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rp, lp, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        # Any ordinary failure to stat the path counts as "not a file".
        # ``except Exception`` (not a bare except) lets cancellation and
        # KeyboardInterrupt propagate.
        try:
            return (await self._info(path))["type"] == "file"
        except Exception:
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        # None when the backend's info has no "size" entry.
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        # Async generator yielding (path, dirs, files) top-down. on_error:
        # "omit" yields an empty listing for an unreadable directory, "raise"
        # re-raises, and a callable is called with the OSError. The same policy
        # applies at every level of the recursion.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except OSError as e:  # includes FileNotFoundError
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d],
                maxdepth=maxdepth,
                on_error=on_error,
                detail=detail,
                **kwargs,
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        # Expand a glob pattern. Lists the deepest wildcard-free directory via
        # _find and filters the results with the regex from glob_translate.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)
        withdirs = kwargs.pop("withdirs", True)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            first_wildcard_idx = min_idx
            min_idx = path[:min_idx].rindex("/")
            # everything up to the last / before the first wildcard
            root = path[: min_idx + 1]
            # stem between last "/" and first wildcard
            prefix = path[min_idx + 1 : first_wildcard_idx]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            prefix = path[:min_idx]  # stem up to the first wildcard
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        # Pass the filename stem as prefix= so backends that support it such as
        # gcsfs, s3fs and adlfs can filter server-side up to the first wildcard.
        if prefix:
            kwargs["prefix"] = prefix
        allpaths = await self._find(
            root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
        )

        # ``re`` is the module-level import; no local re-import is needed.
        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        # NOTE: _info is awaited sequentially, once per path found.
        sizes = {}
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        # All paths below ``path`` (sorted). With detail=True the result is a
        # {name: info} dict instead of a list of names.
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(
        self, path, recursive=False, maxdepth=None, assume_literal=False
    ):
        # Resolve a path, glob pattern, or list of them into a sorted list of
        # concrete paths. assume_literal=True disables glob expansion, including
        # when a single string is passed. Raises FileNotFoundError if nothing
        # matched.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path(
                [path], recursive, maxdepth, assume_literal=assume_literal
            )
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if not assume_literal and has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                                assume_literal=True,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        # Only binary, uncompressed modes are accepted; backends must override.
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
def mirror_sync_methods(obj):
    """Attach blocking versions of ``obj``'s async ``_name`` methods as ``name``.

    Candidates are the names in ``async_methods`` plus every attribute of
    ``AsyncFileSystem``, restricted to names with exactly one leading
    underscore. ``obj.name`` is set for a candidate ``_name`` only if
    ``obj.name`` is still the AbstractFileSystem default (i.e. not overridden)
    and ``obj._name`` is either:

    - a coroutine function, wrapped with ``sync_wrapper``, or
    - an async generator function, wrapped with ``async_gen_wrapper``.

    A generated method with no docstring of its own gets the docstring of
    ``AbstractFileSystem.name``.
    """
    from fsspec import AbstractFileSystem

    for method in set(async_methods + dir(AsyncFileSystem)):
        # ``private`` only matches names starting with a single underscore,
        # which also excludes public and dunder names.
        if not private.match(method):
            continue
        smethod = method[1:]
        async_attr = getattr(obj, method, None)
        current_sync = getattr(getattr(obj, smethod, False), "__func__", None)
        is_default = current_sync is getattr(AbstractFileSystem, smethod, "")
        if not is_default:
            continue

        if inspect.iscoroutinefunction(async_attr):
            mth = sync_wrapper(getattr(obj, method), obj=obj)
        elif inspect.isasyncgenfunction(async_attr):
            mth = async_gen_wrapper(getattr(obj, method), obj=obj)
        else:
            continue

        setattr(obj, smethod, mth)
        if not mth.__doc__:
            template = getattr(AbstractFileSystem, smethod, None)
            mth.__doc__ = getattr(template, "__doc__", "")
class FSSpecCoroutineCancel(Exception):
    """Default exception injected into tasks cancelled by ``_dump_running_tasks``."""
def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    """Debugging aid: report, and optionally force-cancel, unfinished tasks.

    Looks at the tasks of the global fsspec loop (``loop[0]``) that are not
    done yet.

    Parameters
    ----------
    printout: bool
        Print each task's stack via ``Task.print_stack``.
    cancel: bool
        Forcibly cancel each task and throw ``exc`` into its coroutine.
    exc: exception class
        Exception set on, and thrown into, the cancelled tasks.
    with_task: bool
        Include the Task object itself in each returned record.

    Returns
    -------
    list of dict
        One record per task. Each record holds the locals, file, first line,
        current line ("linelo") and formatted stack of the coroutine frame.
        It also holds the task itself, or None unless ``with_task``.

    NOTE(review): relies on private asyncio internals (``Task._coro``,
    ``Task._callbacks``); may break across Python versions.
    """
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            # Snapshot the done-callbacks before the state changes below.
            cbs = t._callbacks
            t.cancel()
            # Task.set_exception itself is unsupported, so the base Future
            # methods are called directly to force the task into a final state.
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            # Presumably (callback, context) pairs; invoke each callback by hand.
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out
class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Read up to ``length`` bytes starting at the current position

        Every call goes straight to ``_fetch_range``; nothing is cached.

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0 (or None), all remaining bytes.

        Returns
        -------
        bytes
            The data read; the position advances by its length.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.

        Returns
        -------
        int
            Number of bytes written to the buffer.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache. The file ends up marked closed even
        if finalizing the write raises. ``flush`` likewise marks the file
        closed when ``_initiate_upload`` fails. This prevents a later close()
        from silently skipping the already-attempted final upload.
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        try:
            if self.mode == "rb":
                self.cache = None
            else:
                if not self.forced:
                    await self.flush(force=True)

                if self.fs is not None:
                    self.fs.invalidate_cache(self.path)
                    self.fs.invalidate_cache(self.fs._parent(self.path))
        finally:
            self.closed = True

    async def flush(self, force=False):
        """Send buffered data to the backend.

        Parameters
        ----------
        force: bool
            Upload the buffer regardless of its size, as the final chunk
            (``_upload_chunk(final=True)``). Allowed only once; afterwards the
            file can only be closed.
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except BaseException:
                # Any failure (including cancellation) leaves the file unusable.
                self.closed = True
                raise

        # _upload_chunk returning False means "keep the buffer" (not uploaded).
        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError