Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/asyn.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

633 statements  

1import asyncio 

2import asyncio.events 

3import functools 

4import inspect 

5import io 

6import numbers 

7import os 

8import re 

9import threading 

10from collections.abc import Iterable 

11from glob import has_magic 

12from typing import TYPE_CHECKING 

13 

14from .callbacks import DEFAULT_CALLBACK 

15from .exceptions import FSTimeoutError 

16from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep 

17from .spec import AbstractBufferedFile, AbstractFileSystem 

18from .utils import glob_translate, is_exception, other_paths 

19 

# Matches a name that starts with one underscore followed by a non-underscore
# character (so "_ls" matches, "__init__" does not).
private = re.compile("_[^_]")
# The thread and loop are kept in one-element lists so that other functions in
# this module can replace the contents in place.
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder, allocated lazily by get_lock()
get_running_loop = asyncio.get_running_loop

25 

26 

def get_lock():
    """Return the module-wide threading lock, creating it on first call.

    Allocation is deferred until first use so that every forked process can
    end up with a lock of its own.
    """
    global _lock
    if _lock is None:
        _lock = threading.Lock()
    return _lock

36 

37 

def reset_lock():
    """Forget the global lock, IO thread and event loop.

    Meant to be called only while a forked process initialises, so that the
    child allocates a fresh lock instead of reusing the one it inherited.
    """
    global _lock

    _lock = None
    loop[0] = None
    iothread[0] = None

49 

50 

51async def _runner(event, coro, result, timeout=None): 

52 timeout = timeout if timeout else None # convert 0 or 0.0 to None 

53 if timeout is not None: 

54 coro = asyncio.wait_for(coro, timeout=timeout) 

55 try: 

56 result[0] = await coro 

57 except Exception as ex: 

58 result[0] = ex 

59 finally: 

60 event.set() 

61 

62 

def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Run ``func(*args, **kwargs)`` on ``loop`` and block until it returns.

    The loop is expected to run in another thread; calling this from inside
    ``loop`` itself raises ``NotImplementedError``. A timeout, either here or
    inside the coroutine, surfaces as ``FSTimeoutError``.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    if not timeout:
        timeout = None  # 0 and 0.0 mean "no timeout"
    # NB: a loop that is not running *yet* is acceptable; the work is queued
    # and we wait for it below
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        current = asyncio.events.get_running_loop()
    except RuntimeError:
        current = None  # no loop is running in this thread
    if current is loop:
        raise NotImplementedError("Calling sync() from within a running loop")
    coro = func(*args, **kwargs)
    result = [None]
    finished = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(finished, coro, result, timeout), loop)
    # wait in one-second slices so that this thread can still be interrupted
    while not finished.wait(1):
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    outcome = result[0]
    if isinstance(outcome, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from outcome
    if isinstance(outcome, BaseException):
        raise outcome
    return outcome

106 

107 

def sync_wrapper(func, obj=None):
    """Wrap the coroutine function ``func`` for use from blocking code.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # the loop is taken from the given instance or, failing that, from
        # the first positional argument
        owner = obj or args[0]
        return sync(owner.loop, func, *args, **kwargs)

    return wrapper

121 

122 

def async_gen_wrapper(func, obj=None):
    """Wrap the async generator function ``func`` for use from blocking code.

    The returned function is an ordinary generator; every item is fetched by
    running the async generator's ``__anext__`` through ``sync``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        owner = obj or args[0]
        agen = func(*args, **kwargs)
        while True:
            try:
                yield sync(owner.loop, agen.__anext__)
            except StopAsyncIteration:
                return

    return wrapper

137 

138 

def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is not None:
        return loop[0]
    with get_lock():
        # check again under the lock, in case another thread filled the slot
        # between the test above and acquiring the lock
        if loop[0] is None:
            loop[0] = asyncio.new_event_loop()
            worker = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
            worker.daemon = True
            worker.start()
            iothread[0] = worker
    return loop[0]

155 

156 

def reset_after_fork():
    """Forget the loop, IO thread and lock inherited from the parent process.

    Registered below to run in the child after ``os.fork``. The module-level
    lock is named ``_lock``; it is reset here so that ``get_lock`` allocates a
    new one in the child.
    """
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)

167 

168 

169if TYPE_CHECKING: 

170 import resource 

171 

172 ResourceError = resource.error 

173else: 

174 try: 

175 import resource 

176 except ImportError: 

177 resource = None 

178 ResourceError = OSError 

179 else: 

180 ResourceError = getattr(resource, "error", OSError) 

181 

# Fallback batch sizes used when neither the configuration nor the open-file
# limit provides one.
_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    """Pick how many coroutines to run concurrently.

    Parameters
    ----------
    nofiles: bool
        If True, look up ``nofiles_gather_batch_size`` in ``fsspec.config.conf``
        and fall back to ``_NOFILES_DEFAULT_BATCH_SIZE``. Otherwise look up
        ``gather_batch_size`` and fall back to one eighth of the soft
        ``RLIMIT_NOFILE`` limit.

    Returns
    -------
    The configured value if present; ``-1`` when the soft limit is
    ``RLIM_INFINITY``; ``_DEFAULT_BATCH_SIZE`` when the limit cannot be read;
    otherwise a positive int.
    """
    from fsspec.config import conf

    conf_key = "nofiles_gather_batch_size" if nofiles else "gather_batch_size"
    if conf_key in conf:
        return conf[conf_key]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        # the resource module could not be imported
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    # A soft limit below 8 would floor-divide to 0, which
    # _run_coros_in_chunks rejects; always allow at least one coroutine.
    return max(1, soft_limit // 8)

209 

210 

def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # raised when no loop is running in the current thread
        return False
    return True

218 

219 

async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, then it will not be any throttling. If
        None, it will be inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.

    Returns
    -------
    list with one entry per input coroutine, in input order. With
    ``return_exceptions=True`` a failed coroutine's entry is its exception.

    Raises
    ------
    ValueError
        If ``batch_size`` is neither -1 nor a positive number.
    Exception
        With ``return_exceptions=False``, the first exception seen. Tasks
        still running are cancelled and coroutines that were never started
        are closed before it is re-raised.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)
    elif batch_size <= 0:
        raise ValueError(
            f"batch_size must be -1 or a positive integer, got {batch_size!r}"
        )

    async def _run_coro(coro, i):
        # pair the outcome with its input position so results can be
        # stored in order whatever the completion order
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        # top up the in-flight set to at most batch_size tasks
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        first_exc = None
        while done:
            task = done.pop()
            try:
                result, k = await task
                results[k] = result
            except Exception as exc:
                if first_exc is None:
                    first_exc = exc

        if first_exc is not None:
            for task in pending:
                task.cancel()
            if pending:
                # let the cancellations finish before propagating
                await asyncio.gather(*pending, return_exceptions=True)
            # Coroutines that were never scheduled would otherwise be dropped
            # un-awaited ("coroutine ... was never awaited"); close them.
            for unstarted in coros[i:]:
                close = getattr(unstarted, "close", None)
                if close is not None:
                    close()
            raise first_exc

    return results

300 

301 

# Coroutine methods that an async-capable backend is expected to implement;
# mirror_sync_methods() considers each of these names.
async_methods = [
    "_ls", "_cat_file", "_get_file", "_put_file", "_rm_file",
    "_cp_file", "_pipe_file", "_expand_path", "_info", "_isfile",
    "_isdir", "_exists", "_walk", "_glob", "_find",
    "_du", "_size", "_mkdir", "_makedirs",
]

324 

325 

class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstring here; they will be copied
    # for _* methods and inferred for overridden methods. For that reason the
    # explanations added below are ``#`` comments: a docstring on a ``_name``
    # coroutine would be carried onto its sync wrapper by functools.wraps and
    # suppress the copy from AbstractFileSystem.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        # asynchronous: if False, remember a loop (the given one, or the
        #   shared fsspec IO loop) on which coroutines are run for blocking
        #   callers; if True no loop is stored.
        # batch_size: per-instance default for the bulk operations below.
        self.asynchronous = asynchronous
        # the creating process id, checked by the ``loop`` property
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        # Refuse to hand out the loop in a process other than the one that
        # created this instance.
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        # Only usable when a subclass supplies its own coroutine ``_rm``
        # (i.e. not the default defined below); the single file is then
        # removed through it.
        if (
            inspect.iscoroutinefunction(self._rm)
            and type(self)._rm is not AsyncFileSystem._rm
        ):
            return await self._rm(path, recursive=False, batch_size=1, **kwargs)
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        # the expanded list is sorted; it is submitted in reverse order
        # (presumably so entries inside a directory come before the directory)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        # move = copy, then remove the source
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        # on_error defaults to "ignore" for recursive copies and "raise"
        # otherwise; "ignore" only suppresses FileNotFoundError.
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        # ``path`` is either one path (written with ``value``) or a mapping
        # of path -> value.
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file

        Turns python-style ``start``/``end`` offsets (either may be ``None``
        or negative) into a ``"bytes=<start>-<end>"`` string with an
        inclusive end. The file size is fetched with ``_info`` only when a
        negative offset has to be resolved against it.
        """
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        # on_error: "raise" re-raises the first failure; "omit" leaves failed
        # paths out of the returned dict; any other value keeps the exception
        # objects as values.
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        # a dict is returned unless exactly the one literal path that was
        # asked for came back from the expansion
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        on_error: "return" or "raise"
            If "return" (default), any per-range exception is placed in the output
            list at the corresponding position. Otherwise the first such exception
            is raised. Matches ``AbstractFileSystem.cat_ranges``.
        """
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error != "return":
            ex = next(filter(is_exception, out), None)
            if ex is not None:
                raise ex
        return out

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # local directories become remote makedirs calls; everything else is
        # uploaded as a file
        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded
        concurrently. ``batch_size`` is read from the keyword arguments.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Create each distinct local parent directory once. A target without
        # a directory component has an empty dirname, which os.makedirs
        # rejects with FileNotFoundError, so those are skipped.
        for parent in {os.path.dirname(lp) for lp in lpaths}:
            if parent:
                os.makedirs(parent, exist_ok=True)
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        # Any ordinary failure of _info counts as "not a file". Exception
        # (rather than a bare except) lets asyncio.CancelledError and
        # KeyboardInterrupt propagate instead of being reported as False.
        try:
            return (await self._info(path))["type"] == "file"
        except Exception:
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
        # Async generator yielding (path, dirs, files) for ``path`` and the
        # directories below it. With detail=True (taken from kwargs) dirs and
        # files are dicts of name -> info, otherwise lists of names.
        # on_error: "raise" re-raises a failed listing, a callable is called
        # with the exception, anything else ignores it; in the latter two
        # cases an empty entry is yielded for that path.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as give path
                files[""] = info
            else:
                files[name] = info

        if not detail:
            dirs = list(dirs)
            files = list(files)

        if topdown:
            # Yield before recursion if walking top down
            yield path, dirs, files

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                if not topdown:
                    yield path, dirs, files
                return

        for d in dirs:
            # on_error is a named parameter, not part of kwargs, so it has to
            # be forwarded explicitly to apply below the top level as well
            async for _ in self._walk(
                full_dirs[d],
                maxdepth=maxdepth,
                detail=detail,
                topdown=topdown,
                on_error=on_error,
                **kwargs,
            ):
                yield _

        if not topdown:
            # Yield after recursion if walking bottom up
            yield path, dirs, files

    async def _glob(self, path, maxdepth=None, **kwargs):
        # Lists everything under the longest wildcard-free directory prefix
        # with _find, then filters with the regex from glob_translate.
        # ``detail`` and ``withdirs`` are taken from kwargs.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        # position of the first wildcard character
        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)
        withdirs = kwargs.pop("withdirs", True)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            first_wildcard_idx = min_idx
            min_idx = path[:min_idx].rindex("/")
            root = path[
                : min_idx + 1
            ]  # everything up to the last / before the first wildcard
            prefix = path[
                min_idx + 1 : first_wildcard_idx
            ]  # stem between last "/" and first wildcard
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            prefix = path[:min_idx]  # stem up to the first wildcard
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        # Pass the filename stem as prefix= so backends that support it such as
        # gcsfs, s3fs and adlfs can filter server-side up to the first wildcard.
        if prefix:
            kwargs["prefix"] = prefix
        allpaths = await self._find(
            root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        # Returns the summed size when total=True, otherwise a dict of
        # name -> size for everything _find returns.
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        # Returns a sorted list of names, or with detail=True (taken from
        # kwargs) a dict of name -> info in sorted order.
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(
        self, path, recursive=False, maxdepth=None, assume_literal=False
    ):
        # Returns the sorted list of concrete paths that ``path`` (a string
        # or a list of strings) refers to. Glob characters are expanded
        # unless assume_literal is True; with recursive=True the contents of
        # each path are included too. Raises FileNotFoundError when nothing
        # is found.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            # forward assume_literal too, so that a single string is treated
            # exactly like a one-element list
            out = await self._expand_path(
                [path], recursive, maxdepth, assume_literal=assume_literal
            )
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if not assume_literal and has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                                assume_literal=True,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        # Only binary, uncompressed access is accepted; the base class then
        # has no implementation to offer.
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError

982 

983 

def mirror_sync_methods(obj):
    """Populate blocking versions of the async methods of obj

    Every single-underscore name listed in ``async_methods`` or found in
    ``dir(AsyncFileSystem)`` is considered. For such a name ``_x``, if
    ``obj._x`` is a coroutine function or an async generator function and
    ``obj.x`` is still the implementation inherited from AbstractFileSystem,
    then ``obj.x`` is replaced by a wrapper that runs ``obj._x`` through
    ``sync``. A wrapper without a docstring takes the one of
    ``AbstractFileSystem.x``.
    """
    from fsspec import AbstractFileSystem

    for method in set(async_methods + dir(AsyncFileSystem)):
        if not method.startswith("_") or not private.match(method):
            continue
        smethod = method[1:]
        target = getattr(obj, method, None)
        unsync = getattr(getattr(obj, smethod, False), "__func__", None)
        if unsync is not getattr(AbstractFileSystem, smethod, ""):
            # the sync name is overridden (or absent): leave it alone
            continue
        if inspect.iscoroutinefunction(target):
            mth = sync_wrapper(target, obj=obj)
        elif inspect.isasyncgenfunction(target):
            mth = async_gen_wrapper(target, obj=obj)
        else:
            continue
        setattr(obj, smethod, mth)
        if not mth.__doc__:
            mth.__doc__ = getattr(
                getattr(AbstractFileSystem, smethod, None), "__doc__", ""
            )

1018 

1019 

class FSSpecCoroutineCancel(Exception):
    """Exception type used to force running fsspec coroutines to exit."""

1022 

1023 

def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    """Describe, and optionally cancel, the unfinished tasks on ``loop[0]``

    Parameters
    ----------
    printout: bool
        If True, call ``print_stack()`` on every unfinished task.
    cancel: bool
        If True, cancel every unfinished task and throw ``exc`` into its
        coroutine.
    exc: Exception type
        Exception set on the task and thrown into the coroutine when
        cancelling.
    with_task: bool
        If True, include the task object itself in each description.

    Returns
    -------
    list of dict, one per unfinished task, with keys "locals", "file",
    "firstline", "linelo", "stack" and "task".
    """
    import traceback

    pending = [
        task for task in asyncio.tasks.all_tasks(loop[0]) if not task.done()
    ]
    if printout:
        for task in pending:
            task.print_stack()

    report = []
    for task in pending:
        frame = task._coro.cr_frame
        code = frame.f_code
        report.append(
            {
                "locals": frame.f_locals,
                "file": code.co_filename,
                "firstline": code.co_firstlineno,
                "linelo": frame.f_lineno,
                "stack": traceback.format_stack(frame),
                "task": task if with_task else None,
            }
        )

    if cancel:
        for task in pending:
            # Grab the callbacks first; the calls below may alter them.
            callbacks = task._callbacks
            task.cancel()
            asyncio.futures.Future.set_exception(task, exc)
            asyncio.futures.Future.cancel(task)
            # cancels any dependent concurrent.futures
            for callback in callbacks:
                callback[0](task)
            try:
                # exits coro, unless explicitly handled
                task._coro.throw(exc)
            except exc:
                pass
    return report

1055 

1056 

class AbstractAsyncStreamedFile(AbstractBufferedFile):
    """Async file object: reads are unbuffered, writes are buffered in blocks"""

    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Fetch bytes starting at the current location

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if None or <0, all remaining bytes.

        Returns
        -------
        bytes as returned by ``_fetch_range``; ``b""`` if nothing was requested.
        """
        wanted = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if wanted < 0:
            wanted = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if wanted == 0:
            # nothing to read, so skip the fetch entirely
            return b""
        start = self.loc
        chunk = await self._fetch_range(start, start + wanted)
        self.loc += len(chunk)
        return chunk

    async def write(self, data):
        """
        Append data to the write buffer

        The buffer is handed to ``flush()`` once it holds at least
        ``blocksize`` bytes; otherwise it is kept until a later flush.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.

        Returns
        -------
        The value returned by the buffer's ``write``.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        written = self.buffer.write(data)
        self.loc += written
        buffer_full = self.buffer.tell() >= self.blocksize
        if buffer_full:
            await self.flush()
        return written

    async def close(self):
        """Close the file

        In read mode the cache is dropped. Otherwise a final forced flush is
        issued (unless one already happened) and the filesystem's cache is
        invalidated for this path and its parent.
        """
        if getattr(self, "_unclosable", False) or self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            filesystem = self.fs
            if filesystem is not None:
                filesystem.invalidate_cache(self.path)
                filesystem.invalidate_cache(filesystem._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        """Send the write buffer via ``_upload_chunk``

        Parameters
        ----------
        force: bool
            If True, upload whatever is buffered as the final chunk; this
            may happen only once. If False, a buffer smaller than
            ``blocksize`` is left in place.
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if force:
            if self.forced:
                raise ValueError("Force flush cannot be called more than once")
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # flushing is a no-op outside of the write modes
            return

        if not force and self.buffer.tell() < self.blocksize:
            # too little data so far; defer the upload
            return

        if self.offset is None:
            # first upload: start from offset zero and initiate the upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except BaseException:
                # mark the file closed on any failure, then propagate it
                self.closed = True
                raise

        uploaded = await self._upload_chunk(final=force)
        if uploaded is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        """Return the bytes between ``start`` and ``end``; for subclasses"""
        raise NotImplementedError

    async def _initiate_upload(self):
        """Hook awaited by ``flush`` before the first chunk is uploaded"""

    async def _upload_chunk(self, final=False):
        """Upload the current buffer contents; for subclasses

        ``flush`` keeps the buffer when this returns False, and otherwise
        advances ``offset`` and starts a fresh buffer.
        """
        raise NotImplementedError