Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/asyn.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

608 statements  

1import asyncio 

2import asyncio.events 

3import functools 

4import inspect 

5import io 

6import numbers 

7import os 

8import re 

9import threading 

10from collections.abc import Iterable 

11from glob import has_magic 

12from typing import TYPE_CHECKING 

13 

14from .callbacks import DEFAULT_CALLBACK 

15from .exceptions import FSTimeoutError 

16from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep 

17from .spec import AbstractBufferedFile, AbstractFileSystem 

18from .utils import glob_translate, is_exception, other_paths 

19 

# Matches attribute names with exactly one leading underscore ("_ls", not "__init__").
private = re.compile("_[^_]")

# Mutable one-element holders: slot 0 holds the dedicated fsspec IO thread and
# the global event loop used by every non-async instance, respectively. Being
# lists, their contents can be replaced in place without a ``global`` statement.
iothread, loop = [None], [None]

# Global lock placeholder; allocated lazily by get_lock().
_lock = None

get_running_loop = asyncio.get_running_loop

25 

26 

def get_lock():
    """Return the process-wide threading lock, allocating it on first use.

    Allocating lazily (rather than at import time) lets each forked process
    obtain a fresh lock once the inherited one has been cleared.
    """
    global _lock
    _lock = _lock or threading.Lock()
    return _lock

36 

37 

def reset_lock():
    """Forget the inherited IO thread, event loop and lock.

    Meant to be called only while initialising a forked process: with all
    three cleared, the child lazily creates its own loop and lock instead of
    reusing the parent's.
    """
    global _lock

    iothread[0] = loop[0] = None
    _lock = None

49 

50 

51async def _runner(event, coro, result, timeout=None): 

52 timeout = timeout if timeout else None # convert 0 or 0.0 to None 

53 if timeout is not None: 

54 coro = asyncio.wait_for(coro, timeout=timeout) 

55 try: 

56 result[0] = await coro 

57 except Exception as ex: 

58 result[0] = ex 

59 finally: 

60 event.set() 

61 

62 

def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Run ``func(*args, **kwargs)`` on *loop* (in another thread) and block until done.

    Raises RuntimeError if the loop is missing or closed, NotImplementedError
    if called from a coroutine already running on *loop*, and FSTimeoutError
    if *timeout* (seconds; 0 or None means unlimited) elapses. Any other
    exception raised by the coroutine is re-raised here.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout or None  # treat 0 / 0.0 as "no timeout"
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        current = asyncio.events.get_running_loop()
    except RuntimeError:
        current = None  # not inside any running loop
    if current is loop:
        # blocking here would deadlock the very loop that must run the work
        raise NotImplementedError("Calling sync() from within a running loop")

    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    # Wait in one-second slices so the calling thread stays interruptible.
    while not event.wait(1):
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    outcome = result[0]
    if isinstance(outcome, asyncio.TimeoutError):
        # surface asyncio's timeout as fsspec's own exception type
        raise FSTimeoutError from outcome
    if isinstance(outcome, BaseException):
        raise outcome
    return outcome

106 

107 

def sync_wrapper(func, obj=None):
    """Given a coroutine function, make it callable from blocking contexts.

    Leave obj=None if defining within a class (the instance is then taken
    from the first positional argument). Pass the instance if attaching
    as an attribute of the instance.

    The returned wrapper runs ``func`` through ``sync`` on the instance's
    ``loop`` and carries ``func``'s name/docstring via ``functools.wraps``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Fall back to args[0] only when no instance was bound explicitly.
        # A truthiness test would wrongly skip instances that define
        # __bool__/__len__; for a bound method, args[0] is then a path.
        self = obj if obj is not None else args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper

121 

122 

def get_loop():
    """Return the shared fsspec IO loop, creating it on first call.

    The loop runs forever in a daemon thread named "fsspecIO"; that thread
    is remembered in ``iothread[0]``.
    """
    if loop[0] is not None:
        return loop[0]
    with get_lock():
        # Re-check under the lock: another thread may have created the loop
        # between the unlocked test above and acquiring the lock.
        if loop[0] is None:
            new_loop = asyncio.new_event_loop()
            loop[0] = new_loop
            worker = threading.Thread(
                target=new_loop.run_forever, name="fsspecIO", daemon=True
            )
            worker.start()
            iothread[0] = worker
    return loop[0]

139 

140 

def reset_after_fork():
    """Drop the parent's IO loop, IO thread and lock in a forked child.

    The IO thread does not survive ``fork``, and the inherited lock may have
    been held by another parent thread at fork time. Clearing all three lets
    ``get_loop``/``get_lock`` build fresh ones in the child.
    """
    # The module-level lock is ``_lock``; assigning to a new global named
    # ``lock`` (as before) left the possibly-held parent lock in place.
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)

151 

152 

if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        # ``resource`` is Unix-only; fall back to defaults elsewhere.
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

# Fallback batch sizes used by _get_batch_size when nothing is configured.
_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    """Return the default number of coroutines to run concurrently.

    The value comes from ``fsspec.config.conf`` ("nofiles_gather_batch_size"
    when ``nofiles`` is true, otherwise "gather_batch_size") if set. Failing
    that, ``nofiles`` operations use _NOFILES_DEFAULT_BATCH_SIZE. Other
    operations use 1/8th of the RLIMIT_NOFILE soft limit (at least 1), -1
    (no throttling) when that limit is unlimited, or _DEFAULT_BATCH_SIZE when
    the limit cannot be read.
    """
    from fsspec.config import conf

    key = "nofiles_gather_batch_size" if nofiles else "gather_batch_size"
    if key in conf:
        return conf[key]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    # A soft limit below 8 would otherwise yield 0, which
    # _run_coros_in_chunks rejects with ValueError.
    return max(1, soft_limit // 8)

193 

194 

def running_async() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

202 

203 

async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, then there will be no throttling. If
        None, it will be inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        Only used when inferring the batch_size: pass True when the operation
        does not open local files, so the larger "nofiles" default is used
        instead of one derived from the open-file limit.

    Returns
    -------
    list
        One result per input coroutine, in input order (exception instances
        in place of results when ``return_exceptions`` is true).

    Raises
    ------
    ValueError
        If ``batch_size`` is zero or negative (other than -1).
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)
    elif batch_size <= 0:
        raise ValueError(
            f"batch_size must be a positive integer or -1, got {batch_size!r}"
        )

    async def _run_coro(coro, i):
        # Returns (result, index) so completion order can be mapped back.
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    try:
        while pending or i < n:
            # Top the in-flight set back up to batch_size.
            while len(pending) < batch_size and i < n:
                pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
                i += 1

            if not pending:
                break

            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            first_exc = None
            # Retrieve every finished task so no exception goes unobserved.
            for task in done:
                try:
                    result, k = await task
                    results[k] = result
                except Exception as exc:
                    if first_exc is None:
                        first_exc = exc

            if first_exc is not None:
                raise first_exc
    except BaseException:
        # A coroutine failed, or this coroutine itself was cancelled: stop the
        # in-flight tasks and close coroutines that were never scheduled, so
        # none keep running orphaned or warn "coroutine ... was never awaited".
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        for coro in coros[i:]:
            if asyncio.iscoroutine(coro):
                coro.close()
        raise

    return results

284 

285 

# these methods should be implemented as async by any async-able backend
async_methods = [
    # listing and whole-file transfer
    "_ls", "_cat_file", "_get_file", "_put_file",
    # single-file mutation
    "_rm_file", "_cp_file", "_pipe_file",
    # path expansion and metadata
    "_expand_path", "_info", "_isfile", "_isdir", "_exists",
    # tree traversal and aggregation
    "_walk", "_glob", "_find", "_du", "_size",
    # directory creation
    "_mkdir", "_makedirs",
]

308 

309 

class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstring here; they will be copied
    # for _* methods and inferred for overridden methods. Notes on the private
    # coroutines are therefore written as comments, so they do not replace
    # the docstrings copied onto the generated sync methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        # asynchronous=True: callers await the coroutines themselves, so no
        # loop is bound; otherwise use ``loop`` or the shared fsspec IO loop.
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        # The bound loop belongs to the process that created this instance.
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        # Default: delegate to a subclass's own coroutine _rm for a single path.
        if (
            inspect.iscoroutinefunction(self._rm)
            and type(self)._rm is not AsyncFileSystem._rm
        ):
            return await self._rm(path, recursive=False, batch_size=1, **kwargs)
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        # _expand_path returns sorted paths; reversing presumably submits
        # children before their parent directories.
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        # Recursive copies ignore FileNotFoundError by default; others raise.
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        # ``path`` is either a single path (with ``value``) or a {path: value} dict.
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(p, **kwargs) for p in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        # Return a {path: bytes} dict unless exactly one literal path was asked for.
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Local directories become remote directories; only files are uploaded.
        is_dir = {lp: os.path.isdir(lp) for lp in lpaths}
        rdirs = [rp for lp, rp in zip(lpaths, rpaths) if is_dir[lp]]
        file_pairs = [(lp, rp) for lp, rp in zip(lpaths, rpaths) if not is_dir[lp]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Create the local parent directories. A bare filename (possible when
        # lists are passed in unchanged) has dirname "", and os.makedirs("")
        # would raise FileNotFoundError.
        for parent in {os.path.dirname(lp) for lp in lpaths}:
            if parent:
                os.makedirs(parent, exist_ok=True)
        # Resolve batch_size the same way as the other bulk methods.
        batch_size = kwargs.pop("batch_size", None) or self.batch_size

        coros = []
        callback.set_size(len(lpaths))
        for local_path, remote_path in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(remote_path, local_path, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        # Any lookup failure counts as "not a file", but cancellation and
        # other BaseExceptions must propagate (a bare except swallowed them).
        try:
            return (await self._info(path))["type"] == "file"
        except Exception:
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except OSError as e:  # includes FileNotFoundError
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as give path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            # Forward on_error so subdirectories honour "raise"/callable too.
            async for _ in self._walk(
                full_dirs[d],
                maxdepth=maxdepth,
                on_error=on_error,
                detail=detail,
                **kwargs,
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)
        withdirs = kwargs.pop("withdirs", True)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            first_wildcard_idx = min_idx
            min_idx = path[:min_idx].rindex("/")
            # everything up to the last / before the first wildcard
            root = path[: min_idx + 1]
            # stem between last "/" and first wildcard
            prefix = path[min_idx + 1 : first_wildcard_idx]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            prefix = path[:min_idx]  # stem up to the first wildcard
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        # Pass the filename stem as prefix= so backends that support it such as
        # gcsfs, s3fs and adlfs can filter server-side up to the first wildcard.
        if prefix:
            kwargs["prefix"] = prefix
        allpaths = await self._find(
            root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(
        self, path, recursive=False, maxdepth=None, assume_literal=False
    ):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            # Forward assume_literal so a literal path containing glob
            # characters is not re-interpreted as a pattern.
            out = await self._expand_path(
                [path], recursive, maxdepth, assume_literal=assume_literal
            )
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if not assume_literal and has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                                assume_literal=True,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        # Only binary, uncompressed modes could be supported; no default
        # implementation is provided.
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError

945 

946 

def mirror_sync_methods(obj):
    """Attach blocking versions of obj's private coroutine methods.

    Every name in ``async_methods`` or ``dir(AsyncFileSystem)`` that starts
    with a single underscore (``_ls``, ``_cat_file``, ...) is considered. When
    ``obj._name`` is a coroutine function and ``obj.name`` is still
    AbstractFileSystem's default implementation, ``obj.name`` is replaced by a
    ``sync_wrapper`` around the coroutine. The wrapper keeps the coroutine's
    docstring, or else gets the docstring of AbstractFileSystem's ``name``.
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        # private.match implies a single leading underscore, so this also
        # skips public names and dunders.
        if not private.match(method):
            continue
        sync_name = method[1:]
        is_coroutine = inspect.iscoroutinefunction(getattr(obj, method, None))
        current_sync = getattr(getattr(obj, sync_name, False), "__func__", None)
        if not is_coroutine or current_sync is not getattr(
            AbstractFileSystem, sync_name, ""
        ):
            continue  # nothing async to mirror, or sync version overridden
        wrapped = sync_wrapper(getattr(obj, method), obj=obj)
        setattr(obj, sync_name, wrapped)
        if not wrapped.__doc__:
            wrapped.__doc__ = getattr(
                getattr(AbstractFileSystem, sync_name, None), "__doc__", ""
            )

977 

978 

class FSSpecCoroutineCancel(Exception):
    """Default exception injected into tasks forcibly stopped by _dump_running_tasks."""

981 

982 

def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    """Debugging aid: report, and optionally kill, unfinished tasks on the fsspec loop.

    Parameters
    ----------
    printout: bool
        Call ``print_stack()`` on every unfinished task.
    cancel: bool
        Forcibly cancel each unfinished task and throw ``exc`` into its coroutine.
    exc: exception class
        Exception set on, and thrown into, cancelled tasks.
    with_task: bool
        Include the Task object under the "task" key of each record.

    Returns
    -------
    list of dict
        Per task: the coroutine frame's locals, file name, first line, current
        line ("linelo"), formatted stack, and optionally the task.

    Relies on private asyncio attributes (``Task._coro``, ``Task._callbacks``),
    so it may not work on every Python version.
    """
    import traceback

    unfinished = [task for task in asyncio.tasks.all_tasks(loop[0]) if not task.done()]
    if printout:
        for task in unfinished:
            task.print_stack()

    records = []
    for task in unfinished:
        frame = task._coro.cr_frame
        records.append(
            {
                "locals": frame.f_locals,
                "file": frame.f_code.co_filename,
                "firstline": frame.f_code.co_firstlineno,
                "linelo": frame.f_lineno,
                "stack": traceback.format_stack(frame),
                "task": task if with_task else None,
            }
        )

    if cancel:
        for task in unfinished:
            callbacks = task._callbacks
            task.cancel()
            asyncio.futures.Future.set_exception(task, exc)
            asyncio.futures.Future.cancel(task)
            for cb in callbacks:
                cb[0](task)  # cancels any dependent concurrent.futures
            try:
                task._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return records

1014 

1015 

class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Fetch bytes starting at the current position.

        Parameters
        ----------
        length: int (-1)
            How many bytes to read; a negative value (or None) means
            everything from the current position up to ``self.size``.
        """
        length = int(length) if length is not None else -1
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            return b""  # nothing requested: skip the fetch entirely
        chunk = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(chunk)
        return chunk

    async def write(self, data):
        """
        Append bytes to the in-memory write buffer.

        The buffer is uploaded by flush(), which runs automatically once the
        buffer holds at least ``blocksize`` bytes.

        Parameters
        ----------
        data: bytes
            The bytes to append.

        Returns
        -------
        int
            Number of bytes accepted into the buffer.
        """
        if self.mode not in ("wb", "ab"):
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        n_written = self.buffer.write(data)
        self.loc += n_written
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return n_written

    async def close(self):
        """Close the file.

        In write modes, any buffered data is force-flushed (unless already
        force-flushed) and the filesystem's cache entries for the path and its
        parent are invalidated. In read mode the cache is dropped. Closing an
        already-closed or ``_unclosable`` file does nothing.
        """
        if getattr(self, "_unclosable", False) or self.closed:
            return
        if self.mode != "rb":
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))
        else:
            self.cache = None

        self.closed = True

    async def flush(self, force=False):
        """Upload the buffer if it has reached ``blocksize`` (or always, when forced).

        ``force=True`` marks the final flush and may only happen once. The
        first real upload starts the (multipart) upload via _initiate_upload.
        Flushing a read-mode file is a no-op.
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if force:
            if self.forced:
                raise ValueError("Force flush cannot be called more than once")
            self.forced = True

        if self.mode not in ("wb", "ab"):
            return  # no-op to flush on read-mode

        if not force and self.buffer.tell() < self.blocksize:
            return  # defer write on small block

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except BaseException:
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError