# Coverage report source: /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/asyn.py (27% of 590 statements covered)


import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from collections.abc import Iterable
from glob import has_magic
from typing import TYPE_CHECKING

from .callbacks import DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import glob_translate, is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make the loop run the coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
    ...                  timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the waiting thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
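
# Example (illustrative sketch, not part of the library API): bridging from
# blocking code into the dedicated fsspec IO loop with ``sync``/``get_loop``.
#
#     from fsspec.asyn import get_loop, sync
#
#     async def add(a, b):
#         return a + b
#
#     sync(get_loop(), add, 1, 2)  # -> 3, awaited on the fsspec IO thread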



def sync_wrapper(func, obj=None):
    """Given a function, make it callable from blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
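
# Example (illustrative sketch, not part of the library API): attaching a
# blocking version of a coroutine method to an instance.
#
#     fs = SomeAsyncFileSystem()          # hypothetical AsyncFileSystem subclass
#     fs.cat_file = sync_wrapper(fs._cat_file, obj=fs)
#     data = fs.cat_file("bucket/key")    # runs _cat_file on fs.loop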



def get_loop():
    """Create or return the default fsspec IO loop

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]



def reset_after_fork():
    # reset the module-level lock (the global here was previously misnamed
    # "lock"; the module lock is "_lock", as in reset_lock above)
    global _lock
    loop[0] = None
    iothread[0] = None
    _lock = None


if hasattr(os, "register_at_fork"):
    # should be posix; this will do nothing for spawn or forkserver subprocesses
    os.register_at_fork(after_in_child=reset_after_fork)



if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
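
# Example (illustrative sketch, not part of the library API): overriding the
# inferred batch size globally through fsspec's config dictionary, using the
# keys read by _get_batch_size above.
#
#     import fsspec.config
#
#     fsspec.config.conf["gather_batch_size"] = 32           # file-bound ops
#     fsspec.config.conf["nofiles_gather_batch_size"] = 256  # metadata-only ops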



def running_async() -> bool:
    """Being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False



async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, there will be no throttling. If None, it will be
        inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0

    async def _run_coro(coro, i):
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    while pending or i < n:
        while len(pending) < batch_size and i < n:
            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
            i += 1

        if not pending:
            break

        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while done:
            result, k = await done.pop()
            results[k] = result

    return results
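
# Example (illustrative sketch, not part of the library API): throttled
# concurrent execution with _run_coros_in_chunks, from within a coroutine.
#
#     async def fetch(i):
#         await asyncio.sleep(0.01)
#         return i
#
#     # at most 8 coroutines in flight at once; results keep input order
#     results = await _run_coros_in_chunks(
#         [fetch(i) for i in range(100)], batch_size=8
#     )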



# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop
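
    # Example (illustrative sketch, not part of the library API): choosing
    # between blocking and coroutine usage of an AsyncFileSystem subclass.
    #
    #     fs = SomeAsyncFileSystem()                # hypothetical subclass
    #     fs.cat_file("bucket/key")                 # sync mirror, uses fs.loop
    #
    #     fs = SomeAsyncFileSystem(asynchronous=True)
    #     await fs._cat_file("bucket/key")          # inside a running event loop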

    async def _rm_file(self, path, **kwargs):
        if (
            inspect.iscoroutinefunction(self._rm)
            and type(self)._rm is not AsyncFileSystem._rm
        ):
            return await self._rm(path, recursive=False, batch_size=1, **kwargs)
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _mv_file(self, path1, path2):
        await self._cp_file(path1, path2)
        await self._rm_file(path1)

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"
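
    # Example (illustrative sketch, not part of the library API): the Range
    # header strings produced by _process_limits.
    #
    #     await fs._process_limits(url, 0, 100)      # -> "bytes=0-99" (inclusive end)
    #     await fs._process_limits(url, None, None)  # -> "bytes=0-"
    #     await fs._process_limits(url, -5, None)    # -> "bytes=-5" (last 5 bytes)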

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Byte limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
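
    # Example (illustrative sketch, not part of the library API): reading
    # several byte ranges concurrently; exceptions are returned in-place.
    #
    #     pieces = await fs._cat_ranges(
    #         ["bucket/a", "bucket/b"],   # hypothetical paths
    #         starts=0,                   # broadcast to every path
    #         ends=[100, 200],
    #     )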

    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)
        withdirs = kwargs.pop("withdirs", True)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=withdirs, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                p + "/"
                if append_slash_to_dirname and info["type"] == "directory"
                else p
            )
        }

        if detail:
            return out
        else:
            return list(out)
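
    # Example (illustrative sketch, not part of the library API): _glob
    # pattern behaviour.
    #
    #     await fs._glob("bucket/*.csv")     # one directory level
    #     await fs._glob("bucket/**/*.csv")  # recursive, expanded via _find
    #     await fs._glob("bucket/*/")        # trailing slash matches directories only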

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)
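
    # Example (illustrative sketch, not part of the library API): how glob
    # patterns and recursion expand to concrete paths.
    #
    #     await fs._expand_path("bucket/*.csv")  # glob, one level
    #     await fs._expand_path("bucket/data", recursive=True)
    #     # -> sorted list of "bucket/data" plus everything beneath it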

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, this will create a sync version if the name refers to an
    async method (coroutine) and there is no override in the child class; it
    will create an async method for the corresponding sync method if there is
    no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
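
# Example (illustrative sketch, not part of the library API): a minimal async
# backend. With the mirror_sync_methods mechanism enabled (see the class
# attribute above), instances also gain blocking cat_file/info/... mirrors of
# the coroutines.
#
#     class EchoFileSystem(AsyncFileSystem):        # hypothetical backend
#         async def _info(self, path, **kwargs):
#             return {"name": path, "size": 0, "type": "file"}
#
#         async def _cat_file(self, path, start=None, end=None, **kwargs):
#             return path.encode()
#
#     fs = EchoFileSystem()
#     fs.cat_file("a/b")  # -> b"a/b", via the generated sync wrapper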



class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
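
# Example (illustrative sketch, not part of the library API): streaming use of
# an AbstractAsyncStreamedFile subclass returned by a backend's open_async.
#
#     async def copy_remote(fs, src, dst):
#         async with await fs.open_async(src, "rb") as fin:
#             async with await fs.open_async(dst, "wb") as fout:
#                 while chunk := await fin.read(2**20):
#                     await fout.write(chunk)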