import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from contextlib import contextmanager
from glob import has_magic
from typing import TYPE_CHECKING, Iterable

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make the loop run the coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
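
# Hedged usage sketch (not part of the module): running an arbitrary coroutine
# on the dedicated fsspec IO loop from blocking code. `add` is illustrative.
#
#     import asyncio
#     import fsspec.asyn
#
#     async def add(a, b):
#         await asyncio.sleep(0)
#         return a + b
#
#     # runs add(1, 2) on the background loop and blocks until done
#     assert fsspec.asyn.sync(fsspec.asyn.get_loop(), add, 1, 2) == 3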


def sync_wrapper(func, obj=None):
    """Given a function, make it callable in blocking contexts.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
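
# Hedged sketch: attaching a blocking twin of a coroutine method by hand,
# which is what the automatic mirroring below does for whole classes. `MyFS`
# and `_size` are illustrative names only.
#
#     class MyFS(AsyncFileSystem):
#         async def _size(self, path):
#             return 0
#
#     MyFS.size = sync_wrapper(MyFS._size)
#     # MyFS().size("path") now blocks on the instance's loop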


@contextmanager
def _selector_policy():
    original_policy = asyncio.get_event_loop_policy()
    try:
        if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        yield
    finally:
        asyncio.set_event_loop_policy(original_policy)


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                with _selector_policy():
                    loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]
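
# Illustrative check of the singleton behaviour: repeated calls hand back the
# same loop object, already running on the daemon "fsspecIO" thread.
#
#     import fsspec.asyn
#
#     loop = fsspec.asyn.get_loop()
#     assert loop is fsspec.asyn.get_loop()
#     assert loop.is_running()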


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
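
# Hedged example: the batch size can be pinned for all instances through
# fsspec's config mapping, which this helper consults first.
#
#     from fsspec.config import conf
#
#     conf["gather_batch_size"] = 64
#     assert _get_batch_size() == 64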


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False
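
# Illustrative behaviour sketch (`probe` is a hypothetical coroutine):
#
#     import asyncio
#
#     async def probe():
#         return running_async()
#
#     assert running_async() is False   # plain synchronous context
#     assert asyncio.run(probe()) is True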


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling. If None, it will be inferred
        from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
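
# Hedged sketch of the throttling contract: at most `batch_size` coroutines
# are in flight at once, and results come back in input order.
#
#     import asyncio
#
#     async def demo():
#         coros = [asyncio.sleep(0.01, result=i) for i in range(10)]
#         return await _run_coros_in_chunks(coros, batch_size=3)
#
#     assert asyncio.run(demo()) == list(range(10))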


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop
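
    # Hedged usage sketch: inside a running event loop, construct with
    # asynchronous=True and await the underscored coroutines directly. The
    # "http" implementation (requires aiohttp) is used here for illustration.
    #
    #     import fsspec
    #
    #     async def main():
    #         fs = fsspec.filesystem("http", asynchronous=True)
    #         session = await fs.set_session()  # http-specific setup
    #         out = await fs._cat("https://example.com/")
    #         await session.close()
    #         return out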

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"
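
    # Worked examples of the header string produced above (derived from the
    # logic; no remote call is made unless a negative index needs the size):
    #
    #     start=0,    end=100   ->  "bytes=0-99"   (end made inclusive)
    #     start=5,    end=None  ->  "bytes=5-"     (open-ended)
    #     start=-100, end=None  ->  "bytes=-100"   (suffix of 100 bytes)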

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]
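
    # Return-shape sketch (paths are illustrative): one concrete path yields
    # bytes, while lists or glob patterns yield a dict keyed by expanded path.
    #
    #     data = await fs._cat("bucket/a")
    #     # -> b"..."
    #     table = await fs._cat(["bucket/a", "bucket/b"])
    #     # -> {"bucket/a": b"...", "bucket/b": b"..."}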

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                (
                    p + "/"
                    if append_slash_to_dirname and info["type"] == "directory"
                    else p
                )
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, a sync version will be created if the name refers to an
    async method (coroutine) and there is no override in the child class; an
    async method will be created for the corresponding sync method if there is
    no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
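
# Hedged sketch of the effect: a backend supplies only the coroutine, and the
# blocking twin appears when fsspec's class machinery runs this function on
# instantiation. `DemoFS` is illustrative, not a real implementation.
#
#     class DemoFS(AsyncFileSystem):
#         async def _info(self, path, **kwargs):
#             return {"name": path, "size": 0, "type": "file"}
#
#     fs = DemoFS()   # sync instance on the shared IO loop
#     fs.info("x")    # blocking wrapper generated around _info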


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush in read mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
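
# Minimal subclass sketch (illustrative assumptions throughout): a read-only
# streamed file over an in-memory blob, supplying the one coroutine a reader
# must implement. Constructor arguments mirror AbstractBufferedFile; passing
# size avoids the fs.info() lookup.
#
#     class BytesAsyncFile(AbstractAsyncStreamedFile):
#         def __init__(self, data: bytes):
#             self.data = data
#             super().__init__(fs=None, path="memory://blob", mode="rb",
#                              size=len(data))
#
#         async def _fetch_range(self, start, end):
#             return self.data[start:end]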