Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/asyn.py: 29% of 543 statements (coverage.py v7.2.7, created at 2023-06-07 06:56 +0000)

import asyncio
import asyncio.events
import functools
import inspect
import io
import numbers
import os
import re
import threading
from contextlib import contextmanager
from glob import has_magic
from typing import TYPE_CHECKING, Iterable

from .callbacks import _DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import (
    LocalFileSystem,
    make_path_posix,
    trailing_sep,
    trailing_sep_maybe_asterisk,
)
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import is_exception, other_paths

private = re.compile("_[^_]")
iothread = [None]  # dedicated fsspec IO thread
loop = [None]  # global event loop for any non-async instance
_lock = None  # global lock placeholder
get_running_loop = asyncio.get_running_loop


def get_lock():
    """Allocate or return a threading lock.

    The lock is allocated on first use to allow setting one lock per forked process.
    """
    global _lock
    if not _lock:
        _lock = threading.Lock()
    return _lock


def reset_lock():
    """Reset the global lock.

    This should be called only on the init of a forked process to reset the lock to
    None, enabling the new forked process to get a new lock.
    """
    global _lock

    iothread[0] = None
    loop[0] = None
    _lock = None
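

# Fork-safety sketch: reset_lock() must run in the child after a fork, so the
# child does not keep the lock, loop and IO thread it inherited from the
# parent. Assuming a POSIX platform where os.register_at_fork is available,
# an application could register it like this:
#
#     import os
#     os.register_at_fork(after_in_child=reset_lock)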


async def _runner(event, coro, result, timeout=None):
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout=timeout)
    try:
        result[0] = await coro
    except Exception as ex:
        result[0] = ex
    finally:
        event.set()


def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make the loop run the coroutine until it returns. Runs in another thread.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
                         timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except RuntimeError:
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
    while True:
        # this loop allows the thread to be interrupted
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result


def sync_wrapper(func, obj=None):
    """Given a function, make it callable in both async and blocking contexts.

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
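

# Usage sketch (illustrative class, not part of this module): attaching a
# blocking mirror of a coroutine method.
#
#     class Demo:
#         def __init__(self):
#             self.loop = get_loop()
#
#         async def _cat(self, path):
#             return b"data"
#
#     Demo.cat = sync_wrapper(Demo._cat)  # obj=None: defined on the class
#     Demo().cat("some/path")             # blocks the caller, returns b"data"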


@contextmanager
def _selector_policy():
    original_policy = asyncio.get_event_loop_policy()
    try:
        if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        yield
    finally:
        asyncio.set_event_loop_policy(original_policy)


def get_loop():
    """Create or return the default fsspec IO loop.

    The loop will be running on a separate thread.
    """
    if loop[0] is None:
        with get_lock():
            # repeat the check just in case the loop got filled between the
            # previous two calls from another thread
            if loop[0] is None:
                with _selector_policy():
                    loop[0] = asyncio.new_event_loop()
                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
                th.daemon = True
                th.start()
                iothread[0] = th
    return loop[0]
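

# Usage sketch: pairing get_loop() with sync() is the standard way to drive a
# coroutine from blocking code on the dedicated fsspec IO thread, e.g.
#
#     result = sync(get_loop(), some_coroutine_function, arg, timeout=5)
#
# where ``some_coroutine_function`` stands for any coroutine function.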


if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    try:
        import resource
    except ImportError:
        resource = None
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

_DEFAULT_BATCH_SIZE = 128
_NOFILES_DEFAULT_BATCH_SIZE = 1280


def _get_batch_size(nofiles=False):
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
    else:
        if "gather_batch_size" in conf:
            return conf["gather_batch_size"]
    if nofiles:
        return _NOFILES_DEFAULT_BATCH_SIZE
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    else:
        return soft_limit // 8
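

# Configuration sketch: both lookup keys above can be set globally to bypass
# the RLIMIT_NOFILE-derived default, e.g.
#
#     import fsspec
#     fsspec.config.conf["gather_batch_size"] = 32            # ops touching local files
#     fsspec.config.conf["nofiles_gather_batch_size"] = 256   # purely remote ops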


def running_async() -> bool:
    """Are we being executed by an event loop?"""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=_DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, no throttling is applied. If None, it will be inferred from
        _get_batch_size().
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size, does this operation involve local files?
        If yes, you normally expect smaller batches.
    """

    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        batch_size = len(coros)

    assert batch_size > 0
    results = []
    for start in range(0, len(coros), batch_size):
        chunk = [
            asyncio.Task(asyncio.wait_for(c, timeout=timeout))
            for c in coros[start : start + batch_size]
        ]
        if callback is not _DEFAULT_CALLBACK:
            [
                t.add_done_callback(lambda *_, **__: callback.relative_update(1))
                for t in chunk
            ]
        results.extend(
            await asyncio.gather(*chunk, return_exceptions=return_exceptions),
        )
    return results
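

# Usage sketch (async context; the coroutines are illustrative): run many
# coroutines with bounded concurrency, ten in flight at a time.
#
#     async def demo():
#         coros = [asyncio.sleep(0.01, result=i) for i in range(100)]
#         return await _run_coros_in_chunks(coros, batch_size=10)
#
#     # asyncio.run(demo()) -> [0, 1, ..., 99]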



# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
    "_size",
    "_mkdir",
    "_makedirs",
]


class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to asyncio.gather for concurrent operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstrings here; they will be copied
    # for _* methods and inferred for overridden methods.

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)


    @property
    def loop(self):
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        source_is_str = isinstance(path1, str)
        paths = await self._expand_path(path1, maxdepth=maxdepth, recursive=recursive)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            paths = [p for p in paths if not (trailing_sep(p) or await self._isdir(p))]
            if not paths:
                return

        isdir = isinstance(path2, str) and (
            trailing_sep(path2) or await self._isdir(path2)
        )
        path2 = other_paths(
            paths,
            path2,
            exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(path1),
            is_dir=isdir,
            flatten=not source_is_str,
        )
        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )
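
    # Usage sketch (async context): ``_pipe`` accepts one path plus a value,
    # or a mapping of path -> bytes that is written concurrently, e.g.
    #
    #     await fs._pipe({"bucket/a": b"1", "bucket/b": b"2"})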


    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file"""
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return "bytes=%s-%s" % (start, end)
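
    # Worked examples for the header construction above (assuming the object
    # is large enough for the requested ranges):
    #     start=None, end=None -> "bytes=0-"     (whole object)
    #     start=10,   end=20   -> "bytes=10-19"  (half-open input, inclusive header)
    #     start=-10,  end=None -> "bytes=-10"    (suffix: the last 10 bytes)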


    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
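
    # Usage sketch (async context): fetch two ranges concurrently; scalar
    # ``starts``/``ends`` broadcast across all paths.
    #
    #     chunks = await fs._cat_ranges(["a", "b"], starts=0, ends=100)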


    async def _put_file(self, lpath, rpath, **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        source_is_str = isinstance(lpath, str)
        if source_is_str:
            lpath = make_path_posix(lpath)
        fs = LocalFileSystem()
        lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
            if not lpaths:
                return

        isdir = isinstance(rpath, str) and (
            trailing_sep(rpath) or await self._isdir(rpath)
        )
        rpath = self._strip_protocol(rpath)
        rpaths = other_paths(
            lpaths,
            rpath,
            exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(lpath),
            is_dir=isdir,
            flatten=not source_is_str,
        )

        is_dir = {l: os.path.isdir(l) for l in lpaths}
        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            callback.branch(lfile, rfile, kwargs)
            coros.append(self._put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option can configure the number of futures that can be executed
        at the same time. If it is -1, then all the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit.
        """
        source_is_str = isinstance(rpath, str)
        # First check for rpath trailing slash as _strip_protocol removes it.
        source_not_trailing_sep = source_is_str and not trailing_sep_maybe_asterisk(
            rpath
        )
        rpath = self._strip_protocol(rpath)
        rpaths = await self._expand_path(rpath, recursive=recursive)
        if source_is_str and (not recursive or maxdepth is not None):
            # Non-recursive glob does not copy directories
            rpaths = [
                p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
            ]
            if not rpaths:
                return

        lpath = make_path_posix(lpath)
        isdir = isinstance(lpath, str) and (
            trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
        )
        lpaths = other_paths(
            rpaths,
            lpath,
            exists=isdir and source_not_trailing_sep,
            is_dir=isdir,
            flatten=not source_is_str,
        )
        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
        batch_size = kwargs.pop("batch_size", self.batch_size)

        coros = []
        callback.set_size(len(lpaths))
        for lpath, rpath in zip(lpaths, rpaths):
            callback.branch(rpath, lpath, kwargs)
            coros.append(self._get_file(rpath, lpath, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )


    async def _isfile(self, path):
        try:
            return (await self._info(path))["type"] == "file"
        except:  # noqa: E722
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path):
        try:
            await self._info(path)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, **kwargs):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError):
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part, but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
            ):
                yield _


    async def _glob(self, path, **kwargs):
        import re

        ends = path.endswith("/")
        path = self._strip_protocol(path)
        indstar = path.find("*") if path.find("*") >= 0 else len(path)
        indques = path.find("?") if path.find("?") >= 0 else len(path)
        indbrace = path.find("[") if path.find("[") >= 0 else len(path)

        ind = min(indstar, indques, indbrace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            root = path
            depth = 1
            if ends:
                path += "/*"
            elif await self._exists(path):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:ind]:
            ind2 = path[:ind].rindex("/")
            root = path[: ind2 + 1]
            depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
        else:
            root = ""
            depth = None if "**" in path else path[ind + 1 :].count("/") + 1

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )
        # Escape characters special to python regex, leaving our supported
        # special characters in place.
        # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
        # for shell globbing details.
        pattern = (
            "^"
            + (
                path.replace("\\", r"\\")
                .replace(".", r"\.")
                .replace("+", r"\+")
                .replace("//", "/")
                .replace("(", r"\(")
                .replace(")", r"\)")
                .replace("|", r"\|")
                .replace("^", r"\^")
                .replace("$", r"\$")
                .replace("{", r"\{")
                .replace("}", r"\}")
                .rstrip("/")
                .replace("?", ".")
            )
            + "$"
        )
        pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
        pattern = re.sub("[*]", "[^/]*", pattern)
        pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
        out = {
            p: allpaths[p]
            for p in sorted(allpaths)
            if pattern.match(p.replace("//", "/").rstrip("/"))
        }
        if detail:
            return out
        else:
            return list(out)
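
    # Pattern-translation sketch: with the substitutions above, "data/*.csv"
    # becomes the regex ^data/[^/]*\.csv$, while "data/**" becomes ^data/.*$
    # (the =PLACEHOLDER= round-trip protects "**" from the single-"*" pass).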


    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        path = self._strip_protocol(path)
        out = dict()
        detail = kwargs.pop("detail", False)
        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return list(sorted(out))

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
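

# Subclass sketch (illustrative, not a real backend): an async implementation
# provides the underscored coroutines; blocking mirrors such as ``cat_file``
# are generated from them when the instance is constructed (see
# mirror_sync_methods below).
#
#     class DemoAsyncFS(AsyncFileSystem):
#         async def _info(self, path, **kwargs):
#             return {"name": path, "size": 4, "type": "file"}
#
#         async def _cat_file(self, path, start=None, end=None, **kwargs):
#             return b"data"[start:end]
#
#     fs = DemoAsyncFS()
#     fs.cat_file("x")  # runs _cat_file on the dedicated fsspec IO loop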


def mirror_sync_methods(obj):
    """Populate sync and async methods for obj

    For each method, this will create a sync version if the name refers to an async
    method (coroutine) and there is no override in the child class; it will create
    an async method for the corresponding sync method if there is no implementation.

    Uses the methods specified in
    - async_methods: the set that an implementation is expected to provide
    - default_async_methods: that can be derived from their sync version in
      AbstractFileSystem
    - AsyncFileSystem: async-specific default coroutines
    """
    from fsspec import AbstractFileSystem

    for method in async_methods + dir(AsyncFileSystem):
        if not method.startswith("_"):
            continue
        smethod = method[1:]
        if private.match(method):
            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
            if isco and is_default:
                mth = sync_wrapper(getattr(obj, method), obj=obj)
                setattr(obj, smethod, mth)
                if not mth.__doc__:
                    mth.__doc__ = getattr(
                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
                    )
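

# Behavior sketch: mirroring is applied per instance, not per class. For the
# DemoAsyncFS sketch above, it is roughly equivalent to
#
#     fs.info = sync_wrapper(fs._info, obj=fs)
#
# repeated for every coroutine whose sync name is still the default.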


class FSSpecCoroutineCancel(Exception):
    pass


def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "lineno": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            cbs = t._callbacks
            t.cancel()
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out


class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version

    async def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError
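

# Usage sketch (illustrative): a backend implementing _fetch_range /
# _initiate_upload / _upload_chunk can be driven as an async context manager:
#
#     async def copy_in(fs, data):
#         f = await fs.open_async("bucket/key", "wb")
#         async with f:
#             await f.write(data)  # buffered; uploaded on flush()/close()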