# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/asyn.py: 28%
# 570 statements; coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from contextlib import contextmanager
from glob import has_magic
from typing import TYPE_CHECKING, Iterable

from .callbacks import _DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make the given loop run the coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
    ...                  timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
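
# Illustrative sketch (editor's note, not part of the original module): a
# blocking caller typically pairs ``sync`` with ``get_loop`` (defined below).
# Assuming some AsyncFileSystem implementation ``fs`` with coroutine
# ``_cat_file``:
#
#     data = sync(get_loop(), fs._cat_file, "bucket/key", timeout=10)
#
# The coroutine is submitted to the dedicated IO thread's loop and the calling
# thread blocks on a threading.Event until the result (or exception) arrives.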


def sync_wrapper(func, obj=None):
    """Given a function, make it callable in blocking contexts.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
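
# Illustrative sketch (editor's note): attaching a blocking method to an
# instance at runtime, which is what mirror_sync_methods (below) does for each
# coroutine method. Assuming ``fs`` is an AsyncFileSystem with coroutine
# ``_info``:
#
#     fs.info = sync_wrapper(fs._info, obj=fs)
#     fs.info("some/path")  # now callable from ordinary, non-async code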


@contextmanager
def _selector_policy():
    original_policy = asyncio.get_event_loop_policy()
    try:
        if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        yield
    finally:
        asyncio.set_event_loop_policy(original_policy)


def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                with _selector_policy():
                    loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
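
# Worked example (editor's note): with the common RLIMIT_NOFILE soft limit of
# 1024 open files, file-involving operations get a batch size of
# 1024 // 8 == 128; if the soft limit is RLIM_INFINITY, -1 is returned, which
# callers treat as "no throttling" (see _run_coros_in_chunks below).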


def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=_DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there is no throttling. If None, it is inferred
        from _get_batch_size().
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0
    results = []
    for start in range(0, len(coros), batch_size):
        chunk = [
            asyncio.Task(asyncio.wait_for(c, timeout=timeout))
            for c in coros[start : start + batch_size]
        ]
        if callback is not _DEFAULT_CALLBACK:
            [
                t.add_done_callback(lambda *_, **__: callback.relative_update(1))
                for t in chunk
            ]
        results.extend(
            await asyncio.gather(*chunk, return_exceptions=return_exceptions),
        )
    return results
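
# Illustrative sketch (editor's note): deleting many keys with bounded
# concurrency, as the bulk methods below do internally. Assuming ``fs`` is an
# AsyncFileSystem implementation:
#
#     coros = [fs._rm_file(p) for p in paths]
#     await _run_coros_in_chunks(coros, batch_size=64, return_exceptions=True)
#
# Each batch of 64 tasks is wrapped in asyncio.wait_for and gathered before
# the next batch is submitted.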


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end is None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # byte ranges are inclusive
        return f"bytes={start}-{end}"
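
    # Worked examples (editor's note) for _process_limits:
    #   start=None, end=None  ->  "bytes=0-"    (whole file)
    #   start=0,    end=100   ->  "bytes=0-99"  (end is exclusive on input,
    #                                            HTTP ranges are inclusive)
    #   start=-100, end=None  ->  "bytes=-100"  (suffix: the last 100 bytes)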

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]
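
    # Illustrative sketch (editor's note): the return shape of _cat depends on
    # how it was called. Assuming an instance ``fs``:
    #
    #     await fs._cat("bucket/a")           # -> b"..." (bytes)
    #     await fs._cat(["bucket/a", "b/c"])  # -> {"bucket/a": b"...", ...}
    #     await fs._cat("bucket/*.csv")       # -> dict keyed by expanded paths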

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Byte limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            callback.branch(lfile, rfile, kwargs)
            coros.append(self._put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            callback.branch(rpath, lpath, kwargs)
            coros.append(self._get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as the given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                (
                    p + "/"
                    if append_slash_to_dirname and info["type"] == "directory"
                    else p
                )
            )
        }

        if detail:
            return out
        else:
            return list(out)
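
    # Illustrative examples (editor's note) of how _glob interprets patterns:
    #
    #     await fs._glob("bucket/*.csv")  # files directly under bucket/
    #     await fs._glob("bucket/**")     # everything below bucket/, any depth
    #     await fs._glob("bucket/data")   # no magic: existence check
    #
    # Matching is done by translating the pattern via glob_translate and
    # filtering the output of _find with the compiled regex.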

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)
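
    # Illustrative sketch (editor's note): _expand_path is the common front
    # end for the bulk operations above. For instance:
    #
    #     await fs._expand_path("bucket/*.csv")                # glob expansion
    #     await fs._expand_path("bucket/dir", recursive=True)  # all children
    #
    # The result is always a sorted list of concrete paths, and a
    # FileNotFoundError is raised if nothing matched.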

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, a sync version will be created if the name refers to an
    async method (coroutine) and there is no override in the child class; an
    async method will be created for the corresponding sync method if there is
    no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
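
# Illustrative sketch (editor's note): the effect of mirror_sync_methods on an
# implementation. Assuming a subclass that defines only coroutines:
#
#     class MyFS(AsyncFileSystem):
#         async def _info(self, path, **kwargs):
#             ...
#
#     fs = MyFS()      # mirroring applies when mirror_sync_methods is true
#     fs.info("path")  # blocking wrapper generated from _info via sync_wrapper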


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
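
# Illustrative sketch (editor's note, hypothetical names): a minimal readable
# subclass only needs _fetch_range, for example by delegating to the parent
# filesystem's ranged _cat_file; writable backends additionally implement
# _upload_chunk (and _initiate_upload for multipart setups):
#
#     class MyStreamedFile(AbstractAsyncStreamedFile):
#         async def _fetch_range(self, start, end):
#             return await self.fs._cat_file(self.path, start=start, end=end)
#
# Such a file is then driven via ``async with`` (the __aenter__/__aexit__
# protocol above), with close() flushing any buffered writes.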