Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/spec.py: 24%

808 statements · coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

from __future__ import annotations

import io
import logging
import os
import threading
import warnings
import weakref
from errno import ESPIPE
from glob import has_magic
from hashlib import sha256
from typing import ClassVar

from .callbacks import _DEFAULT_CALLBACK
from .config import apply_config, conf
from .dircache import DirCache
from .transaction import Transaction
from .utils import (
    _unstrip_protocol,
    glob_translate,
    isfilelike,
    other_paths,
    read_block,
    stringify_path,
    tokenize,
)

logger = logging.getLogger("fsspec")


def make_instance(cls, args, kwargs):
    return cls(*args, **kwargs)


class _Cached(type):
    """
    Metaclass for caching file system instances.

    Notes
    -----
    Instances are cached according to

    * The values of the class attributes listed in `_extra_tokenize_attributes`
    * The arguments passed to ``__init__``.

    This creates an additional reference to the filesystem, which prevents the
    filesystem from being garbage collected when all *user* references go away.
    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
    be made for a filesystem instance to be garbage collected.
    """

    def __init__(cls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Note: we intentionally create a reference here, to avoid garbage
        # collecting instances when all other references are gone. To really
        # delete a FileSystem, the cache must be cleared.
        if conf.get("weakref_instance_cache"):  # pragma: no cover
            # debug option for analysing fork/spawn conditions
            cls._cache = weakref.WeakValueDictionary()
        else:
            cls._cache = {}
        cls._pid = os.getpid()

    def __call__(cls, *args, **kwargs):
        kwargs = apply_config(cls, kwargs)
        extra_tokens = tuple(
            getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
        )
        token = tokenize(
            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
        )
        skip = kwargs.pop("skip_instance_cache", False)
        if os.getpid() != cls._pid:
            cls._cache.clear()
            cls._pid = os.getpid()
        if not skip and cls.cachable and token in cls._cache:
            cls._latest = token
            return cls._cache[token]
        else:
            obj = super().__call__(*args, **kwargs)
            # Setting _fs_token here causes some static linters to complain.
            obj._fs_token_ = token
            obj.storage_args = args
            obj.storage_options = kwargs
            if obj.async_impl and obj.mirror_sync_methods:
                from .asyn import mirror_sync_methods

                mirror_sync_methods(obj)

            if cls.cachable and not skip:
                cls._latest = token
                cls._cache[token] = obj
            return obj
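
# Illustrative sketch (not part of fsspec itself): because of the _Cached
# metaclass, constructing a cachable filesystem twice with the same arguments
# returns the same instance, and ``skip_instance_cache`` bypasses this:
#
#     import fsspec
#     fs1 = fsspec.filesystem("memory")
#     fs2 = fsspec.filesystem("memory")
#     assert fs1 is fs2                                 # cached instance reused
#     fs3 = fsspec.filesystem("memory", skip_instance_cache=True)
#     assert fs3 is not fs1                             # fresh, uncached instance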


class AbstractFileSystem(metaclass=_Cached):
    """
    An abstract super-class for pythonic file-systems

    Implementations are expected to be compatible with or, better, subclass
    from here.
    """

    cachable = True  # this class can be cached, instances reused
    _cached = False
    blocksize = 2**22
    sep = "/"
    protocol: ClassVar[str | tuple[str, ...]] = "abstract"
    _latest = None
    async_impl = False
    mirror_sync_methods = False
    root_marker = ""  # For some FSs, may require leading '/' or other character
    transaction_type = Transaction

    #: Extra *class attributes* that should be considered when hashing.
    _extra_tokenize_attributes = ()

    def __init__(self, *args, **storage_options):
        """Create and configure file-system instance

        Instances may be cachable, so if similar enough arguments are seen
        a new instance is not required. The token attribute exists to allow
        implementations to cache instances if they wish.

        A reasonable default should be provided if there are no arguments.

        Subclasses should call this method.

        Parameters
        ----------
        use_listings_cache, listings_expiry_time, max_paths:
            passed to ``DirCache``, if the implementation supports
            directory listing caching. Pass use_listings_cache=False
            to disable such caching.
        skip_instance_cache: bool
            If this is a cachable implementation, pass True here to force
            creating a new instance even if a matching instance exists, and prevent
            storing this instance.
        asynchronous: bool
        loop: asyncio-compatible IOLoop or None
        """
        if self._cached:
            # reusing instance, don't change
            return
        self._cached = True
        self._intrans = False
        self._transaction = None
        self._invalidated_caches_in_transaction = []
        self.dircache = DirCache(**storage_options)

        if storage_options.pop("add_docs", None):
            warnings.warn("add_docs is no longer supported.", FutureWarning)

        if storage_options.pop("add_aliases", None):
            warnings.warn("add_aliases has been removed.", FutureWarning)
        # This is set in _Cached
        self._fs_token_ = None

    @property
    def fsid(self):
        """Persistent filesystem id that can be used to compare filesystems
        across sessions.
        """
        raise NotImplementedError

    @property
    def _fs_token(self):
        return self._fs_token_

    def __dask_tokenize__(self):
        return self._fs_token

    def __hash__(self):
        return int(self._fs_token, 16)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self._fs_token == other._fs_token

    def __reduce__(self):
        return make_instance, (type(self), self.storage_args, self.storage_options)

    @classmethod
    def _strip_protocol(cls, path):
        """Turn path from fully-qualified to file-system-specific

        May require FS-specific handling, e.g., for relative paths or links.
        """
        if isinstance(path, list):
            return [cls._strip_protocol(p) for p in path]
        path = stringify_path(path)
        protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
        for protocol in protos:
            if path.startswith(protocol + "://"):
                path = path[len(protocol) + 3 :]
            elif path.startswith(protocol + "::"):
                path = path[len(protocol) + 2 :]
        path = path.rstrip("/")
        # use of root_marker to make minimum required path, e.g., "/"
        return path or cls.root_marker
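
    # Illustrative sketch: with the defaults above (protocol "abstract",
    # root_marker ""), _strip_protocol removes the prefix and any trailing
    # slash, e.g.:
    #
    #     AbstractFileSystem._strip_protocol("abstract://a/b/")  # -> "a/b"
    #     AbstractFileSystem._strip_protocol("abstract://")      # -> ""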

    def unstrip_protocol(self, name: str) -> str:
        """Format FS-specific path to generic, including protocol"""
        protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
        for protocol in protos:
            if name.startswith(f"{protocol}://"):
                return name
        return f"{protos[0]}://{name}"

    @staticmethod
    def _get_kwargs_from_urls(path):
        """If kwargs can be encoded in the paths, extract them here

        This should happen before instantiation of the class; incoming paths
        then should be amended to strip the options in methods.

        Examples may look like an sftp path "sftp://user@host:/my/path", where
        the user and host should become kwargs and later get stripped.
        """
        # by default, nothing happens
        return {}

    @classmethod
    def current(cls):
        """Return the most recently instantiated FileSystem

        If no instance has been created, then create one with defaults
        """
        if cls._latest in cls._cache:
            return cls._cache[cls._latest]
        return cls()

    @property
    def transaction(self):
        """A context within which files are committed together upon exit

        Requires the file class to implement `.commit()` and `.discard()`
        for the normal and exception cases.
        """
        if self._transaction is None:
            self._transaction = self.transaction_type(self)
        return self._transaction

    def start_transaction(self):
        """Begin write transaction for deferring files, non-context version"""
        self._intrans = True
        self._transaction = self.transaction_type(self)
        return self.transaction

    def end_transaction(self):
        """Finish write transaction, non-context version"""
        self.transaction.complete()
        self._transaction = None
        # The invalid cache must be cleared after the transaction is completed.
        for path in self._invalidated_caches_in_transaction:
            self.invalidate_cache(path)
        self._invalidated_caches_in_transaction.clear()
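
    # Illustrative sketch: the transaction property is a context manager, so
    # writes inside the block are committed together on exit (or discarded on
    # error), provided the file class implements commit()/discard():
    #
    #     with fs.transaction:
    #         with fs.open("data/part-0.bin", "wb") as f:
    #             f.write(b"...")
    #     # commit happens here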

    def invalidate_cache(self, path=None):
        """
        Discard any cached directory information

        Parameters
        ----------
        path: string or None
            If None, clear all cached listings; otherwise, clear listings at
            or under the given path.
        """
        # Not necessary to implement an invalidation mechanism; the subclass
        # may have no cache. But if it does, it should call this parent-class
        # method from its own implementation, to ensure caches expire
        # correctly after transactions.
        # See the implementation of FTPFileSystem in ftp.py
        if self._intrans:
            self._invalidated_caches_in_transaction.append(path)

    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create directory entry at path

        For systems that don't have true directories, may create an entry for
        this instance only and not touch the real filesystem

        Parameters
        ----------
        path: str
            location
        create_parents: bool
            if True, this is equivalent to ``makedirs``
        kwargs:
            may be permissions, etc.
        """
        pass  # not necessary to implement, may not have directories

    def makedirs(self, path, exist_ok=False):
        """Recursively make directories

        Creates directory at path and any intervening required directories.
        Raises exception if, for instance, the path already exists but is a
        file.

        Parameters
        ----------
        path: str
            leaf directory name
        exist_ok: bool (False)
            If False, will error if the target already exists
        """
        pass  # not necessary to implement, may not have directories

    def rmdir(self, path):
        """Remove a directory, if empty"""
        pass  # not necessary to implement, may not have directories

    def ls(self, path, detail=True, **kwargs):
        """List objects at path.

        This should include subdirectories and files at that location. The
        difference between a file and a directory must be clear when details
        are requested.

        The specific keys, or perhaps a FileInfo class, or similar, is TBD,
        but must be consistent across implementations.
        Must include:

        - full path to the entry (without protocol)
        - size of the entry, in bytes. If the value cannot be determined, will
          be ``None``.
        - type of entry, "file", "directory" or other

        Additional information
        may be present, appropriate to the file-system, e.g., generation,
        checksum, etc.

        May use refresh=True|False to allow use of self._ls_from_cache to
        check for a saved listing and avoid calling the backend. This would be
        common where listing may be expensive.

        Parameters
        ----------
        path: str
        detail: bool
            if True, gives a list of dictionaries, where each is the same as
            the result of ``info(path)``. If False, gives a list of paths
            (str).
        kwargs: may have additional backend-specific options, such as version
            information

        Returns
        -------
        List of strings if detail is False, or list of directory information
        dicts if detail is True.
        """
        raise NotImplementedError

    def _ls_from_cache(self, path):
        """Check cache for listing

        Returns listing, if found (may be an empty list for a directory that
        exists but contains nothing), None if not in cache.
        """
        parent = self._parent(path)
        if path.rstrip("/") in self.dircache:
            return self.dircache[path.rstrip("/")]
        try:
            files = [
                f
                for f in self.dircache[parent]
                if f["name"] == path
                or (f["name"] == path.rstrip("/") and f["type"] == "directory")
            ]
            if len(files) == 0:
                # parent dir was listed but did not contain this file
                raise FileNotFoundError(path)
            return files
        except KeyError:
            pass

    def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
        """Return all files below path

        List all files, recursing into subdirectories; output is iterator-style,
        like ``os.walk()``. For a simple list of files, ``find()`` is available.

        When topdown is True, the caller can modify the dirnames list in-place (perhaps
        using del or slice assignment), and walk() will
        only recurse into the subdirectories whose names remain in dirnames;
        this can be used to prune the search, impose a specific order of visiting,
        or even to inform walk() about directories the caller creates or renames before
        it resumes walk() again.
        Modifying dirnames when topdown is False has no effect. (see os.walk)

        Note that the "files" output will include anything that is not
        a directory, such as links.

        Parameters
        ----------
        path: str
            Root to recurse into
        maxdepth: int
            Maximum recursion depth. None means limitless, but not recommended
            on link-based file-systems.
        topdown: bool (True)
            Whether to walk the directory tree from the top downwards or from
            the bottom upwards.
        on_error: "omit", "raise", or a callable
            if "omit" (default), a path with an exception will simply be empty;
            if "raise", the underlying exception will be raised;
            if callable, it will be called with a single OSError instance as argument
        kwargs: passed to ``ls``
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = self.ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                return path, {}, {}
            return path, [], []

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as given path
                files[""] = info
            else:
                files[name] = info

        if not detail:
            dirs = list(dirs)
            files = list(files)

        if topdown:
            # Yield before recursion if walking top down
            yield path, dirs, files

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                if not topdown:
                    yield path, dirs, files
                return

        for d in dirs:
            yield from self.walk(
                full_dirs[d],
                maxdepth=maxdepth,
                detail=detail,
                topdown=topdown,
                **kwargs,
            )

        if not topdown:
            # Yield after recursion if walking bottom up
            yield path, dirs, files
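
    # Illustrative sketch: pruning a top-down walk by editing ``dirs`` in
    # place, exactly as os.walk permits (``process`` is a hypothetical handler):
    #
    #     for root, dirs, files in fs.walk("/data", topdown=True):
    #         dirs[:] = [d for d in dirs if not d.startswith(".")]  # skip hidden dirs
    #         process(root, files)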

    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
        """List all files below path.

        Like posix ``find`` command without conditions

        Parameters
        ----------
        path : str
        maxdepth: int or None
            If not None, the maximum number of levels to descend
        withdirs: bool
            Whether to include directory paths in the output. This is True
            when used by glob, but users usually only want files.
        kwargs are passed to ``ls``.
        """
        # TODO: allow equivalent of -name parameter
        path = self._strip_protocol(path)
        out = {}

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and self.isdir(path):
            out[path] = self.info(path)

        for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for name, info in files.items()})
        if not out and self.isfile(path):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
        """Space used by files and optionally directories within a path

        Directory size does not include the size of its contents.

        Parameters
        ----------
        path: str
        total: bool
            Whether to sum all the file sizes
        maxdepth: int or None
            Maximum number of directory levels to descend, None for unlimited.
        withdirs: bool
            Whether to include directory paths in the output.
        kwargs: passed to ``find``

        Returns
        -------
        Dict of {path: size} if total=False, or int otherwise, where numbers
        refer to bytes used.
        """
        sizes = {}
        if withdirs and self.isdir(path):
            # Include top-level directory in output
            info = self.info(path)
            sizes[info["name"]] = info["size"]
        for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
            info = self.info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    def glob(self, path, maxdepth=None, **kwargs):
        """
        Find files by glob-matching.

        If the path ends with '/', only folders are returned.

        We support ``"**"``, ``"?"`` and ``"[..]"``. We do not support ^ for
        pattern negation.

        The `maxdepth` option is applied on the first `**` found in the path.

        kwargs are passed to ``ls``.
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        import re

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_brace)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if self.exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: self.info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                (
                    p + "/"
                    if append_slash_to_dirname and info["type"] == "directory"
                    else p
                )
            )
        }

        if detail:
            return out
        else:
            return list(out)
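
    # Illustrative sketch (assumed layout /data/a.csv and /data/sub/b.csv;
    # exact results depend on the backend):
    #
    #     fs.glob("/data/*.csv")               # -> ["/data/a.csv"]
    #     fs.glob("/data/**/*.csv")            # -> ["/data/a.csv", "/data/sub/b.csv"]
    #     fs.glob("/data/*.csv", detail=True)  # -> {"/data/a.csv": {...info...}}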

    def exists(self, path, **kwargs):
        """Is there a file at the given path"""
        try:
            self.info(path, **kwargs)
            return True
        except:  # noqa: E722
            # any exception allowed bar FileNotFoundError?
            return False

    def lexists(self, path, **kwargs):
        """If there is a file at the given path (including
        broken links)"""
        return self.exists(path)

    def info(self, path, **kwargs):
        """Give details of entry at path

        Returns a single dictionary, with exactly the same information as ``ls``
        would with ``detail=True``.

        The default implementation calls ``ls`` and could be overridden by a
        shortcut. kwargs are passed on to ``ls()``.

        Some file systems might not be able to measure the file's size, in
        which case, the returned dict will include ``'size': None``.

        Returns
        -------
        dict with keys: name (full path in the FS), size (in bytes), type (file,
        directory, or something else) and other FS-specific keys.
        """
        path = self._strip_protocol(path)
        out = self.ls(self._parent(path), detail=True, **kwargs)
        out = [o for o in out if o["name"].rstrip("/") == path]
        if out:
            return out[0]
        out = self.ls(path, detail=True, **kwargs)
        path = path.rstrip("/")
        out1 = [o for o in out if o["name"].rstrip("/") == path]
        if len(out1) == 1:
            if "size" not in out1[0]:
                out1[0]["size"] = None
            return out1[0]
        elif len(out1) > 1 or out:
            return {"name": path, "size": 0, "type": "directory"}
        else:
            raise FileNotFoundError(path)

    def checksum(self, path):
        """Unique value for current version of file

        If the checksum is the same from one moment to another, the contents
        are guaranteed to be the same. If the checksum changes, the contents
        *might* have changed.

        This should normally be overridden; default will probably capture
        creation/modification timestamp (which would be good) or maybe
        access timestamp (which would be bad)
        """
        return int(tokenize(self.info(path)), 16)

    def size(self, path):
        """Size in bytes of file"""
        return self.info(path).get("size", None)

    def sizes(self, paths):
        """Size in bytes of each file in a list of paths"""
        return [self.size(p) for p in paths]

    def isdir(self, path):
        """Is this entry directory-like?"""
        try:
            return self.info(path)["type"] == "directory"
        except OSError:
            return False

    def isfile(self, path):
        """Is this entry file-like?"""
        try:
            return self.info(path)["type"] == "file"
        except:  # noqa: E722
            return False

    def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
        """Get the contents of the file as a string.

        Parameters
        ----------
        path: str
            URL of file on this filesystem
        encoding, errors, newline: same as `open`.
        """
        with self.open(
            path,
            mode="r",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.read()

    def write_text(
        self, path, value, encoding=None, errors=None, newline=None, **kwargs
    ):
        """Write the text to the given file.

        An existing file will be overwritten.

        Parameters
        ----------
        path: str
            URL of file on this filesystem
        value: str
            Text to write.
        encoding, errors, newline: same as `open`.
        """
        with self.open(
            path,
            mode="w",
            encoding=encoding,
            errors=errors,
            newline=newline,
            **kwargs,
        ) as f:
            return f.write(value)

    def cat_file(self, path, start=None, end=None, **kwargs):
        """Get the content of a file

        Parameters
        ----------
        path: URL of file on this filesystem
        start, end: int
            Byte limits of the read. If negative, backwards from end,
            like usual python slices. Either can be None for start or
            end of file, respectively
        kwargs: passed to ``open()``.
        """
        # explicitly set buffering off?
        with self.open(path, "rb", **kwargs) as f:
            if start is not None:
                if start >= 0:
                    f.seek(start)
                else:
                    f.seek(max(0, f.size + start))
            if end is not None:
                if end < 0:
                    end = f.size + end
                return f.read(end - f.tell())
            return f.read()
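
    # Illustrative sketch: byte-range reads via cat_file
    #
    #     fs.cat_file("/data/a.bin", start=0, end=16)  # first 16 bytes
    #     fs.cat_file("/data/a.bin", start=-16)        # last 16 bytes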

    def pipe_file(self, path, value, **kwargs):
        """Set the bytes of given file"""
        with self.open(path, "wb", **kwargs) as f:
            f.write(value)

    def pipe(self, path, value=None, **kwargs):
        """Put value into path

        (counterpart to ``cat``)

        Parameters
        ----------
        path: string or dict(str, bytes)
            If a string, a single remote location to put ``value`` bytes; if a dict,
            a mapping of {path: bytesvalue}.
        value: bytes, optional
            If using a single path, these are the bytes to put there. Ignored if
            ``path`` is a dict
        """
        if isinstance(path, str):
            self.pipe_file(self._strip_protocol(path), value, **kwargs)
        elif isinstance(path, dict):
            for k, v in path.items():
                self.pipe_file(self._strip_protocol(k), v, **kwargs)
        else:
            raise ValueError("path must be str or dict")
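
    # Illustrative sketch: pipe and cat are write/read counterparts
    #
    #     fs.pipe({"/tmp/x": b"abc", "/tmp/y": b"def"})
    #     fs.cat(["/tmp/x", "/tmp/y"])  # -> {"/tmp/x": b"abc", "/tmp/y": b"def"}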

    def cat_ranges(
        self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of filepaths on this filesystem
        starts, ends: int or list
            Byte limits of the read. If using a single int, the same value will be
            used to read all the specified files.
        """
        if max_gap is not None:
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, list):
            starts = [starts] * len(paths)
        if not isinstance(ends, list):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        out = []
        for p, s, e in zip(paths, starts, ends):
            try:
                out.append(self.cat_file(p, s, e))
            except Exception as e:
                if on_error == "return":
                    out.append(e)
                else:
                    raise
        return out

    def cat(self, path, recursive=False, on_error="raise", **kwargs):
        """Fetch (potentially multiple) paths' contents

        Parameters
        ----------
        recursive: bool
            If True, assume the path(s) are directories, and get all the
            contained files
        on_error : "raise", "omit", "return"
            If raise, an underlying exception will be raised (converted to KeyError
            if the type is in self.missing_exceptions); if omit, keys with exception
            will simply not be included in the output; if "return", all keys are
            included in the output, but the value will be bytes or an exception
            instance.
        kwargs: passed to cat_file

        Returns
        -------
        dict of {path: contents} if there are multiple paths
        or the path has been otherwise expanded
        """
        paths = self.expand_path(path, recursive=recursive)
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            out = {}
            for path in paths:
                try:
                    out[path] = self.cat_file(path, **kwargs)
                except Exception as e:
                    if on_error == "raise":
                        raise
                    if on_error == "return":
                        out[path] = e
            return out
        else:
            return self.cat_file(paths[0], **kwargs)

    def get_file(
        self, rpath, lpath, callback=_DEFAULT_CALLBACK, outfile=None, **kwargs
    ):
        """Copy single remote file to local"""
        from .implementations.local import LocalFileSystem

        if isfilelike(lpath):
            outfile = lpath
        elif self.isdir(rpath):
            os.makedirs(lpath, exist_ok=True)
            return None

        fs = LocalFileSystem(auto_mkdir=True)
        fs.makedirs(fs._parent(lpath), exist_ok=True)

        with self.open(rpath, "rb", **kwargs) as f1:
            if outfile is None:
                outfile = open(lpath, "wb")

            try:
                callback.set_size(getattr(f1, "size", None))
                data = True
                while data:
                    data = f1.read(self.blocksize)
                    segment_len = outfile.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)
            finally:
                if not isfilelike(lpath):
                    outfile.close()

    def get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        Calls get_file for each source.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            from .implementations.local import (
                LocalFileSystem,
                make_path_posix,
                trailing_sep,
            )

            source_is_str = isinstance(rpath, str)
            rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
                if not rpaths:
                    return

            if isinstance(lpath, str):
                lpath = make_path_posix(lpath)

            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        callback.set_size(len(lpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            callback.branch(rpath, lpath, kwargs)
            self.get_file(rpath, lpath, **kwargs)

    def put_file(self, lpath, rpath, callback=_DEFAULT_CALLBACK, **kwargs):
        """Copy single file to remote"""
        if os.path.isdir(lpath):
            self.makedirs(rpath, exist_ok=True)
            return None

        with open(lpath, "rb") as f1:
            size = f1.seek(0, 2)
            callback.set_size(size)
            f1.seek(0)

            self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
            with self.open(rpath, "wb", **kwargs) as f2:
                while f1.tell() < size:
                    data = f1.read(self.blocksize)
                    segment_len = f2.write(data)
                    if segment_len is None:
                        segment_len = len(data)
                    callback.relative_update(segment_len)

    def put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=_DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        Calls put_file for each source.
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            from .implementations.local import (
                LocalFileSystem,
                make_path_posix,
                trailing_sep,
            )

            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or self.isdir(rpath)
            )

            rpath = (
                self._strip_protocol(rpath)
                if isinstance(rpath, str)
                else [self._strip_protocol(p) for p in rpath]
            )
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        callback.set_size(len(rpaths))
        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
            callback.branch(lpath, rpath, kwargs)
            self.put_file(lpath, rpath, **kwargs)
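
    # Illustrative sketch: recursive transfer between local and remote trees
    #
    #     fs.put("local_dir/", "remote/dest/", recursive=True)   # upload tree
    #     fs.get("remote/dest/", "local_copy/", recursive=True)  # download tree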

    def head(self, path, size=1024):
        """Get the first ``size`` bytes from file"""
        with self.open(path, "rb") as f:
            return f.read(size)

    def tail(self, path, size=1024):
        """Get the last ``size`` bytes from file"""
        with self.open(path, "rb") as f:
            f.seek(max(-size, -f.size), 2)
            return f.read()

    def cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    def copy(
        self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
    ):
        """Copy within two locations in the filesystem

        on_error : "raise", "ignore"
            If "raise", any not-found exceptions will be raised; if "ignore",
            any not-found exceptions will cause the path to be skipped.
            Defaults to "raise", unless recursive is true, where the default
            is "ignore".
        """
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            from .implementations.local import trailing_sep

            source_is_str = isinstance(path1, str)
            paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or self.isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        for p1, p2 in zip(paths1, paths2):
            try:
                self.cp_file(p1, p2, **kwargs)
            except FileNotFoundError:
                if on_error == "raise":
                    raise

    def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
        """Turn one or more globs or directories into a list of all matching paths
        to files or directories.

        kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
        """

        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = self.expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:
                if has_magic(p):
                    bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            self.expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                                **kwargs,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(
                        self.find(
                            p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
                        )
                    )
                    out |= rec
                if p not in out and (recursive is False or self.exists(p)):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)
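
    # Illustrative sketch (assumed layout /data/a.csv and /data/sub/b.csv):
    #
    #     fs.expand_path("/data/*.csv")            # -> ["/data/a.csv"]
    #     fs.expand_path("/data", recursive=True)  # -> ["/data", "/data/a.csv",
    #                                              #     "/data/sub", "/data/sub/b.csv"]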

    def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
        """Move file(s) from one location to another"""
        if path1 == path2:
            logger.debug("%s mv: The paths are the same, so no files were moved.", self)
        else:
            self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
            self.rm(path1, recursive=recursive)

    def rm_file(self, path):
        """Delete a file"""
        self._rm(path)

    def _rm(self, path):
        """Delete one file"""
        # this is the old name for the method, prefer rm_file
        raise NotImplementedError

    def rm(self, path, recursive=False, maxdepth=None):
        """Delete files.

        Parameters
        ----------
        path: str or list of str
            File(s) to delete.
        recursive: bool
            If file(s) are directories, recursively delete contents and then
            also remove the directory
        maxdepth: int or None
            Depth to pass to walk for finding files to delete, if recursive.
            If None, there will be no limit and infinite recursion may be
            possible.
        """
        path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
        for p in reversed(path):
            self.rm_file(p)

    @classmethod
    def _parent(cls, path):
        path = cls._strip_protocol(path)
        if "/" in path:
            parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
            return cls.root_marker + parent
        else:
            return cls.root_marker

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return raw bytes-mode file-like from the file-system"""
        return AbstractBufferedFile(
            self,
            path,
            mode,
            block_size,
            autocommit,
            cache_options=cache_options,
            **kwargs,
        )

    def open(
        self,
        path,
        mode="rb",
        block_size=None,
        cache_options=None,
        compression=None,
        **kwargs,
    ):
        """
        Return a file-like object from the filesystem

        The resultant instance must function correctly in a context ``with``
        block.

        Parameters
        ----------
        path: str
            Target file
        mode: str like 'rb', 'w'
            See builtin ``open()``
        block_size: int
            Some indication of buffering - this is a value in bytes
        cache_options : dict, optional
            Extra arguments to pass through to the cache.
        compression: string or None
            If given, open file using compression codec. Can either be a compression
            name (a key in ``fsspec.compression.compr``) or "infer" to guess the
            compression from the filename suffix.
        encoding, errors, newline: passed on to TextIOWrapper for text mode
        """
        import io

        path = self._strip_protocol(path)
        if "b" not in mode:
            mode = mode.replace("t", "") + "b"

            text_kwargs = {
                k: kwargs.pop(k)
                for k in ["encoding", "errors", "newline"]
                if k in kwargs
            }
            return io.TextIOWrapper(
                self.open(
                    path,
                    mode,
                    block_size=block_size,
                    cache_options=cache_options,
                    compression=compression,
                    **kwargs,
                ),
                **text_kwargs,
            )
        else:
            ac = kwargs.pop("autocommit", not self._intrans)
            f = self._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=ac,
                cache_options=cache_options,
                **kwargs,
            )
            if compression is not None:
                from fsspec.compression import compr
                from fsspec.core import get_compression

                compression = get_compression(path, compression)
                compress = compr[compression]
                f = compress(f, mode=mode[0])

            if not ac and "r" not in mode:
                self.transaction.files.append(f)
            return f
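
    # Illustrative sketch: text mode and compression are layered on top of the
    # raw binary file that _open returns:
    #
    #     with fs.open("/data/log.txt", "r", encoding="utf-8") as f:
    #         first_line = f.readline()
    #     with fs.open("/data/table.csv.gz", "rb", compression="infer") as f:
    #         raw = f.read()  # decompressed bytes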

    def touch(self, path, truncate=True, **kwargs):
        """Create empty file, or update timestamp

        Parameters
        ----------
        path: str
            file location
        truncate: bool
            If True, always set file size to 0; if False, update timestamp and
            leave file unchanged, if backend allows this
        """
        if truncate or not self.exists(path):
            with self.open(path, "wb", **kwargs):
                pass
        else:
            raise NotImplementedError  # update timestamp, if possible

    def ukey(self, path):
        """Hash of file properties, to tell if it has changed"""
        return sha256(str(self.info(path)).encode()).hexdigest()

    def read_block(self, fn, offset, length, delimiter=None):
        """Read a block of bytes from a file

        Starting at ``offset`` of the file, read ``length`` bytes. If
        ``delimiter`` is set then we ensure that the read starts and stops at
        delimiter boundaries that follow the locations ``offset`` and ``offset
        + length``. If ``offset`` is zero then we start at zero. The
        bytestring returned WILL include the end delimiter string.

        If offset+length is beyond the eof, reads to eof.

        Parameters
        ----------
        fn: string
            Path to filename
        offset: int
            Byte offset to start read
        length: int
            Number of bytes to read. If None, read to end.
        delimiter: bytes (optional)
            Ensure reading starts and stops at delimiter bytestring

        Examples
        --------
        >>> fs.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
        b'Alice, 100\\nBo'
        >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\n'

        Use ``length=None`` to read to the end of the file.
        >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200\\nCharlie, 300'

        See Also
        --------
        :func:`fsspec.utils.read_block`
        """
        with self.open(fn, "rb") as f:
            size = f.size
            if length is None:
                length = size
            if size is not None and offset + length > size:
                length = size - offset
            return read_block(f, offset, length, delimiter)

    def to_json(self):
        """
        JSON representation of this filesystem instance

        Returns
        -------
        str: JSON structure with keys cls (the python location of this class),
            protocol (text name of this class's protocol, first one in case of
            multiple), args (positional args, usually empty), and all other
            kwargs as their own keys.
        """
        import json

        cls = type(self)
        cls = ".".join((cls.__module__, cls.__name__))
        proto = (
            self.protocol[0]
            if isinstance(self.protocol, (tuple, list))
            else self.protocol
        )
        return json.dumps(
            dict(
                **{"cls": cls, "protocol": proto, "args": self.storage_args},
                **self.storage_options,
            )
        )

    @staticmethod
    def from_json(blob):
        """
        Recreate a filesystem instance from JSON representation

        See ``.to_json()`` for the expected structure of the input

        Parameters
        ----------
        blob: str

        Returns
        -------
        file system instance, not necessarily of this particular class.
        """
        import json

        from .registry import _import_class, get_filesystem_class

        dic = json.loads(blob)
        protocol = dic.pop("protocol")
        try:
            cls = _import_class(dic.pop("cls"))
        except (ImportError, ValueError, RuntimeError, KeyError):
            cls = get_filesystem_class(protocol)
        return cls(*dic.pop("args", ()), **dic)
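
    # Illustrative sketch: JSON round-trip of an instance; with identical,
    # JSON-serializable storage options, the instance cache normally hands
    # back an equal object:
    #
    #     blob = fs.to_json()
    #     fs2 = AbstractFileSystem.from_json(blob)
    #     assert fs2 == fs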

    def _get_pyarrow_filesystem(self):
        """
        Make a version of the FS instance which will be acceptable to pyarrow
        """
        # all instances already also derive from pyarrow
        return self

    def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
        """Create key/value store based on this file-system

        Makes a MutableMapping interface to the FS at the given root path.
        See ``fsspec.mapping.FSMap`` for further details.
        """
        from .mapping import FSMap

        return FSMap(
            root,
            self,
            check=check,
            create=create,
            missing_exceptions=missing_exceptions,
        )

    @classmethod
    def clear_instance_cache(cls):
        """
        Clear the cache of filesystem instances.

        Notes
        -----
        Unless overridden by setting the ``cachable`` class attribute to False,
        the filesystem class stores a reference to newly created instances. This
        prevents Python's normal rules around garbage collection from working,
        since the instance's refcount will not drop to zero until
        ``clear_instance_cache`` is called.
        """
        cls._cache.clear()

    def created(self, path):
        """Return the created timestamp of a file as a datetime.datetime"""
        raise NotImplementedError

    def modified(self, path):
        """Return the modified timestamp of a file as a datetime.datetime"""
        raise NotImplementedError

    # ------------------------------------------------------------------------
    # Aliases

    def read_bytes(self, path, start=None, end=None, **kwargs):
        """Alias of `AbstractFileSystem.cat_file`."""
        return self.cat_file(path, start=start, end=end, **kwargs)

    def write_bytes(self, path, value, **kwargs):
        """Alias of `AbstractFileSystem.pipe_file`."""
        self.pipe_file(path, value, **kwargs)

    def makedir(self, path, create_parents=True, **kwargs):
        """Alias of `AbstractFileSystem.mkdir`."""
        return self.mkdir(path, create_parents=create_parents, **kwargs)

    def mkdirs(self, path, exist_ok=False):
        """Alias of `AbstractFileSystem.makedirs`."""
        return self.makedirs(path, exist_ok=exist_ok)

    def listdir(self, path, detail=True, **kwargs):
        """Alias of `AbstractFileSystem.ls`."""
        return self.ls(path, detail=detail, **kwargs)

    def cp(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.copy`."""
        return self.copy(path1, path2, **kwargs)

    def move(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def stat(self, path, **kwargs):
        """Alias of `AbstractFileSystem.info`."""
        return self.info(path, **kwargs)

    def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
        """Alias of `AbstractFileSystem.du`."""
        return self.du(path, total=total, maxdepth=maxdepth, **kwargs)

    def rename(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def delete(self, path, recursive=False, maxdepth=None):
        """Alias of `AbstractFileSystem.rm`."""
        return self.rm(path, recursive=recursive, maxdepth=maxdepth)

    def upload(self, lpath, rpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.put`."""
        return self.put(lpath, rpath, recursive=recursive, **kwargs)

    def download(self, rpath, lpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.get`."""
        return self.get(rpath, lpath, recursive=recursive, **kwargs)

    def sign(self, path, expiration=100, **kwargs):
        """Create a signed URL representing the given path

        Some implementations allow temporary URLs to be generated, as a
        way of delegating credentials.

        Parameters
        ----------
        path : str
            The path on the filesystem
        expiration : int
            Number of seconds to enable the URL for (if supported)

        Returns
        -------
        URL : str
            The signed URL

        Raises
        ------
        NotImplementedError : if method is not implemented for a filesystem
        """
        raise NotImplementedError("Sign is not implemented for this filesystem")

    def _isfilestore(self):
        # Originally inherited from pyarrow DaskFileSystem. Keeping this
        # here for backwards compatibility as long as pyarrow uses its
        # legacy fsspec-compatible filesystems and thus accepts fsspec
        # filesystems as well
        return False


class AbstractBufferedFile(io.IOBase):
    """Convenient class to derive from to provide buffering

    In the case that the backend does not provide a pythonic file-like object
    already, this class contains much of the logic to build one. The only
    methods that need to be overridden are ``_upload_chunk``,
    ``_initiate_upload`` and ``_fetch_range``.
    """
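
    # Illustrative sketch (hypothetical subclass, not part of fsspec): only
    # the three hooks named above need overriding to adapt a backend; the
    # ``backend_*`` helpers are assumptions standing in for real transport code:
    #
    #     class MyFile(AbstractBufferedFile):
    #         def _fetch_range(self, start, end):
    #             return backend_read(self.path, start, end)
    #
    #         def _initiate_upload(self):
    #             self._upload_id = backend_start_upload(self.path)
    #
    #         def _upload_chunk(self, final=False):
    #             backend_send(self._upload_id, self.buffer.getvalue(), final)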

    DEFAULT_BLOCK_SIZE = 5 * 2**20
    _details = None

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """
        Template for files with buffered reading and writing

        Parameters
        ----------
        fs: instance of FileSystem
        path: str
            location in file-system
        mode: str
            Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file
            systems may be read-only, and some may not support append.
        block_size: int
            Buffer size for reading or writing, 'default' for class default
        autocommit: bool
            Whether to write to final destination; may only impact what
            happens when file is being closed.
        cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
            Caching policy in read mode. See the definitions in ``core``.
        cache_options : dict
            Additional options passed to the constructor for the cache specified
            by `cache_type`.
        size: int
            If given and in read mode, suppresses having to look up the file size
        kwargs:
            Gets stored as self.kwargs
        """
        from .core import caches

        self.path = path
        self.fs = fs
        self.mode = mode
        self.blocksize = (
            self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
        )
        self.loc = 0
        self.autocommit = autocommit
        self.end = None
        self.start = None
        self.closed = False

        if cache_options is None:
            cache_options = {}

        if "trim" in kwargs:
            warnings.warn(
                "Passing 'trim' to control the cache behavior has been deprecated. "
                "Specify it within the 'cache_options' argument instead.",
                FutureWarning,
            )
            cache_options["trim"] = kwargs.pop("trim")

        self.kwargs = kwargs

        if mode not in {"ab", "rb", "wb"}:
            raise NotImplementedError("File mode not supported")
        if mode == "rb":
            if size is not None:
                self.size = size
            else:
                self.size = self.details["size"]
            self.cache = caches[cache_type](
                self.blocksize, self._fetch_range, self.size, **cache_options
            )
        else:
            self.buffer = io.BytesIO()
            self.offset = None
            self.forced = False
            self.location = None

    @property
    def details(self):
        if self._details is None:
            self._details = self.fs.info(self.path)
        return self._details

    @details.setter
    def details(self, value):
        self._details = value
        self.size = value["size"]

    @property
    def full_name(self):
        return _unstrip_protocol(self.path, self.fs)

    @property
    def closed(self):
        # get around this attr being read-only in IOBase
        # use getattr here, since this can be called during del
        return getattr(self, "_closed", True)

    @closed.setter
    def closed(self, c):
        self._closed = c

    def __hash__(self):
        if "w" in self.mode:
            return id(self)
        else:
            return int(tokenize(self.details), 16)

    def __eq__(self, other):
        """Files are equal if they have the same checksum, only in read mode"""
        return self.mode == "rb" and other.mode == "rb" and hash(self) == hash(other)

    def commit(self):
        """Move from temp to final destination"""

    def discard(self):
        """Throw away temporary file"""

    def info(self):
        """File information about this path"""
        if "r" in self.mode:
            return self.details
        else:
            raise ValueError("Info not available while writing")

    def tell(self):
        """Current file location"""
        return self.loc

    def seek(self, loc, whence=0):
        """Set current file location

        Parameters
        ----------
        loc: int
            byte location
        whence: {0, 1, 2}
            from start of file, current location or end of file, resp.
        """
        loc = int(loc)
        if not self.mode == "rb":
            raise OSError(ESPIPE, "Seek only available in read mode")
        if whence == 0:
            nloc = loc
        elif whence == 1:
            nloc = self.loc + loc
        elif whence == 2:
            nloc = self.size + loc
        else:
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
        if nloc < 0:
            raise ValueError("Seek before start of file")
        self.loc = nloc
        return self.loc

    def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            self.flush()
        return out

    def flush(self, force=False):
        """
        Write buffered data to backend store.

        Writes the current buffer, if it is larger than the block-size, or if
        the file is being closed.

        Parameters
        ----------
        force: bool
            When closing, write the last block even if it is smaller than
            blocks are allowed to be. Disallows further writing to this file.
        """

        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()
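
    # Illustrative sketch: writes accumulate in the in-memory buffer; once the
    # buffer reaches blocksize it is shipped to the backend and reset:
    #
    #     with fs.open("/data/out.bin", "wb", block_size=5 * 2**20) as f:
    #         f.write(b"x" * (3 * 2**20))  # buffered: smaller than blocksize
    #         f.write(b"x" * (3 * 2**20))  # buffer now 6 MiB -> uploaded, reset
    #     # close() calls flush(force=True) to finalize the upload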

    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        """
        # may not yet have been initialized, may need to call _initialize_upload

    def _initiate_upload(self):
        """Create remote file/upload"""
        pass

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        raise NotImplementedError

    def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        logger.debug("%s read: %i - %i", self, self.loc, self.loc + length)
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = self.cache._fetch(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    def readinto(self, b):
        """mirrors builtin file's readinto method

        https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
        """
        out = memoryview(b).cast("B")
        data = self.read(out.nbytes)
        out[: len(data)] = data
        return len(data)

    def readuntil(self, char=b"\n", blocks=None):
        """Return data between current position and first occurrence of char

        char is included in the output, except if the end of the file is
        encountered first.

        Parameters
        ----------
        char: bytes
            Thing to find
        blocks: None or int
            How much to read in each go. Defaults to file blocksize - which may
            mean a new read on every call.
        """
        out = []
        while True:
            start = self.tell()
            part = self.read(blocks or self.blocksize)
            if len(part) == 0:
                break
            found = part.find(char)
            if found > -1:
                out.append(part[: found + len(char)])
                self.seek(start + found + len(char))
                break
            out.append(part)
        return b"".join(out)
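
    # Illustrative sketch: readuntil scans forward block by block until the
    # delimiter appears, then seeks back to just after it:
    #
    #     f = fs.open("/data/records.bin", "rb")
    #     header = f.readuntil(b"\x00")  # bytes up to and including the first NUL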

    def readline(self):
        """Read until first occurrence of newline character

        Note that, because of character encoding, this is not necessarily a
        true line ending.
        """
        return self.readuntil(b"\n")

    def __next__(self):
        out = self.readline()
        if out:
            return out
        raise StopIteration

    def __iter__(self):
        return self

    def readlines(self):
        """Return all data, split by the newline character"""
        data = self.read()
        lines = data.split(b"\n")
        out = [l + b"\n" for l in lines[:-1]]
        if data.endswith(b"\n"):
            return out
        else:
            return out + [lines[-1]]
        # return list(self) ???

    def readinto1(self, b):
        return self.readinto(b)

    def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    def readable(self):
        """Whether opened for reading"""
        return self.mode == "rb" and not self.closed

    def seekable(self):
        """Whether is seekable (only in read mode)"""
        return self.readable()

    def writable(self):
        """Whether opened for writing"""
        return self.mode in {"wb", "ab"} and not self.closed

    def __del__(self):
        if not self.closed:
            self.close()

    def __str__(self):
        return f"<File-like object {type(self.fs).__name__}, {self.path}>"

    __repr__ = __str__

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()