Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/asyn.py: 27%


import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from collections.abc import Iterable
from glob import has_magic
from typing import TYPE_CHECKING

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. Runs in other thread

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
    ...                  timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the waiting thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
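

# A minimal usage sketch (not part of fsspec): submit a coroutine to the
# dedicated IO loop from blocking code. ``_example_sync_usage`` is a
# hypothetical helper and is never called at import time.
def _example_sync_usage():
    async def add(a, b):
        return a + b

    # get_loop() (defined below) starts the "fsspecIO" daemon thread on
    # first use; sync() blocks here until the coroutine finishes there.
    return sync(get_loop(), add, 1, 2)  # -> 3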


def sync_wrapper(func, obj=None):
    """Given a function, make it callable in blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]
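

# A minimal sketch of attaching a blocking twin with sync_wrapper inside a
# class that exposes ``.loop`` (``_ExampleClient`` is hypothetical,
# illustrative only, and never instantiated at import time):
class _ExampleClient:
    def __init__(self):
        self.loop = get_loop()

    async def _fetch(self, x):
        await asyncio.sleep(0)  # stand-in for real async IO
        return x * 2

    # the wrapper resolves ``self`` from the call args, then runs _fetch
    # on the IO loop via sync(); usage: _ExampleClient().fetch(21) -> 42
    fetch = sync_wrapper(_fetch)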


def reset_after_fork():
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None  # reset the same placeholder that reset_lock() clears


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
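

# A minimal sketch (not part of fsspec) of overriding the inferred batch
# sizes via fsspec's configuration dict, using the two keys checked above.
# ``_example_configure_batch_sizes`` is hypothetical and never run at import.
def _example_configure_batch_sizes():
    from fsspec.config import conf

    conf["gather_batch_size"] = 64  # throttle for file-transferring batches
    conf["nofiles_gather_batch_size"] = 512  # batches opening no local files
    assert _get_batch_size() == 64
    assert _get_batch_size(nofiles=True) == 512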


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False
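

# A minimal sketch (not part of fsspec): running_async() distinguishes
# coroutine context from plain blocking context. Hypothetical helper,
# never called at import time.
def _example_running_async():
    assert not running_async()  # no loop running in this thread

    async def probe():
        return running_async()  # True when executed by an event loop

    return asyncio.run(probe())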


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there will be no throttling. If None, it will be
        inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
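

# A minimal sketch (not part of fsspec): at most ``batch_size`` coroutines
# are in flight at once, and results come back in submission order.
# Hypothetical helper, never awaited at import time.
async def _example_run_in_chunks():
    async def square(i):
        await asyncio.sleep(0.01)
        return i * i

    return await _run_coros_in_chunks(
        [square(i) for i in range(10)], batch_size=3
    )  # -> [0, 1, 4, ..., 81]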


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)


    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex


    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )


    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"
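
    # Worked examples (following the logic above; hypothetical values):
    #   start=None, end=None -> "bytes=0-"      whole file
    #   start=10,   end=20   -> "bytes=10-19"   end is exclusive as input,
    #                                           inclusive in the header
    #   start=-5,   end=None -> "bytes=-5"      suffix: the last 5 bytes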


    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
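
    # A minimal usage sketch (hypothetical URLs) via the mirrored blocking
    # method; scalar starts/ends broadcast across all paths:
    #
    #     fs = fsspec.filesystem("http")
    #     blocks = fs.cat_ranges(
    #         ["https://example.com/a", "https://example.com/b"], 0, 1024
    #     )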


    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded
        concurrently. The default can be set for this instance by passing
        "batch_size" in the constructor, or for all instances by setting the
        "gather_batch_size" key in ``fsspec.config.conf``, falling back to 1/8th
        of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )
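
    # A minimal usage sketch via the mirrored blocking method (paths are
    # hypothetical); TqdmCallback is fsspec's progress-bar callback:
    #
    #     from fsspec.callbacks import TqdmCallback
    #     fs.put("local/dir", "remote/dir", recursive=True,
    #            callback=TqdmCallback(), batch_size=16)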


    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded
        concurrently. The default can be set for this instance by passing
        "batch_size" in the constructor, or for all instances by setting the
        "gather_batch_size" key in ``fsspec.config.conf``, falling back to 1/8th
        of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )


    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError


    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _


    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)
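
    # Worked examples for the magic-character split above (hypothetical
    # paths): for "bucket/data/*.csv" the first magic character is "*", so
    # root="bucket/data/" and depth=1, and _find lists just that level;
    # for "bucket/**" with maxdepth=None, depth becomes None and the whole
    # tree below "bucket/" is listed before pattern matching.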


    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes


    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}


    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)


    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
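

# A minimal sketch (not fsspec code) of implementing a backend on this base:
# only underscored coroutines are written; the blocking twins (cat_file,
# pipe_file, ...) come from mirror_sync_methods below, wired up during
# construction by the base class. ``_ExampleDictFileSystem`` is hypothetical
# and never instantiated at import time.
class _ExampleDictFileSystem(AsyncFileSystem):
    def __init__(self, data=None, **kwargs):
        super().__init__(**kwargs)
        self.data = data if data is not None else {}  # maps path -> bytes

    async def _info(self, path, **kwargs):
        path = self._strip_protocol(path)
        if path not in self.data:
            raise FileNotFoundError(path)
        return {"name": path, "size": len(self.data[path]), "type": "file"}

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        return self.data[self._strip_protocol(path)][start:end]

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        self.data[self._strip_protocol(path)] = value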


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, will create a sync version if the name refers to an async
    method (coroutine) and there is no override in the child class; will create
    an async method for the corresponding sync method if there is no
    implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
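

# A minimal sketch of the mirroring effect (hypothetical helper, illustrative
# only): given an AsyncFileSystem instance created with asynchronous=False,
# each coroutine such as ``_cat_file`` gains a blocking twin ``cat_file``
# that routes through sync() on the dedicated IO loop.
def _example_mirrored(fs, path):
    assert inspect.iscoroutinefunction(fs._cat_file)
    return fs.cat_file(path)  # blocking; executed on the "fsspecIO" thread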


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
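

# A minimal usage sketch for the streamed-file interface (illustrative only;
# ``fs`` is assumed to be a backend whose open_async returns a subclass of
# AbstractAsyncStreamedFile implementing _fetch_range/_upload_chunk):
#
#     async def read_all(fs, path):
#         async with await fs.open_async(path, "rb") as f:
#             return await f.read()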