Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/asyn.py: 29%
543 statements
coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from contextlib import contextmanager
from glob import has_magic
from typing import TYPE_CHECKING, Iterable

from .callbacks import _DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import (
    LocalFileSystem,
    make_path_posix,
    trailing_sep,
    trailing_sep_maybe_asterisk,
)
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result


def sync_wrapper(func, obj=None):
    """Given a function, make it callable in either async or blocking contexts.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
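
# Illustrative sketch (not part of the library): wrapping a coroutine method
# so it can be called from blocking code. ``Demo`` and ``_touch`` are
# hypothetical names used only for this example.
#
#     class Demo:
#         loop = get_loop()  # attribute that sync_wrapper's wrapper expects
#
#         async def _touch(self, path):
#             return f"touched {path}"
#
#         touch = sync_wrapper(_touch)
#
#     Demo().touch("x")  # blocks until the coroutine completes -> "touched x"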


@contextmanager
def _selector_policy():
    original_policy = asyncio.get_event_loop_policy()
    try:
        if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        yield
    finally:
        asyncio.set_event_loop_policy(original_policy)


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                with _selector_policy():
                    loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]
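
# Illustrative sketch: running a coroutine on the dedicated fsspec IO thread
# from synchronous code. ``double`` is a hypothetical coroutine.
#
#     import asyncio
#
#     async def double(x):
#         await asyncio.sleep(0)
#         return 2 * x
#
#     assert sync(get_loop(), double, 21) == 42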


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
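
# Illustrative sketch: rather than being inferred from the open-files rlimit,
# the batch size can be pinned for all instances via ``fsspec.config.conf``.
#
#     from fsspec.config import conf
#
#     conf["gather_batch_size"] = 32  # file-transfer operations
#     conf["nofiles_gather_batch_size"] = 256  # metadata-only operations
#     assert _get_batch_size() == 32
#     assert _get_batch_size(nofiles=True) == 256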


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=_DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling. If None, it is inferred
        from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0
    results = []
    for start in range(0, len(coros), batch_size):
        chunk = [
            asyncio.Task(asyncio.wait_for(c, timeout=timeout))
            for c in coros[start : start + batch_size]
        ]
        if callback is not _DEFAULT_CALLBACK:
            [
                t.add_done_callback(lambda *_, **__: callback.relative_update(1))
                for t in chunk
            ]
        results.extend(
            await asyncio.gather(*chunk, return_exceptions=return_exceptions),
        )
    return results
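
# Illustrative sketch: run many coroutines with at most two in flight at a
# time. ``work`` and ``main`` are hypothetical names for this example.
#
#     import asyncio
#
#     async def work(i):
#         await asyncio.sleep(0)
#         return i * i
#
#     async def main():
#         return await _run_coros_in_chunks(
#             [work(i) for i in range(10)], batch_size=2
#         )
#
#     assert asyncio.run(main()) == [i * i for i in range(10)]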


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        source_is_str = isinstance(path1, str)
        paths = await self._expand_path(path1, maxdepth=maxdepth, recursive=recursive)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            paths = [p for p in paths if not (trailing_sep(p) or await self._isdir(p))]
            if not paths:
                return

        isdir = isinstance(path2, str) and (
            trailing_sep(path2) or await self._isdir(path2)
        )
        path2 = other_paths(
            paths,
            path2,
            exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(path1),
            is_dir=isdir,
            flatten=not source_is_str,
        )
        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return "bytes=%s-%s" % (start, end)
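
    # Worked examples (illustrative), assuming the object behind ``url`` is
    # 100 bytes long:
    #
    #     await fs._process_limits(url, 10, 20)    -> "bytes=10-19"  (inclusive end)
    #     await fs._process_limits(url, None, 20)  -> "bytes=0-19"
    #     await fs._process_limits(url, 10, None)  -> "bytes=10-"
    #     await fs._process_limits(url, -10, None) -> "bytes=-10"    (last 10 bytes)
    #     await fs._process_limits(url, 10, -10)   -> "bytes=10-89"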

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]
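
    # Illustrative sketch: ``_cat`` returns raw bytes for a single concrete
    # path, but a ``{path: bytes}`` mapping when the input was a list or was
    # expanded (by glob/recursion) to several files. Paths are hypothetical.
    #
    #     await fs._cat("bucket/a.txt")                # -> b"..."
    #     await fs._cat(["bucket/a.txt", "bucket/b"])  # -> {"bucket/a.txt": b"...", ...}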

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        source_is_str = isinstance(lpath, str)
        if source_is_str:
            lpath = make_path_posix(lpath)
        fs = LocalFileSystem()
        lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
            if not lpaths:
                return

        isdir = isinstance(rpath, str) and (
            trailing_sep(rpath) or await self._isdir(rpath)
        )
        rpath = self._strip_protocol(rpath)
        rpaths = other_paths(
            lpaths,
            rpath,
            exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(lpath),
            is_dir=isdir,
            flatten=not source_is_str,
        )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            callback.branch(lfile, rfile, kwargs)
            coros.append(self._put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        source_is_str = isinstance(rpath, str)
        # First check for rpath trailing slash as _strip_protocol removes it.
        source_not_trailing_sep = source_is_str and not trailing_sep_maybe_asterisk(
            rpath
        )
        rpath = self._strip_protocol(rpath)
        rpaths = await self._expand_path(rpath, recursive=recursive)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            rpaths = [
                p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
            ]
            if not rpaths:
                return

        lpath = make_path_posix(lpath)
        isdir = isinstance(lpath, str) and (
            trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
        )
        lpaths = other_paths(
            rpaths,
            lpath,
            exists=isdir and source_not_trailing_sep,
            is_dir=isdir,
            flatten=not source_is_str,
        )
        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            callback.branch(rpath, lpath, kwargs)
            coros.append(self._get_file(rpath, lpath, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path):
        try:
            await self._info(path)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError):
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, **kwargs):
        import re

        ends = path.endswith("/")
        path = self._strip_protocol(path)
        indstar = path.find("*") if path.find("*") >= 0 else len(path)
        indques = path.find("?") if path.find("?") >= 0 else len(path)
        indbrace = path.find("[") if path.find("[") >= 0 else len(path)

        ind = min(indstar, indques, indbrace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            root = path
            depth = 1
            if ends:
                path += "/*"
            elif await self._exists(path):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:ind]:
            ind2 = path[:ind].rindex("/")
            root = path[: ind2 + 1]
            depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
        else:
            root = ""
            depth = None if "**" in path else path[ind + 1 :].count("/") + 1

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )
        # Escape characters special to python regex, leaving our supported
        # special characters in place.
        # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
        # for shell globbing details.
        pattern = (
            "^"
            + (
                path.replace("\\", r"\\")
                .replace(".", r"\.")
                .replace("+", r"\+")
                .replace("//", "/")
                .replace("(", r"\(")
                .replace(")", r"\)")
                .replace("|", r"\|")
                .replace("^", r"\^")
                .replace("$", r"\$")
                .replace("{", r"\{")
                .replace("}", r"\}")
                .rstrip("/")
                .replace("?", ".")
            )
            + "$"
        )
        pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
        pattern = re.sub("[*]", "[^/]*", pattern)
        pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
        out = {
            p: allpaths[p]
            for p in sorted(allpaths)
            if pattern.match(p.replace("//", "/").rstrip("/"))
        }
        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = dict()
        detail = kwargs.pop("detail", False)
        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return list(sorted(out))

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, this creates a sync version if the name refers to an async
    method (coroutine) and there is no override in the child class; it creates
    an async method for the corresponding sync method if there is no
    implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
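
# Illustrative sketch: mirror_sync_methods is typically applied when an async
# filesystem is instantiated, so a backend that implements only the coroutine
# (e.g. ``_cat_file``) also gains a blocking counterpart that dispatches
# through sync(). ``SomeAsyncFileSystem`` is a hypothetical subclass.
#
#     fs = SomeAsyncFileSystem()
#     data = fs.cat_file("path")         # generated blocking wrapper
#     data = await fs._cat_file("path")  # native coroutine, from async code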


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
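
# Illustrative sketch: a concrete AbstractAsyncStreamedFile is used with
# ``async with``, which guarantees close() and hence the final force-flush.
# ``fs.open_async`` must be provided by the concrete backend.
#
#     async def copy_between(fs, src, dst):
#         async with await fs.open_async(src, "rb") as fin:
#             async with await fs.open_async(dst, "wb") as fout:
#                 while True:
#                     chunk = await fin.read(2**20)
#                     if not chunk:
#                         break
#                     await fout.write(chunk)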