
import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from glob import has_magic
from typing import TYPE_CHECKING, Iterable

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. The loop runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
    ...                  timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
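

# Usage sketch (illustrative only, not part of fsspec's public API): drive a
# coroutine from blocking code via the dedicated IO loop. ``double`` is a
# hypothetical coroutine used purely for demonstration.
def _example_sync_usage():  # pragma: no cover - illustrative sketch
    async def double(x):
        await asyncio.sleep(0)
        return x * 2

    # ``sync`` submits the coroutine to the loop returned by ``get_loop``
    # (running on a separate daemon thread) and blocks until it completes,
    # raising FSTimeoutError if ``timeout`` elapses first.
    return sync(get_loop(), double, 21)  # -> 42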


def sync_wrapper(func, obj=None):
    """Given a coroutine function, return a version that can be called in blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
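

# Sketch (illustrative only): ``sync_wrapper`` turns a bound coroutine method
# into a blocking one; this is the mechanism ``mirror_sync_methods`` below uses
# to build the sync API. ``_Dummy`` is a hypothetical class for demonstration.
def _example_sync_wrapper():  # pragma: no cover - illustrative sketch
    class _Dummy:
        loop = None  # a real instance holds the running fsspec IO loop

        async def _touch(self, path):
            return path

    inst = _Dummy()
    inst.loop = get_loop()
    # attach a blocking version of the bound coroutine to this instance
    inst.touch = sync_wrapper(inst._touch, obj=inst)
    return inst.touch("a/file")  # blocks until the coroutine completes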


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]


def reset_after_fork():
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
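

# Configuration sketch (illustrative only): the batch size can be pinned
# globally via fsspec.config.conf instead of being derived from the
# RLIMIT_NOFILE soft limit; these are exactly the keys _get_batch_size reads.
# Note this mutates process-wide configuration.
def _example_batch_size_config():  # pragma: no cover - illustrative sketch
    from fsspec.config import conf

    conf["gather_batch_size"] = 64  # operations that touch local files
    conf["nofiles_gather_batch_size"] = 512  # metadata-only operations
    return _get_batch_size(), _get_batch_size(nofiles=True)  # -> (64, 512)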


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False
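

# Quick sketch: ``running_async`` is how callers can tell whether they are
# already inside an event loop (and so must await coroutines directly) or in
# plain blocking code (and so should go through ``sync``).
def _example_running_async():  # pragma: no cover - illustrative sketch
    assert running_async() is False  # no loop in plain blocking code

    async def probe():
        return running_async()  # True inside the loop created by asyncio.run

    return asyncio.run(probe())  # -> True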


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling. If None, it will be inferred
        from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
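

# Sketch of the batching helper above: submit many coroutines while keeping at
# most ``batch_size`` in flight; results come back in input order. ``job`` is
# a hypothetical coroutine used purely for demonstration.
def _example_run_coros_in_chunks():  # pragma: no cover - illustrative sketch
    async def job(i):
        await asyncio.sleep(0)
        return i * i

    async def main():
        coros = [job(i) for i in range(10)]
        # at most 3 coroutines run concurrently; output aligns with input
        return await _run_coros_in_chunks(coros, batch_size=3)

    return asyncio.run(main())  # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]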


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"
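
    # Worked examples (sketch) of the header produced above, assuming a
    # hypothetical object at ``url`` that is 100 bytes long:
    #   start=0,    end=10   -> "bytes=0-9"    (HTTP byte ranges are inclusive)
    #   start=None, end=None -> "bytes=0-"     (the whole object)
    #   start=-10,  end=None -> "bytes=-10"    (suffix: the last 10 bytes)
    #   start=50,   end=-10  -> "bytes=50-89"  (negative end counts from size)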

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part, but here
            # we also check for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, this will create a sync version if the name refers to an async
    method (coroutine) and there is no override in the child class; it will create
    an async method for the corresponding sync method if there is no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
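

# Sketch (illustrative only, not a real backend): a minimal in-memory
# AsyncFileSystem. Only the underscored coroutines are written out; the
# blocking ``pipe_file``/``cat_file`` are generated by ``mirror_sync_methods``,
# which AbstractFileSystem's constructor applies to async implementations when
# the ``mirror_sync_methods`` flag is True.
class _ExampleMemFS(AsyncFileSystem):  # pragma: no cover - illustrative sketch
    protocol = "examplemem"  # hypothetical, not registered with fsspec

    def __init__(self, **kwargs):
        self.store = {}
        super().__init__(**kwargs)

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        self.store[path] = bytes(value)

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        return self.store[path][start:end]


# Usage (sketch):
#   fs = _ExampleMemFS()         # blocking mode, backed by the fsspec IO loop
#   fs.pipe_file("a", b"data")   # sync method mirrored from _pipe_file
#   fs.cat_file("a")             # -> b"data"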


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs an async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
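

# Usage sketch (illustrative only): a concrete backend subclasses
# AbstractAsyncStreamedFile, implements _fetch_range/_initiate_upload/
# _upload_chunk, and hands instances out from its ``open_async``. Driving one
# then looks like this hypothetical helper:
#
#   async def copy_between(fs, src, dst):
#       async with await fs.open_async(src, "rb") as f:
#           data = await f.read()
#       async with await fs.open_async(dst, "wb") as g:
#           await g.write(data)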