Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/spec.py: 25%


from __future__ import annotations

import io
import json
import logging
import os
import threading
import warnings
import weakref
from errno import ESPIPE
from glob import has_magic
from hashlib import sha256
from typing import Any, ClassVar, Dict, Tuple

from .callbacks import DEFAULT_CALLBACK
from .config import apply_config, conf
from .dircache import DirCache
from .transaction import Transaction
from .utils import (
    _unstrip_protocol,
    glob_translate,
    isfilelike,
    other_paths,
    read_block,
    stringify_path,
    tokenize,
)

logger = logging.getLogger("fsspec")


def make_instance(cls, args, kwargs):
    return cls(*args, **kwargs)


class _Cached(type):
    """
    Metaclass for caching file system instances.

    Notes
    -----
    Instances are cached according to

    * The values of the class attributes listed in `_extra_tokenize_attributes`
    * The arguments passed to ``__init__``.

    This creates an additional reference to the filesystem, which prevents the
    filesystem from being garbage collected when all *user* references go away.
    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
    be made for a filesystem instance to be garbage collected.
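
    Examples
    --------
    A minimal sketch of the caching behaviour (assuming the built-in
    ``memory`` implementation is available):

    >>> import fsspec  # doctest: +SKIP
    >>> a = fsspec.filesystem("memory")  # doctest: +SKIP
    >>> b = fsspec.filesystem("memory")  # doctest: +SKIP
    >>> a is b  # same token, so the cached instance is reused  # doctest: +SKIP
    True
    >>> c = fsspec.filesystem("memory", skip_instance_cache=True)  # doctest: +SKIP
    >>> a is c  # doctest: +SKIP
    False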

51 """ 

52 

53 def __init__(cls, *args, **kwargs): 

54 super().__init__(*args, **kwargs) 

55 # Note: we intentionally create a reference here, to avoid garbage 

56 # collecting instances when all other references are gone. To really 

57 # delete a FileSystem, the cache must be cleared. 

58 if conf.get("weakref_instance_cache"): # pragma: no cover 

59 # debug option for analysing fork/spawn conditions 

60 cls._cache = weakref.WeakValueDictionary() 

61 else: 

62 cls._cache = {} 

63 cls._pid = os.getpid() 

64 

65 def __call__(cls, *args, **kwargs): 

66 kwargs = apply_config(cls, kwargs) 

67 extra_tokens = tuple( 

68 getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes 

69 ) 

70 token = tokenize( 

71 cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs 

72 ) 

73 skip = kwargs.pop("skip_instance_cache", False) 

74 if os.getpid() != cls._pid: 

75 cls._cache.clear() 

76 cls._pid = os.getpid() 

77 if not skip and cls.cachable and token in cls._cache: 

78 cls._latest = token 

79 return cls._cache[token] 

80 else: 

81 obj = super().__call__(*args, **kwargs) 

82 # Setting _fs_token here causes some static linters to complain. 

83 obj._fs_token_ = token 

84 obj.storage_args = args 

85 obj.storage_options = kwargs 

86 if obj.async_impl and obj.mirror_sync_methods: 

87 from .asyn import mirror_sync_methods 

88 

89 mirror_sync_methods(obj) 

90 

91 if cls.cachable and not skip: 

92 cls._latest = token 

93 cls._cache[token] = obj 

94 return obj 

95 

96 

97class AbstractFileSystem(metaclass=_Cached): 

98 """ 

99 An abstract super-class for pythonic file-systems 

100 

101 Implementations are expected to be compatible with or, better, subclass 

102 from here. 

103 """ 

104 

105 cachable = True # this class can be cached, instances reused 

106 _cached = False 

107 blocksize = 2**22 

108 sep = "/" 

109 protocol: ClassVar[str | tuple[str, ...]] = "abstract" 

110 _latest = None 

111 async_impl = False 

112 mirror_sync_methods = False 

113 root_marker = "" # For some FSs, may require leading '/' or other character 

114 transaction_type = Transaction 

115 

116 #: Extra *class attributes* that should be considered when hashing. 

117 _extra_tokenize_attributes = () 

118 

119 # Set by _Cached metaclass 

120 storage_args: Tuple[Any, ...] 

121 storage_options: Dict[str, Any] 

122 

123 def __init__(self, *args, **storage_options): 

124 """Create and configure file-system instance 

125 

126 Instances may be cachable, so if similar enough arguments are seen 

127 a new instance is not required. The token attribute exists to allow 

128 implementations to cache instances if they wish. 

129 

130 A reasonable default should be provided if there are no arguments. 

131 

132 Subclasses should call this method. 

133 

134 Parameters 

135 ---------- 

136 use_listings_cache, listings_expiry_time, max_paths: 

137 passed to ``DirCache``, if the implementation supports 

138 directory listing caching. Pass use_listings_cache=False 

139 to disable such caching. 

140 skip_instance_cache: bool 

141 If this is a cachable implementation, pass True here to force 

142 creating a new instance even if a matching instance exists, and prevent 

143 storing this instance. 

144 asynchronous: bool 

145 loop: asyncio-compatible IOLoop or None 

146 """ 

147 if self._cached: 

148 # reusing instance, don't change 

149 return 

150 self._cached = True 

151 self._intrans = False 

152 self._transaction = None 

153 self._invalidated_caches_in_transaction = [] 

154 self.dircache = DirCache(**storage_options) 

155 

156 if storage_options.pop("add_docs", None): 

157 warnings.warn("add_docs is no longer supported.", FutureWarning) 

158 

159 if storage_options.pop("add_aliases", None): 

160 warnings.warn("add_aliases has been removed.", FutureWarning) 

161 # This is set in _Cached 

162 self._fs_token_ = None 

163 

164 @property 

165 def fsid(self): 

166 """Persistent filesystem id that can be used to compare filesystems 

167 across sessions. 

168 """ 

169 raise NotImplementedError 

170 

171 @property 

172 def _fs_token(self): 

173 return self._fs_token_ 

174 

175 def __dask_tokenize__(self): 

176 return self._fs_token 

177 

178 def __hash__(self): 

179 return int(self._fs_token, 16) 

180 

181 def __eq__(self, other): 

182 return isinstance(other, type(self)) and self._fs_token == other._fs_token 

183 

184 def __reduce__(self): 

185 return make_instance, (type(self), self.storage_args, self.storage_options) 

186 

187 @classmethod 

188 def _strip_protocol(cls, path): 

189 """Turn path from fully-qualified to file-system-specific 

190 

191 May require FS-specific handling, e.g., for relative paths or links. 

192 """ 

193 if isinstance(path, list): 

194 return [cls._strip_protocol(p) for p in path] 

195 path = stringify_path(path) 

196 protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol 

197 for protocol in protos: 

198 if path.startswith(protocol + "://"): 

199 path = path[len(protocol) + 3 :] 

200 elif path.startswith(protocol + "::"): 

201 path = path[len(protocol) + 2 :] 

202 path = path.rstrip("/") 

203 # use of root_marker to make minimum required path, e.g., "/" 

204 return path or cls.root_marker 

205 

206 def unstrip_protocol(self, name: str) -> str: 

207 """Format FS-specific path to generic, including protocol""" 

208 protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol 

209 for protocol in protos: 

210 if name.startswith(f"{protocol}://"): 

211 return name 

212 return f"{protos[0]}://{name}" 

213 

214 @staticmethod 

215 def _get_kwargs_from_urls(path): 

216 """If kwargs can be encoded in the paths, extract them here 

217 

218 This should happen before instantiation of the class; incoming paths 

219 then should be amended to strip the options in methods. 

220 

221 Examples may look like an sftp path "sftp://user@host:/my/path", where 

222 the user and host should become kwargs and later get stripped. 

223 """ 

224 # by default, nothing happens 

225 return {} 

226 

227 @classmethod 

228 def current(cls): 

229 """Return the most recently instantiated FileSystem 

230 

231 If no instance has been created, then create one with defaults 

232 """ 

233 if cls._latest in cls._cache: 

234 return cls._cache[cls._latest] 

235 return cls() 

236 

237 @property 

238 def transaction(self): 

239 """A context within which files are committed together upon exit 

240 

241 Requires the file class to implement `.commit()` and `.discard()` 

242 for the normal and exception cases. 
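
        Examples
        --------
        A hypothetical sketch, assuming an instance ``fs`` whose file class
        supports commit/discard:

        >>> with fs.transaction:  # doctest: +SKIP
        ...     with fs.open("data/out.bin", "wb") as f:
        ...         f.write(b"...")  # committed only when the block exits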

243 """ 

244 if self._transaction is None: 

245 self._transaction = self.transaction_type(self) 

246 return self._transaction 

247 

248 def start_transaction(self): 

249 """Begin write transaction for deferring files, non-context version""" 

250 self._intrans = True 

251 self._transaction = self.transaction_type(self) 

252 return self.transaction 

253 

254 def end_transaction(self): 

255 """Finish write transaction, non-context version""" 

256 self.transaction.complete() 

257 self._transaction = None 

258 # The invalid cache must be cleared after the transaction is completed. 

259 for path in self._invalidated_caches_in_transaction: 

260 self.invalidate_cache(path) 

261 self._invalidated_caches_in_transaction.clear() 

262 

263 def invalidate_cache(self, path=None): 

264 """ 

265 Discard any cached directory information 

266 

267 Parameters 

268 ---------- 

269 path: string or None 

270 If None, clear all listings cached else listings at or under given 

271 path. 

272 """ 

273 # Not necessary to implement invalidation mechanism, may have no cache. 

274 # But if have, you should call this method of parent class from your 

275 # subclass to ensure expiring caches after transacations correctly. 

276 # See the implementation of FTPFileSystem in ftp.py 

277 if self._intrans: 

278 self._invalidated_caches_in_transaction.append(path) 

279 

280 def mkdir(self, path, create_parents=True, **kwargs): 

281 """ 

282 Create directory entry at path 

283 

284 For systems that don't have true directories, may create an for 

285 this instance only and not touch the real filesystem 

286 

287 Parameters 

288 ---------- 

289 path: str 

290 location 

291 create_parents: bool 

292 if True, this is equivalent to ``makedirs`` 

293 kwargs: 

294 may be permissions, etc. 

295 """ 

296 pass # not necessary to implement, may not have directories 

297 

298 def makedirs(self, path, exist_ok=False): 

299 """Recursively make directories 

300 

301 Creates directory at path and any intervening required directories. 

302 Raises exception if, for instance, the path already exists but is a 

303 file. 

304 

305 Parameters 

306 ---------- 

307 path: str 

308 leaf directory name 

309 exist_ok: bool (False) 

310 If False, will error if the target already exists 

311 """ 

312 pass # not necessary to implement, may not have directories 

313 

314 def rmdir(self, path): 

315 """Remove a directory, if empty""" 

316 pass # not necessary to implement, may not have directories 

317 

318 def ls(self, path, detail=True, **kwargs): 

319 """List objects at path. 

320 

321 This should include subdirectories and files at that location. The 

322 difference between a file and a directory must be clear when details 

323 are requested. 

324 

325 The specific keys, or perhaps a FileInfo class, or similar, is TBD, 

326 but must be consistent across implementations. 

327 Must include: 

328 

329 - full path to the entry (without protocol) 

330 - size of the entry, in bytes. If the value cannot be determined, will 

331 be ``None``. 

332 - type of entry, "file", "directory" or other 

333 

334 Additional information 

335 may be present, appropriate to the file-system, e.g., generation, 

336 checksum, etc. 

337 

338 May use refresh=True|False to allow use of self._ls_from_cache to 

339 check for a saved listing and avoid calling the backend. This would be 

340 common where listing may be expensive. 

341 

342 Parameters 

343 ---------- 

344 path: str 

345 detail: bool 

346 if True, gives a list of dictionaries, where each is the same as 

347 the result of ``info(path)``. If False, gives a list of paths 

348 (str). 

349 kwargs: may have additional backend-specific options, such as version 

350 information 

351 

352 Returns 

353 ------- 

354 List of strings if detail is False, or list of directory information 

355 dicts if detail is True. 

356 """ 

357 raise NotImplementedError 

358 

359 def _ls_from_cache(self, path): 

360 """Check cache for listing 

361 

362 Returns listing, if found (may be empty list for a directly that exists 

363 but contains nothing), None if not in cache. 

364 """ 

365 parent = self._parent(path) 

366 try: 

367 return self.dircache[path.rstrip("/")] 

368 except KeyError: 

369 pass 

370 try: 

371 files = [ 

372 f 

373 for f in self.dircache[parent] 

374 if f["name"] == path 

375 or (f["name"] == path.rstrip("/") and f["type"] == "directory") 

376 ] 

377 if len(files) == 0: 

378 # parent dir was listed but did not contain this file 

379 raise FileNotFoundError(path) 

380 return files 

381 except KeyError: 

382 pass 

383 

384 def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs): 

385 """Return all files belows path 

386 

387 List all files, recursing into subdirectories; output is iterator-style, 

388 like ``os.walk()``. For a simple list of files, ``find()`` is available. 

389 

390 When topdown is True, the caller can modify the dirnames list in-place (perhaps 

391 using del or slice assignment), and walk() will 

392 only recurse into the subdirectories whose names remain in dirnames; 

393 this can be used to prune the search, impose a specific order of visiting, 

394 or even to inform walk() about directories the caller creates or renames before 

395 it resumes walk() again. 

396 Modifying dirnames when topdown is False has no effect. (see os.walk) 

397 

398 Note that the "files" outputted will include anything that is not 

399 a directory, such as links. 

400 

401 Parameters 

402 ---------- 

403 path: str 

404 Root to recurse into 

405 maxdepth: int 

406 Maximum recursion depth. None means limitless, but not recommended 

407 on link-based file-systems. 

408 topdown: bool (True) 

409 Whether to walk the directory tree from the top downwards or from 

410 the bottom upwards. 

411 on_error: "omit", "raise", a collable 

412 if omit (default), path with exception will simply be empty; 

413 If raise, an underlying exception will be raised; 

414 if callable, it will be called with a single OSError instance as argument 

415 kwargs: passed to ``ls`` 
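
        Examples
        --------
        A minimal sketch (hypothetical paths, assuming an instance ``fs``):

        >>> for root, dirs, files in fs.walk("data", maxdepth=2):  # doctest: +SKIP
        ...     print(root, dirs, files)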

416 """ 

417 if maxdepth is not None and maxdepth < 1: 

418 raise ValueError("maxdepth must be at least 1") 

419 

420 path = self._strip_protocol(path) 

421 full_dirs = {} 

422 dirs = {} 

423 files = {} 

424 

425 detail = kwargs.pop("detail", False) 

426 try: 

427 listing = self.ls(path, detail=True, **kwargs) 

428 except (FileNotFoundError, OSError) as e: 

429 if on_error == "raise": 

430 raise 

431 elif callable(on_error): 

432 on_error(e) 

433 if detail: 

434 return path, {}, {} 

435 return path, [], [] 

436 

437 for info in listing: 

438 # each info name must be at least [path]/part , but here 

439 # we check also for names like [path]/part/ 

440 pathname = info["name"].rstrip("/") 

441 name = pathname.rsplit("/", 1)[-1] 

442 if info["type"] == "directory" and pathname != path: 

443 # do not include "self" path 

444 full_dirs[name] = pathname 

445 dirs[name] = info 

446 elif pathname == path: 

447 # file-like with same name as give path 

448 files[""] = info 

449 else: 

450 files[name] = info 

451 

452 if not detail: 

453 dirs = list(dirs) 

454 files = list(files) 

455 

456 if topdown: 

457 # Yield before recursion if walking top down 

458 yield path, dirs, files 

459 

460 if maxdepth is not None: 

461 maxdepth -= 1 

462 if maxdepth < 1: 

463 if not topdown: 

464 yield path, dirs, files 

465 return 

466 

467 for d in dirs: 

468 yield from self.walk( 

469 full_dirs[d], 

470 maxdepth=maxdepth, 

471 detail=detail, 

472 topdown=topdown, 

473 **kwargs, 

474 ) 

475 

476 if not topdown: 

477 # Yield after recursion if walking bottom up 

478 yield path, dirs, files 

479 

480 def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs): 

481 """List all files below path. 

482 

483 Like posix ``find`` command without conditions 

484 

485 Parameters 

486 ---------- 

487 path : str 

488 maxdepth: int or None 

489 If not None, the maximum number of levels to descend 

490 withdirs: bool 

491 Whether to include directory paths in the output. This is True 

492 when used by glob, but users usually only want files. 

493 kwargs are passed to ``ls``. 

494 """ 

495 # TODO: allow equivalent of -name parameter 

496 path = self._strip_protocol(path) 

497 out = {} 

498 

499 # Add the root directory if withdirs is requested 

500 # This is needed for posix glob compliance 

501 if withdirs and path != "" and self.isdir(path): 

502 out[path] = self.info(path) 

503 

504 for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs): 

505 if withdirs: 

506 files.update(dirs) 

507 out.update({info["name"]: info for name, info in files.items()}) 

508 if not out and self.isfile(path): 

509 # walk works on directories, but find should also return [path] 

510 # when path happens to be a file 

511 out[path] = {} 

512 names = sorted(out) 

513 if not detail: 

514 return names 

515 else: 

516 return {name: out[name] for name in names} 

517 

518 def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs): 

519 """Space used by files and optionally directories within a path 

520 

521 Directory size does not include the size of its contents. 

522 

523 Parameters 

524 ---------- 

525 path: str 

526 total: bool 

527 Whether to sum all the file sizes 

528 maxdepth: int or None 

529 Maximum number of directory levels to descend, None for unlimited. 

530 withdirs: bool 

531 Whether to include directory paths in the output. 

532 kwargs: passed to ``find`` 

533 

534 Returns 

535 ------- 

536 Dict of {path: size} if total=False, or int otherwise, where numbers 

537 refer to bytes used. 
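
        Examples
        --------
        A hypothetical sketch (invented paths and sizes, assuming ``fs``):

        >>> fs.du("data")  # doctest: +SKIP
        10240
        >>> fs.du("data", total=False)  # doctest: +SKIP
        {'data/a.csv': 4096, 'data/b.csv': 6144}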

538 """ 

539 sizes = {} 

540 if withdirs and self.isdir(path): 

541 # Include top-level directory in output 

542 info = self.info(path) 

543 sizes[info["name"]] = info["size"] 

544 for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs): 

545 info = self.info(f) 

546 sizes[info["name"]] = info["size"] 

547 if total: 

548 return sum(sizes.values()) 

549 else: 

550 return sizes 

551 

552 def glob(self, path, maxdepth=None, **kwargs): 

553 """ 

554 Find files by glob-matching. 

555 

556 If the path ends with '/', only folders are returned. 

557 

558 We support ``"**"``, 

559 ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation. 

560 

561 The `maxdepth` option is applied on the first `**` found in the path. 

562 

563 kwargs are passed to ``ls``. 
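
        Examples
        --------
        A minimal sketch (hypothetical paths, assuming an instance ``fs``):

        >>> fs.glob("data/**/*.csv")  # doctest: +SKIP
        ['data/a.csv', 'data/sub/b.csv']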

564 """ 

565 if maxdepth is not None and maxdepth < 1: 

566 raise ValueError("maxdepth must be at least 1") 

567 

568 import re 

569 

570 seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,) 

571 ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash 

572 path = self._strip_protocol(path) 

573 append_slash_to_dirname = ends_with_sep or path.endswith( 

574 tuple(sep + "**" for sep in seps) 

575 ) 

576 idx_star = path.find("*") if path.find("*") >= 0 else len(path) 

577 idx_qmark = path.find("?") if path.find("?") >= 0 else len(path) 

578 idx_brace = path.find("[") if path.find("[") >= 0 else len(path) 

579 

580 min_idx = min(idx_star, idx_qmark, idx_brace) 

581 

582 detail = kwargs.pop("detail", False) 

583 

584 if not has_magic(path): 

585 if self.exists(path, **kwargs): 

586 if not detail: 

587 return [path] 

588 else: 

589 return {path: self.info(path, **kwargs)} 

590 else: 

591 if not detail: 

592 return [] # glob of non-existent returns empty 

593 else: 

594 return {} 

595 elif "/" in path[:min_idx]: 

596 min_idx = path[:min_idx].rindex("/") 

597 root = path[: min_idx + 1] 

598 depth = path[min_idx + 1 :].count("/") + 1 

599 else: 

600 root = "" 

601 depth = path[min_idx + 1 :].count("/") + 1 

602 

603 if "**" in path: 

604 if maxdepth is not None: 

605 idx_double_stars = path.find("**") 

606 depth_double_stars = path[idx_double_stars:].count("/") + 1 

607 depth = depth - depth_double_stars + maxdepth 

608 else: 

609 depth = None 

610 

611 allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs) 

612 

613 pattern = glob_translate(path + ("/" if ends_with_sep else "")) 

614 pattern = re.compile(pattern) 

615 

616 out = { 

617 p: info 

618 for p, info in sorted(allpaths.items()) 

619 if pattern.match( 

620 ( 

621 p + "/" 

622 if append_slash_to_dirname and info["type"] == "directory" 

623 else p 

624 ) 

625 ) 

626 } 

627 

628 if detail: 

629 return out 

630 else: 

631 return list(out) 

632 

633 def exists(self, path, **kwargs): 

634 """Is there a file at the given path""" 

635 try: 

636 self.info(path, **kwargs) 

637 return True 

638 except: # noqa: E722 

639 # any exception allowed bar FileNotFoundError? 

640 return False 

641 

642 def lexists(self, path, **kwargs): 

643 """If there is a file at the given path (including 

644 broken links)""" 

645 return self.exists(path) 

646 

647 def info(self, path, **kwargs): 

648 """Give details of entry at path 

649 

650 Returns a single dictionary, with exactly the same information as ``ls`` 

651 would with ``detail=True``. 

652 

653 The default implementation should calls ls and could be overridden by a 

654 shortcut. kwargs are passed on to ```ls()``. 

655 

656 Some file systems might not be able to measure the file's size, in 

657 which case, the returned dict will include ``'size': None``. 

658 

659 Returns 

660 ------- 

661 dict with keys: name (full path in the FS), size (in bytes), type (file, 

662 directory, or something else) and other FS-specific keys. 
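
        Examples
        --------
        A hypothetical sketch (invented path and size, assuming ``fs``):

        >>> fs.info("data/a.csv")  # doctest: +SKIP
        {'name': 'data/a.csv', 'size': 4096, 'type': 'file'}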

663 """ 

664 path = self._strip_protocol(path) 

665 out = self.ls(self._parent(path), detail=True, **kwargs) 

666 out = [o for o in out if o["name"].rstrip("/") == path] 

667 if out: 

668 return out[0] 

669 out = self.ls(path, detail=True, **kwargs) 

670 path = path.rstrip("/") 

671 out1 = [o for o in out if o["name"].rstrip("/") == path] 

672 if len(out1) == 1: 

673 if "size" not in out1[0]: 

674 out1[0]["size"] = None 

675 return out1[0] 

676 elif len(out1) > 1 or out: 

677 return {"name": path, "size": 0, "type": "directory"} 

678 else: 

679 raise FileNotFoundError(path) 

680 

681 def checksum(self, path): 

682 """Unique value for current version of file 

683 

684 If the checksum is the same from one moment to another, the contents 

685 are guaranteed to be the same. If the checksum changes, the contents 

686 *might* have changed. 

687 

688 This should normally be overridden; default will probably capture 

689 creation/modification timestamp (which would be good) or maybe 

690 access timestamp (which would be bad) 

691 """ 

692 return int(tokenize(self.info(path)), 16) 

693 

694 def size(self, path): 

695 """Size in bytes of file""" 

696 return self.info(path).get("size", None) 

697 

698 def sizes(self, paths): 

699 """Size in bytes of each file in a list of paths""" 

700 return [self.size(p) for p in paths] 

701 

702 def isdir(self, path): 

703 """Is this entry directory-like?""" 

704 try: 

705 return self.info(path)["type"] == "directory" 

706 except OSError: 

707 return False 

708 

709 def isfile(self, path): 

710 """Is this entry file-like?""" 

711 try: 

712 return self.info(path)["type"] == "file" 

713 except: # noqa: E722 

714 return False 

715 

716 def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs): 

717 """Get the contents of the file as a string. 

718 

719 Parameters 

720 ---------- 

721 path: str 

722 URL of file on this filesystems 

723 encoding, errors, newline: same as `open`. 

724 """ 

725 with self.open( 

726 path, 

727 mode="r", 

728 encoding=encoding, 

729 errors=errors, 

730 newline=newline, 

731 **kwargs, 

732 ) as f: 

733 return f.read() 

734 

735 def write_text( 

736 self, path, value, encoding=None, errors=None, newline=None, **kwargs 

737 ): 

738 """Write the text to the given file. 

739 

740 An existing file will be overwritten. 

741 

742 Parameters 

743 ---------- 

744 path: str 

745 URL of file on this filesystems 

746 value: str 

747 Text to write. 

748 encoding, errors, newline: same as `open`. 

749 """ 

750 with self.open( 

751 path, 

752 mode="w", 

753 encoding=encoding, 

754 errors=errors, 

755 newline=newline, 

756 **kwargs, 

757 ) as f: 

758 return f.write(value) 

759 

760 def cat_file(self, path, start=None, end=None, **kwargs): 

761 """Get the content of a file 

762 

763 Parameters 

764 ---------- 

765 path: URL of file on this filesystems 

766 start, end: int 

767 Bytes limits of the read. If negative, backwards from end, 

768 like usual python slices. Either can be None for start or 

769 end of file, respectively 

770 kwargs: passed to ``open()``. 
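
        Examples
        --------
        A minimal sketch of ranged reads (hypothetical path, assuming ``fs``):

        >>> fs.cat_file("data/a.bin", start=0, end=10)  # first 10 bytes  # doctest: +SKIP
        >>> fs.cat_file("data/a.bin", start=-10)  # last 10 bytes  # doctest: +SKIP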

771 """ 

772 # explicitly set buffering off? 

773 with self.open(path, "rb", **kwargs) as f: 

774 if start is not None: 

775 if start >= 0: 

776 f.seek(start) 

777 else: 

778 f.seek(max(0, f.size + start)) 

779 if end is not None: 

780 if end < 0: 

781 end = f.size + end 

782 return f.read(end - f.tell()) 

783 return f.read() 

784 

785 def pipe_file(self, path, value, **kwargs): 

786 """Set the bytes of given file""" 

787 with self.open(path, "wb", **kwargs) as f: 

788 f.write(value) 

789 

790 def pipe(self, path, value=None, **kwargs): 

791 """Put value into path 

792 

793 (counterpart to ``cat``) 

794 

795 Parameters 

796 ---------- 

797 path: string or dict(str, bytes) 

798 If a string, a single remote location to put ``value`` bytes; if a dict, 

799 a mapping of {path: bytesvalue}. 

800 value: bytes, optional 

801 If using a single path, these are the bytes to put there. Ignored if 

802 ``path`` is a dict 
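
        Examples
        --------
        A hypothetical sketch, assuming an instance ``fs``:

        >>> fs.pipe("data/one.bin", b"abc")  # doctest: +SKIP
        >>> fs.pipe({"data/one.bin": b"abc", "data/two.bin": b"def"})  # doctest: +SKIP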

803 """ 

804 if isinstance(path, str): 

805 self.pipe_file(self._strip_protocol(path), value, **kwargs) 

806 elif isinstance(path, dict): 

807 for k, v in path.items(): 

808 self.pipe_file(self._strip_protocol(k), v, **kwargs) 

809 else: 

810 raise ValueError("path must be str or dict") 

811 

812 def cat_ranges( 

813 self, paths, starts, ends, max_gap=None, on_error="return", **kwargs 

814 ): 

815 """Get the contents of byte ranges from one or more files 

816 

817 Parameters 

818 ---------- 

819 paths: list 

820 A list of of filepaths on this filesystems 

821 starts, ends: int or list 

822 Bytes limits of the read. If using a single int, the same value will be 

823 used to read all the specified files. 

824 """ 

825 if max_gap is not None: 

826 raise NotImplementedError 

827 if not isinstance(paths, list): 

828 raise TypeError 

829 if not isinstance(starts, list): 

830 starts = [starts] * len(paths) 

831 if not isinstance(ends, list): 

832 ends = [ends] * len(paths) 

833 if len(starts) != len(paths) or len(ends) != len(paths): 

834 raise ValueError 

835 out = [] 

836 for p, s, e in zip(paths, starts, ends): 

837 try: 

838 out.append(self.cat_file(p, s, e)) 

839 except Exception as e: 

840 if on_error == "return": 

841 out.append(e) 

842 else: 

843 raise 

844 return out 

845 

846 def cat(self, path, recursive=False, on_error="raise", **kwargs): 

847 """Fetch (potentially multiple) paths' contents 

848 

849 Parameters 

850 ---------- 

851 recursive: bool 

852 If True, assume the path(s) are directories, and get all the 

853 contained files 

854 on_error : "raise", "omit", "return" 

855 If raise, an underlying exception will be raised (converted to KeyError 

856 if the type is in self.missing_exceptions); if omit, keys with exception 

857 will simply not be included in the output; if "return", all keys are 

858 included in the output, but the value will be bytes or an exception 

859 instance. 

860 kwargs: passed to cat_file 

861 

862 Returns 

863 ------- 

864 dict of {path: contents} if there are multiple paths 

865 or the path has been otherwise expanded 
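
        Examples
        --------
        A minimal sketch (hypothetical paths, assuming an instance ``fs``):

        >>> fs.cat("data/a.bin")  # single path -> bytes  # doctest: +SKIP
        b'...'
        >>> fs.cat("data/*.bin", on_error="omit")  # expanded -> dict  # doctest: +SKIP
        {'data/a.bin': b'...', 'data/b.bin': b'...'}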

866 """ 

867 paths = self.expand_path(path, recursive=recursive) 

868 if ( 

869 len(paths) > 1 

870 or isinstance(path, list) 

871 or paths[0] != self._strip_protocol(path) 

872 ): 

873 out = {} 

874 for path in paths: 

875 try: 

876 out[path] = self.cat_file(path, **kwargs) 

877 except Exception as e: 

878 if on_error == "raise": 

879 raise 

880 if on_error == "return": 

881 out[path] = e 

882 return out 

883 else: 

884 return self.cat_file(paths[0], **kwargs) 

885 

886 def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs): 

887 """Copy single remote file to local""" 

888 from .implementations.local import LocalFileSystem 

889 

890 if isfilelike(lpath): 

891 outfile = lpath 

892 elif self.isdir(rpath): 

893 os.makedirs(lpath, exist_ok=True) 

894 return None 

895 

896 fs = LocalFileSystem(auto_mkdir=True) 

897 fs.makedirs(fs._parent(lpath), exist_ok=True) 

898 

899 with self.open(rpath, "rb", **kwargs) as f1: 

900 if outfile is None: 

901 outfile = open(lpath, "wb") 

902 

903 try: 

904 callback.set_size(getattr(f1, "size", None)) 

905 data = True 

906 while data: 

907 data = f1.read(self.blocksize) 

908 segment_len = outfile.write(data) 

909 if segment_len is None: 

910 segment_len = len(data) 

911 callback.relative_update(segment_len) 

912 finally: 

913 if not isfilelike(lpath): 

914 outfile.close() 

915 

916 def get( 

917 self, 

918 rpath, 

919 lpath, 

920 recursive=False, 

921 callback=DEFAULT_CALLBACK, 

922 maxdepth=None, 

923 **kwargs, 

924 ): 

925 """Copy file(s) to local. 

926 

927 Copies a specific file or tree of files (if recursive=True). If lpath 

928 ends with a "/", it will be assumed to be a directory, and target files 

929 will go within. Can submit a list of paths, which may be glob-patterns 

930 and will be expanded. 

931 

932 Calls get_file for each source. 
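
        Examples
        --------
        A hypothetical sketch (invented paths, assuming an instance ``fs``):

        >>> fs.get("data/a.csv", "/tmp/a.csv")  # doctest: +SKIP
        >>> fs.get("data/", "/tmp/data/", recursive=True)  # doctest: +SKIP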

933 """ 

934 if isinstance(lpath, list) and isinstance(rpath, list): 

935 # No need to expand paths when both source and destination 

936 # are provided as lists 

937 rpaths = rpath 

938 lpaths = lpath 

939 else: 

940 from .implementations.local import ( 

941 LocalFileSystem, 

942 make_path_posix, 

943 trailing_sep, 

944 ) 

945 

946 source_is_str = isinstance(rpath, str) 

947 rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth) 

948 if source_is_str and (not recursive or maxdepth is not None): 

949 # Non-recursive glob does not copy directories 

950 rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))] 

951 if not rpaths: 

952 return 

953 

954 if isinstance(lpath, str): 

955 lpath = make_path_posix(lpath) 

956 

957 source_is_file = len(rpaths) == 1 

958 dest_is_dir = isinstance(lpath, str) and ( 

959 trailing_sep(lpath) or LocalFileSystem().isdir(lpath) 

960 ) 

961 

962 exists = source_is_str and ( 

963 (has_magic(rpath) and source_is_file) 

964 or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath)) 

965 ) 

966 lpaths = other_paths( 

967 rpaths, 

968 lpath, 

969 exists=exists, 

970 flatten=not source_is_str, 

971 ) 

972 

973 callback.set_size(len(lpaths)) 

974 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): 

975 with callback.branched(rpath, lpath) as child: 

976 self.get_file(rpath, lpath, callback=child, **kwargs) 

977 

978 def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs): 

979 """Copy single file to remote""" 

980 if os.path.isdir(lpath): 

981 self.makedirs(rpath, exist_ok=True) 

982 return None 

983 

984 with open(lpath, "rb") as f1: 

985 size = f1.seek(0, 2) 

986 callback.set_size(size) 

987 f1.seek(0) 

988 

989 self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True) 

990 with self.open(rpath, "wb", **kwargs) as f2: 

991 while f1.tell() < size: 

992 data = f1.read(self.blocksize) 

993 segment_len = f2.write(data) 

994 if segment_len is None: 

995 segment_len = len(data) 

996 callback.relative_update(segment_len) 

997 

998 def put( 

999 self, 

1000 lpath, 

1001 rpath, 

1002 recursive=False, 

1003 callback=DEFAULT_CALLBACK, 

1004 maxdepth=None, 

1005 **kwargs, 

1006 ): 

1007 """Copy file(s) from local. 

1008 

1009 Copies a specific file or tree of files (if recursive=True). If rpath 

1010 ends with a "/", it will be assumed to be a directory, and target files 

1011 will go within. 

1012 

1013 Calls put_file for each source. 

1014 """ 

1015 if isinstance(lpath, list) and isinstance(rpath, list): 

1016 # No need to expand paths when both source and destination 

1017 # are provided as lists 

1018 rpaths = rpath 

1019 lpaths = lpath 

1020 else: 

1021 from .implementations.local import ( 

1022 LocalFileSystem, 

1023 make_path_posix, 

1024 trailing_sep, 

1025 ) 

1026 

1027 source_is_str = isinstance(lpath, str) 

1028 if source_is_str: 

1029 lpath = make_path_posix(lpath) 

1030 fs = LocalFileSystem() 

1031 lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) 

1032 if source_is_str and (not recursive or maxdepth is not None): 

1033 # Non-recursive glob does not copy directories 

1034 lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] 

1035 if not lpaths: 

1036 return 

1037 

1038 source_is_file = len(lpaths) == 1 

1039 dest_is_dir = isinstance(rpath, str) and ( 

1040 trailing_sep(rpath) or self.isdir(rpath) 

1041 ) 

1042 

1043 rpath = ( 

1044 self._strip_protocol(rpath) 

1045 if isinstance(rpath, str) 

1046 else [self._strip_protocol(p) for p in rpath] 

1047 ) 

1048 exists = source_is_str and ( 

1049 (has_magic(lpath) and source_is_file) 

1050 or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) 

1051 ) 

1052 rpaths = other_paths( 

1053 lpaths, 

1054 rpath, 

1055 exists=exists, 

1056 flatten=not source_is_str, 

1057 ) 

1058 

1059 callback.set_size(len(rpaths)) 

1060 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): 

1061 with callback.branched(lpath, rpath) as child: 

1062 self.put_file(lpath, rpath, callback=child, **kwargs) 

1063 

1064 def head(self, path, size=1024): 

1065 """Get the first ``size`` bytes from file""" 

1066 with self.open(path, "rb") as f: 

1067 return f.read(size) 

1068 

1069 def tail(self, path, size=1024): 

1070 """Get the last ``size`` bytes from file""" 

1071 with self.open(path, "rb") as f: 

1072 f.seek(max(-size, -f.size), 2) 

1073 return f.read() 

1074 

1075 def cp_file(self, path1, path2, **kwargs): 

1076 raise NotImplementedError 

1077 

1078 def copy( 

1079 self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs 

1080 ): 

1081 """Copy within two locations in the filesystem 

1082 

1083 on_error : "raise", "ignore" 

1084 If raise, any not-found exceptions will be raised; if ignore any 

1085 not-found exceptions will cause the path to be skipped; defaults to 

1086 raise unless recursive is true, where the default is ignore 

1087 """ 

1088 if on_error is None and recursive: 

1089 on_error = "ignore" 

1090 elif on_error is None: 

1091 on_error = "raise" 

1092 

1093 if isinstance(path1, list) and isinstance(path2, list): 

1094 # No need to expand paths when both source and destination 

1095 # are provided as lists 

1096 paths1 = path1 

1097 paths2 = path2 

1098 else: 

1099 from .implementations.local import trailing_sep 

1100 

1101 source_is_str = isinstance(path1, str) 

1102 paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth) 

1103 if source_is_str and (not recursive or maxdepth is not None): 

1104 # Non-recursive glob does not copy directories 

1105 paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))] 

1106 if not paths1: 

1107 return 

1108 

1109 source_is_file = len(paths1) == 1 

1110 dest_is_dir = isinstance(path2, str) and ( 

1111 trailing_sep(path2) or self.isdir(path2) 

1112 ) 

1113 

1114 exists = source_is_str and ( 

1115 (has_magic(path1) and source_is_file) 

1116 or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) 

1117 ) 

1118 paths2 = other_paths( 

1119 paths1, 

1120 path2, 

1121 exists=exists, 

1122 flatten=not source_is_str, 

1123 ) 

1124 

1125 for p1, p2 in zip(paths1, paths2): 

1126 try: 

1127 self.cp_file(p1, p2, **kwargs) 

1128 except FileNotFoundError: 

1129 if on_error == "raise": 

1130 raise 

1131 

1132 def expand_path(self, path, recursive=False, maxdepth=None, **kwargs): 

1133 """Turn one or more globs or directories into a list of all matching paths 

1134 to files or directories. 

1135 

1136 kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls`` 
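
        Examples
        --------
        A hypothetical sketch (invented paths, assuming an instance ``fs``):

        >>> fs.expand_path("data/*.csv")  # doctest: +SKIP
        ['data/a.csv', 'data/b.csv']
        >>> fs.expand_path("data", recursive=True)  # doctest: +SKIP
        ['data', 'data/a.csv', 'data/sub', 'data/sub/b.csv']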

1137 """ 

1138 

1139 if maxdepth is not None and maxdepth < 1: 

1140 raise ValueError("maxdepth must be at least 1") 

1141 

1142 if isinstance(path, (str, os.PathLike)): 

1143 out = self.expand_path([path], recursive, maxdepth) 

1144 else: 

1145 out = set() 

1146 path = [self._strip_protocol(p) for p in path] 

1147 for p in path: 

1148 if has_magic(p): 

1149 bit = set(self.glob(p, maxdepth=maxdepth, **kwargs)) 

1150 out |= bit 

1151 if recursive: 

1152 # glob call above expanded one depth so if maxdepth is defined 

1153 # then decrement it in expand_path call below. If it is zero 

1154 # after decrementing then avoid expand_path call. 

1155 if maxdepth is not None and maxdepth <= 1: 

1156 continue 

1157 out |= set( 

1158 self.expand_path( 

1159 list(bit), 

1160 recursive=recursive, 

1161 maxdepth=maxdepth - 1 if maxdepth is not None else None, 

1162 **kwargs, 

1163 ) 

1164 ) 

1165 continue 

1166 elif recursive: 

1167 rec = set( 

1168 self.find( 

1169 p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs 

1170 ) 

1171 ) 

1172 out |= rec 

1173 if p not in out and (recursive is False or self.exists(p)): 

1174 # should only check once, for the root 

1175 out.add(p) 

1176 if not out: 

1177 raise FileNotFoundError(path) 

1178 return sorted(out) 

1179 

1180 def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): 

1181 """Move file(s) from one location to another""" 

1182 if path1 == path2: 

1183 logger.debug("%s mv: The paths are the same, so no files were moved.", self) 

1184 else: 

1185 # explicitly raise exception to prevent data corruption 

1186 self.copy( 

1187 path1, path2, recursive=recursive, maxdepth=maxdepth, onerror="raise" 

1188 ) 

1189 self.rm(path1, recursive=recursive) 

1190 

1191 def rm_file(self, path): 

1192 """Delete a file""" 

1193 self._rm(path) 

1194 

1195 def _rm(self, path): 

1196 """Delete one file""" 

1197 # this is the old name for the method, prefer rm_file 

1198 raise NotImplementedError 

1199 

1200 def rm(self, path, recursive=False, maxdepth=None): 

1201 """Delete files. 

1202 

1203 Parameters 

1204 ---------- 

1205 path: str or list of str 

1206 File(s) to delete. 

1207 recursive: bool 

1208 If file(s) are directories, recursively delete contents and then 

1209 also remove the directory 

1210 maxdepth: int or None 

1211 Depth to pass to walk for finding files to delete, if recursive. 

1212 If None, there will be no limit and infinite recursion may be 

1213 possible. 

1214 """ 

1215 path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) 

1216 for p in reversed(path): 

1217 self.rm_file(p) 

1218 

1219 @classmethod 

1220 def _parent(cls, path): 

1221 path = cls._strip_protocol(path) 

1222 if "/" in path: 

1223 parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker) 

1224 return cls.root_marker + parent 

1225 else: 

1226 return cls.root_marker 

1227 

1228 def _open( 

1229 self, 

1230 path, 

1231 mode="rb", 

1232 block_size=None, 

1233 autocommit=True, 

1234 cache_options=None, 

1235 **kwargs, 

1236 ): 

1237 """Return raw bytes-mode file-like from the file-system""" 

1238 return AbstractBufferedFile( 

1239 self, 

1240 path, 

1241 mode, 

1242 block_size, 

1243 autocommit, 

1244 cache_options=cache_options, 

1245 **kwargs, 

1246 ) 

1247 

1248 def open( 

1249 self, 

1250 path, 

1251 mode="rb", 

1252 block_size=None, 

1253 cache_options=None, 

1254 compression=None, 

1255 **kwargs, 

1256 ): 

1257 """ 

1258 Return a file-like object from the filesystem 

1259 

1260 The resultant instance must function correctly in a context ``with`` 

1261 block. 

1262 

1263 Parameters 

1264 ---------- 

1265 path: str 

1266 Target file 

1267 mode: str like 'rb', 'w' 

1268 See builtin ``open()`` 

1269 block_size: int 

1270 Some indication of buffering - this is a value in bytes 

1271 cache_options : dict, optional 

1272 Extra arguments to pass through to the cache. 

1273 compression: string or None 

1274 If given, open file using compression codec. Can either be a compression 

1275 name (a key in ``fsspec.compression.compr``) or "infer" to guess the 

1276 compression from the filename suffix. 

1277 encoding, errors, newline: passed on to TextIOWrapper for text mode 
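
        Examples
        --------
        A hypothetical sketch (invented path, assuming an instance ``fs``):

        >>> with fs.open("data/a.csv.gz", "rt", compression="infer") as f:  # doctest: +SKIP
        ...     header = f.readline()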

1278 """ 

1279 import io 

1280 

1281 path = self._strip_protocol(path) 

1282 if "b" not in mode: 

1283 mode = mode.replace("t", "") + "b" 

1284 

1285 text_kwargs = { 

1286 k: kwargs.pop(k) 

1287 for k in ["encoding", "errors", "newline"] 

1288 if k in kwargs 

1289 } 

1290 return io.TextIOWrapper( 

1291 self.open( 

1292 path, 

1293 mode, 

1294 block_size=block_size, 

1295 cache_options=cache_options, 

1296 compression=compression, 

1297 **kwargs, 

1298 ), 

1299 **text_kwargs, 

1300 ) 

1301 else: 

1302 ac = kwargs.pop("autocommit", not self._intrans) 

1303 f = self._open( 

1304 path, 

1305 mode=mode, 

1306 block_size=block_size, 

1307 autocommit=ac, 

1308 cache_options=cache_options, 

1309 **kwargs, 

1310 ) 

1311 if compression is not None: 

1312 from fsspec.compression import compr 

1313 from fsspec.core import get_compression 

1314 

1315 compression = get_compression(path, compression) 

1316 compress = compr[compression] 

1317 f = compress(f, mode=mode[0]) 

1318 

1319 if not ac and "r" not in mode: 

1320 self.transaction.files.append(f) 

1321 return f 

1322 

1323 def touch(self, path, truncate=True, **kwargs): 

1324 """Create empty file, or update timestamp 

1325 

1326 Parameters 

1327 ---------- 

1328 path: str 

1329 file location 

1330 truncate: bool 

1331 If True, always set file size to 0; if False, update timestamp and 

1332 leave file unchanged, if backend allows this 

1333 """ 

1334 if truncate or not self.exists(path): 

1335 with self.open(path, "wb", **kwargs): 

1336 pass 

1337 else: 

1338 raise NotImplementedError # update timestamp, if possible 

1339 

1340 def ukey(self, path): 

1341 """Hash of file properties, to tell if it has changed""" 

1342 return sha256(str(self.info(path)).encode()).hexdigest() 

1343 

1344 def read_block(self, fn, offset, length, delimiter=None): 

1345 """Read a block of bytes from 

1346 

1347 Starting at ``offset`` of the file, read ``length`` bytes. If 

1348 ``delimiter`` is set then we ensure that the read starts and stops at 

1349 delimiter boundaries that follow the locations ``offset`` and ``offset 

1350 + length``. If ``offset`` is zero then we start at zero. The 

1351 bytestring returned WILL include the end delimiter string. 

1352 

1353 If offset+length is beyond the eof, reads to eof. 

1354 

1355 Parameters 

1356 ---------- 

1357 fn: string 

1358 Path to filename 

1359 offset: int 

1360 Byte offset to start read 

1361 length: int 

1362 Number of bytes to read. If None, read to end. 

1363 delimiter: bytes (optional) 

1364 Ensure reading starts and stops at delimiter bytestring 

1365 

1366 Examples 

1367 -------- 

1368 >>> fs.read_block('data/file.csv', 0, 13) # doctest: +SKIP 

1369 b'Alice, 100\\nBo' 

1370 >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP 

1371 b'Alice, 100\\nBob, 200\\n' 

1372 

1373 Use ``length=None`` to read to the end of the file. 

1374 >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP 

1375 b'Alice, 100\\nBob, 200\\nCharlie, 300' 

1376 

1377 See Also 

1378 -------- 

1379 :func:`fsspec.utils.read_block` 

1380 """ 

1381 with self.open(fn, "rb") as f: 

1382 size = f.size 

1383 if length is None: 

1384 length = size 

1385 if size is not None and offset + length > size: 

1386 length = size - offset 

1387 return read_block(f, offset, length, delimiter) 

1388 

1389 def to_json(self, *, include_password: bool = True) -> str: 

1390 """ 

1391 JSON representation of this filesystem instance. 

1392 

1393 Parameters 

1394 ---------- 

1395 include_password: bool, default True 

1396 Whether to include the password (if any) in the output. 

1397 

1398 Returns 

1399 ------- 

1400 JSON string with keys ``cls`` (the python location of this class), 

1401 protocol (text name of this class's protocol, first one in case of 

1402 multiple), ``args`` (positional args, usually empty), and all other 

1403 keyword arguments as their own keys. 

1404 

1405 Warnings 

1406 -------- 

1407 Serialized filesystems may contain sensitive information which have been 

1408 passed to the constructor, such as passwords and tokens. Make sure you 

1409 store and send them in a secure environment! 
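
        Examples
        --------
        A hypothetical round-trip sketch, assuming an instance ``fs``:

        >>> blob = fs.to_json(include_password=False)  # doctest: +SKIP
        >>> fs2 = AbstractFileSystem.from_json(blob)  # doctest: +SKIP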

1410 """ 

1411 from .json import FilesystemJSONEncoder 

1412 

1413 return json.dumps( 

1414 self, 

1415 cls=type( 

1416 "_FilesystemJSONEncoder", 

1417 (FilesystemJSONEncoder,), 

1418 {"include_password": include_password}, 

1419 ), 

1420 ) 

1421 

1422 @staticmethod 

1423 def from_json(blob: str) -> AbstractFileSystem: 

1424 """ 

1425 Recreate a filesystem instance from JSON representation. 

1426 

1427 See ``.to_json()`` for the expected structure of the input. 

1428 

1429 Parameters 

1430 ---------- 

1431 blob: str 

1432 

1433 Returns 

1434 ------- 

1435 file system instance, not necessarily of this particular class. 

1436 

1437 Warnings 

1438 -------- 

1439 This can import arbitrary modules (as determined by the ``cls`` key). 

1440 Make sure you haven't installed any modules that may execute malicious code 

1441 at import time. 

1442 """ 

1443 from .json import FilesystemJSONDecoder 

1444 

1445 return json.loads(blob, cls=FilesystemJSONDecoder) 

1446 

1447 def to_dict(self, *, include_password: bool = True) -> Dict[str, Any]: 

1448 """ 

1449 JSON-serializable dictionary representation of this filesystem instance. 

1450 

1451 Parameters 

1452 ---------- 

1453 include_password: bool, default True 

1454 Whether to include the password (if any) in the output. 

1455 

1456 Returns 

1457 ------- 

1458 Dictionary with keys ``cls`` (the python location of this class), 

1459 protocol (text name of this class's protocol, first one in case of 

1460 multiple), ``args`` (positional args, usually empty), and all other 

1461 keyword arguments as their own keys. 

1462 

1463 Warnings 

1464 -------- 

1465 Serialized filesystems may contain sensitive information which have been 

1466 passed to the constructor, such as passwords and tokens. Make sure you 

1467 store and send them in a secure environment! 

1468 """ 

1469 cls = type(self) 

1470 proto = self.protocol 

1471 

1472 storage_options = dict(self.storage_options) 

1473 if not include_password: 

1474 storage_options.pop("password", None) 

1475 

1476 return dict( 

1477 cls=f"{cls.__module__}:{cls.__name__}", 

1478 protocol=proto[0] if isinstance(proto, (tuple, list)) else proto, 

1479 args=self.storage_args, 

1480 **storage_options, 

1481 ) 

1482 

1483 @staticmethod 

1484 def from_dict(dct: Dict[str, Any]) -> AbstractFileSystem: 

1485 """ 

1486 Recreate a filesystem instance from dictionary representation. 

1487 

1488 See ``.to_dict()`` for the expected structure of the input. 

1489 

1490 Parameters 

1491 ---------- 

1492 dct: Dict[str, Any] 

1493 

1494 Returns 

1495 ------- 

1496 file system instance, not necessarily of this particular class. 

1497 

1498 Warnings 

1499 -------- 

1500 This can import arbitrary modules (as determined by the ``cls`` key). 

1501 Make sure you haven't installed any modules that may execute malicious code 

1502 at import time. 

1503 """ 

1504 from .json import FilesystemJSONDecoder 

1505 

1506 dct = dict(dct) # Defensive copy 

1507 

1508 cls = FilesystemJSONDecoder.try_resolve_fs_cls(dct) 

1509 if cls is None: 

1510 raise ValueError("Not a serialized AbstractFileSystem") 

1511 

1512 dct.pop("cls", None) 

1513 dct.pop("protocol", None) 

1514 

1515 return cls(*dct.pop("args", ()), **dct) 

1516 

1517 def _get_pyarrow_filesystem(self): 

1518 """ 

1519 Make a version of the FS instance which will be acceptable to pyarrow 

1520 """ 

1521 # all instances already also derive from pyarrow 

1522 return self 

1523 

1524 def get_mapper(self, root="", check=False, create=False, missing_exceptions=None): 

1525 """Create key/value store based on this file-system 

1526 

1527 Makes a MutableMapping interface to the FS at the given root path. 

1528 See ``fsspec.mapping.FSMap`` for further details. 

1529 """ 

1530 from .mapping import FSMap 

1531 

1532 return FSMap( 

1533 root, 

1534 self, 

1535 check=check, 

1536 create=create, 

1537 missing_exceptions=missing_exceptions, 

1538 ) 

1539 

1540 @classmethod 

1541 def clear_instance_cache(cls): 

1542 """ 

1543 Clear the cache of filesystem instances. 

1544 

1545 Notes 

1546 ----- 

1547 Unless overridden by setting the ``cachable`` class attribute to False, 

1548 the filesystem class stores a reference to newly created instances. This 

1549 prevents Python's normal rules around garbage collection from working, 

1550 since the instances refcount will not drop to zero until 

1551 ``clear_instance_cache`` is called. 

1552 """ 

1553 cls._cache.clear() 

1554 

1555 def created(self, path): 

1556 """Return the created timestamp of a file as a datetime.datetime""" 

1557 raise NotImplementedError 

1558 

1559 def modified(self, path): 

1560 """Return the modified timestamp of a file as a datetime.datetime""" 

1561 raise NotImplementedError 

1562 

1563 # ------------------------------------------------------------------------ 

1564 # Aliases 

1565 

1566 def read_bytes(self, path, start=None, end=None, **kwargs): 

1567 """Alias of `AbstractFileSystem.cat_file`.""" 

1568 return self.cat_file(path, start=start, end=end, **kwargs) 

1569 

1570 def write_bytes(self, path, value, **kwargs): 

1571 """Alias of `AbstractFileSystem.pipe_file`.""" 

1572 self.pipe_file(path, value, **kwargs) 

1573 

1574 def makedir(self, path, create_parents=True, **kwargs): 

1575 """Alias of `AbstractFileSystem.mkdir`.""" 

1576 return self.mkdir(path, create_parents=create_parents, **kwargs) 

1577 

1578 def mkdirs(self, path, exist_ok=False): 

1579 """Alias of `AbstractFileSystem.makedirs`.""" 

1580 return self.makedirs(path, exist_ok=exist_ok) 

1581 

1582 def listdir(self, path, detail=True, **kwargs): 

1583 """Alias of `AbstractFileSystem.ls`.""" 

1584 return self.ls(path, detail=detail, **kwargs) 

1585 

1586 def cp(self, path1, path2, **kwargs): 

1587 """Alias of `AbstractFileSystem.copy`.""" 

1588 return self.copy(path1, path2, **kwargs) 

1589 

1590 def move(self, path1, path2, **kwargs): 

1591 """Alias of `AbstractFileSystem.mv`.""" 

1592 return self.mv(path1, path2, **kwargs) 

1593 

1594 def stat(self, path, **kwargs): 

1595 """Alias of `AbstractFileSystem.info`.""" 

1596 return self.info(path, **kwargs) 

1597 

1598 def disk_usage(self, path, total=True, maxdepth=None, **kwargs): 

1599 """Alias of `AbstractFileSystem.du`.""" 

1600 return self.du(path, total=total, maxdepth=maxdepth, **kwargs) 

1601 

1602 def rename(self, path1, path2, **kwargs): 

1603 """Alias of `AbstractFileSystem.mv`.""" 

1604 return self.mv(path1, path2, **kwargs) 

1605 

1606 def delete(self, path, recursive=False, maxdepth=None): 

1607 """Alias of `AbstractFileSystem.rm`.""" 

1608 return self.rm(path, recursive=recursive, maxdepth=maxdepth) 

1609 

1610 def upload(self, lpath, rpath, recursive=False, **kwargs): 

1611 """Alias of `AbstractFileSystem.put`.""" 

1612 return self.put(lpath, rpath, recursive=recursive, **kwargs) 

1613 

1614 def download(self, rpath, lpath, recursive=False, **kwargs): 

1615 """Alias of `AbstractFileSystem.get`.""" 

1616 return self.get(rpath, lpath, recursive=recursive, **kwargs) 

1617 

1618 def sign(self, path, expiration=100, **kwargs): 

1619 """Create a signed URL representing the given path 

1620 

1621 Some implementations allow temporary URLs to be generated, as a 

1622 way of delegating credentials. 

1623 

1624 Parameters 

1625 ---------- 

1626 path : str 

1627 The path on the filesystem 

1628 expiration : int 

1629 Number of seconds to enable the URL for (if supported) 

1630 

1631 Returns 

1632 ------- 

1633 URL : str 

1634 The signed URL 

1635 

1636 Raises 

1637 ------ 

1638 NotImplementedError : if method is not implemented for a filesystem 

1639 """ 

1640 raise NotImplementedError("Sign is not implemented for this filesystem") 

1641 

1642 def _isfilestore(self): 

1643 # Originally inherited from pyarrow DaskFileSystem. Keeping this 

1644 # here for backwards compatibility as long as pyarrow uses its 

1645 # legacy fsspec-compatible filesystems and thus accepts fsspec 

1646 # filesystems as well 

1647 return False 

1648 

1649 

1650class AbstractBufferedFile(io.IOBase): 

1651 """Convenient class to derive from to provide buffering 

1652 

1653 In the case that the backend does not provide a pythonic file-like object 

1654 already, this class contains much of the logic to build one. The only 

1655 methods that need to be overridden are ``_upload_chunk``, 

1656 ``_initiate_upload`` and ``_fetch_range``. 
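
    Examples
    --------
    A minimal read-only subclass sketch (hypothetical backend; for reading,
    only ``_fetch_range`` is required; ``_backend_read`` is an invented
    helper standing in for a real backend call):

    >>> class MyFile(AbstractBufferedFile):  # doctest: +SKIP
    ...     def _fetch_range(self, start, end):
    ...         # return the bytes [start:end) from the backend
    ...         return self.fs._backend_read(self.path, start, end)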

1657 """ 

1658 

1659 DEFAULT_BLOCK_SIZE = 5 * 2**20 

1660 _details = None 

1661 

1662 def __init__( 

1663 self, 

1664 fs, 

1665 path, 

1666 mode="rb", 

1667 block_size="default", 

1668 autocommit=True, 

1669 cache_type="readahead", 

1670 cache_options=None, 

1671 size=None, 

1672 **kwargs, 

1673 ): 

1674 """ 

1675 Template for files with buffered reading and writing 

1676 

1677 Parameters 

1678 ---------- 

1679 fs: instance of FileSystem 

1680 path: str 

1681 location in file-system 

1682 mode: str 

1683 Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file 

1684 systems may be read-only, and some may not support append. 

1685 block_size: int 

1686 Buffer size for reading or writing, 'default' for class default 

1687 autocommit: bool 

1688 Whether to write to final destination; may only impact what 

1689 happens when file is being closed. 

1690 cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead" 

1691 Caching policy in read mode. See the definitions in ``core``. 

1692 cache_options : dict 

1693 Additional options passed to the constructor for the cache specified 

1694 by `cache_type`. 

1695 size: int 

1696 If given and in read mode, suppressed having to look up the file size 

1697 kwargs: 

1698 Gets stored as self.kwargs 

1699 """ 

1700 from .core import caches 

1701 

1702 self.path = path 

1703 self.fs = fs 

1704 self.mode = mode 

1705 self.blocksize = ( 

1706 self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size 

1707 ) 

1708 self.loc = 0 

1709 self.autocommit = autocommit 

1710 self.end = None 

1711 self.start = None 

1712 self.closed = False 

1713 

1714 if cache_options is None: 

1715 cache_options = {} 

1716 

1717 if "trim" in kwargs: 

1718 warnings.warn( 

1719 "Passing 'trim' to control the cache behavior has been deprecated. " 

1720 "Specify it within the 'cache_options' argument instead.", 

1721 FutureWarning, 

1722 ) 

1723 cache_options["trim"] = kwargs.pop("trim") 

1724 

1725 self.kwargs = kwargs 

1726 

1727 if mode not in {"ab", "rb", "wb"}: 

1728 raise NotImplementedError("File mode not supported") 

1729 if mode == "rb": 

1730 if size is not None: 

1731 self.size = size 

1732 else: 

1733 self.size = self.details["size"] 

1734 self.cache = caches[cache_type]( 

1735 self.blocksize, self._fetch_range, self.size, **cache_options 

1736 ) 

1737 else: 

1738 self.buffer = io.BytesIO() 

1739 self.offset = None 

1740 self.forced = False 

1741 self.location = None 

1742 

1743 @property 

1744 def details(self): 

1745 if self._details is None: 

1746 self._details = self.fs.info(self.path) 

1747 return self._details 

1748 

1749 @details.setter 

1750 def details(self, value): 

1751 self._details = value 

1752 self.size = value["size"] 

1753 

1754 @property 

1755 def full_name(self): 

1756 return _unstrip_protocol(self.path, self.fs) 

1757 

1758 @property 

1759 def closed(self): 

1760 # get around this attr being read-only in IOBase 

1761 # use getattr here, since this can be called during del 

1762 return getattr(self, "_closed", True) 

1763 

1764 @closed.setter 

1765 def closed(self, c): 

1766 self._closed = c 

1767 

1768 def __hash__(self): 

1769 if "w" in self.mode: 

1770 return id(self) 

1771 else: 

1772 return int(tokenize(self.details), 16) 

1773 

1774 def __eq__(self, other): 

1775 """Files are equal if they have the same checksum, only in read mode""" 

1776 if self is other: 

1777 return True 

1778 return ( 

1779 isinstance(other, type(self)) 

1780 and self.mode == "rb" 

1781 and other.mode == "rb" 

1782 and hash(self) == hash(other) 

1783 ) 

1784 
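    # Consequence of the hash/eq pair above (illustrative; assumes a backend
    # whose files derive from this class): two read-mode handles on the same
    # path hash their shared ``details`` and so compare equal, while a
    # write-mode handle hashes by id() and is only equal to itself.
    #
    #   a, b = fs.open("data.bin", "rb"), fs.open("data.bin", "rb")
    #   assert a == b                                     # same details token
    #   assert fs.open("o", "wb") != fs.open("o", "wb")   # identity hash
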

1785 def commit(self): 

1786 """Move from temp to final destination""" 

1787 

1788 def discard(self): 

1789 """Throw away temporary file""" 

1790 

1791 def info(self): 

1792 """File information about this path""" 

1793 if "r" in self.mode: 

1794 return self.details 

1795 else: 

1796 raise ValueError("Info not available while writing") 

1797 

1798 def tell(self): 

1799 """Current file location""" 

1800 return self.loc 

1801 

1802 def seek(self, loc, whence=0): 

1803 """Set current file location 

1804 

1805 Parameters 

1806 ---------- 

1807 loc: int 

1808 byte location 

1809 whence: {0, 1, 2} 

1810 from start of file, current location, or end of file, respectively 

1811 """ 

1812 loc = int(loc) 

1813 if self.mode != "rb": 

1814 raise OSError(ESPIPE, "Seek only available in read mode") 

1815 if whence == 0: 

1816 nloc = loc 

1817 elif whence == 1: 

1818 nloc = self.loc + loc 

1819 elif whence == 2: 

1820 nloc = self.size + loc 

1821 else: 

1822 raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)") 

1823 if nloc < 0: 

1824 raise ValueError("Seek before start of file") 

1825 self.loc = nloc 

1826 return self.loc 

1827 
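    # Illustration of the whence handling above for a 100-byte read handle
    # ``f`` (hypothetical); write/append handles always raise OSError(ESPIPE):
    #
    #   f.seek(10)      # -> 10, absolute                       (whence=0)
    #   f.seek(5, 1)    # -> 15, relative to the current loc    (whence=1)
    #   f.seek(-4, 2)   # -> 96, relative to the end, size=100  (whence=2)
    #   f.seek(-1)      # ValueError: Seek before start of file
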

1828 def write(self, data): 

1829 """ 

1830 Write data to buffer. 

1831 

1832 Buffer only sent on flush() or if buffer is greater than 

1833 or equal to blocksize. 

1834 

1835 Parameters 

1836 ---------- 

1837 data: bytes 

1838 Set of bytes to be written. 

1839 """ 

1840 if self.mode not in {"wb", "ab"}: 

1841 raise ValueError("File not in write mode") 

1842 if self.closed: 

1843 raise ValueError("I/O operation on closed file.") 

1844 if self.forced: 

1845 raise ValueError("This file has been force-flushed, can only close") 

1846 out = self.buffer.write(data) 

1847 self.loc += out 

1848 if self.buffer.tell() >= self.blocksize: 

1849 self.flush() 

1850 return out 

1851 
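    # How the threshold above plays out with the default 5 MiB blocksize
    # (hypothetical write handle ``f``): small writes only accumulate in
    # the local buffer, and nothing reaches the backend until the buffer
    # fills or the file is closed.
    #
    #   f.write(b"x" * 1024)         # buffered; no _upload_chunk yet
    #   f.write(b"x" * 5 * 2**20)    # buffer.tell() >= blocksize -> flush()
    #   f.close()                    # flush(force=True) ships the remainder
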

1852 def flush(self, force=False): 

1853 """ 

1854 Write buffered data to backend store. 

1855 

1856 Writes the current buffer, if it is larger than the block-size, or if 

1857 the file is being closed. 

1858 

1859 Parameters 

1860 ---------- 

1861 force: bool 

1862 When closing, write the last block even if it is smaller than 

1863 blocks are allowed to be. Disallows further writing to this file. 

1864 """ 

1865 

1866 if self.closed: 

1867 raise ValueError("Flush on closed file") 

1868 if force and self.forced: 

1869 raise ValueError("Force flush cannot be called more than once") 

1870 if force: 

1871 self.forced = True 

1872 

1873 if self.mode not in {"wb", "ab"}: 

1874 # flushing in read mode is a no-op 

1875 return 

1876 

1877 if not force and self.buffer.tell() < self.blocksize: 

1878 # Defer write on small block 

1879 return 

1880 

1881 if self.offset is None: 

1882 # Initialize a multipart upload 

1883 self.offset = 0 

1884 try: 

1885 self._initiate_upload() 

1886 except: # noqa: E722 

1887 self.closed = True 

1888 raise 

1889 

1890 if self._upload_chunk(final=force) is not False: 

1891 self.offset += self.buffer.seek(0, 2) 

1892 self.buffer = io.BytesIO() 

1893 

1894 def _upload_chunk(self, final=False): 

1895 """Write one part of a multi-block file upload 

1896 

1897 Parameters 

1898 ========== 

1899 final: bool 

1900 This is the last block, so should complete file, if 

1901 self.autocommit is True. 

1902 """ 

1903 # may not yet have been initialized, may need to call _initiate_upload 

1904 

1905 def _initiate_upload(self): 

1906 """Create remote file/upload""" 

1907 pass 

1908 

1909 def _fetch_range(self, start, end): 

1910 """Get the specified set of bytes from remote""" 

1911 raise NotImplementedError 

1912 
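    # ``_upload_chunk``, ``_initiate_upload`` and ``_fetch_range`` are the
    # full set of hooks a concrete backend must supply; everything else in
    # this class is generic buffering. A minimal runnable sketch wiring the
    # three hooks to an in-memory dict appears after the class, below.
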

1913 def read(self, length=-1): 

1914 """ 

1915 Return data from cache, or fetch pieces as necessary 

1916 

1917 Parameters 

1918 ---------- 

1919 length: int (-1) 

1920 Number of bytes to read; if <0, all remaining bytes. 

1921 """ 

1922 length = -1 if length is None else int(length) 

1923 if self.mode != "rb": 

1924 raise ValueError("File not in read mode") 

1925 if length < 0: 

1926 length = self.size - self.loc 

1927 if self.closed: 

1928 raise ValueError("I/O operation on closed file.") 

1929 if length == 0: 

1930 # don't even bother calling fetch 

1931 return b"" 

1932 out = self.cache._fetch(self.loc, self.loc + length) 

1933 

1934 logger.debug( 

1935 "%s read: %i - %i %s", 

1936 self, 

1937 self.loc, 

1938 self.loc + length, 

1939 self.cache._log_stats(), 

1940 ) 

1941 self.loc += len(out) 

1942 return out 

1943 

1944 def readinto(self, b): 

1945 """mirrors builtin file's readinto method 

1946 

1947 https://docs.python.org/3/library/io.html#io.RawIOBase.readinto 

1948 """ 

1949 out = memoryview(b).cast("B") 

1950 data = self.read(out.nbytes) 

1951 out[: len(data)] = data 

1952 return len(data) 

1953 
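    # Usage mirrors io.RawIOBase (illustrative, for any read handle ``f``):
    # the caller owns the buffer, and the return value says how much of it
    # was actually filled.
    #
    #   buf = bytearray(16)
    #   n = f.readinto(buf)          # reads up to 16 bytes at current loc
    #   payload = bytes(buf[:n])     # only the first n bytes are valid
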

1954 def readuntil(self, char=b"\n", blocks=None): 

1955 """Return data between current position and first occurrence of char 

1956 

1957 char is included in the output, except if the end of the file is 

1958 encountered first. 

1959 

1960 Parameters 

1961 ---------- 

1962 char: bytes 

1963 Thing to find 

1964 blocks: None or int 

1965 How much to read in each go. Defaults to file blocksize - which may 

1966 mean a new read on every call. 

1967 """ 

1968 out = [] 

1969 while True: 

1970 start = self.tell() 

1971 part = self.read(blocks or self.blocksize) 

1972 if len(part) == 0: 

1973 break 

1974 found = part.find(char) 

1975 if found > -1: 

1976 out.append(part[: found + len(char)]) 

1977 self.seek(start + found + len(char)) 

1978 break 

1979 out.append(part) 

1980 return b"".join(out) 

1981 
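    # Delimiter semantics above, for a read handle ``f`` over the payload
    # b"alpha\nbeta\ngamma" (illustrative):
    #
    #   f.readuntil()   # -> b"alpha\n"   delimiter included when found
    #   f.readuntil()   # -> b"beta\n"
    #   f.readuntil()   # -> b"gamma"     EOF reached before another b"\n"
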

1982 def readline(self): 

1983 """Read until first occurrence of newline character 

1984 

1985 Note that, because of character encoding, this is not necessarily a 

1986 true line ending. 

1987 """ 

1988 return self.readuntil(b"\n") 

1989 

1990 def __next__(self): 

1991 out = self.readline() 

1992 if out: 

1993 return out 

1994 raise StopIteration 

1995 

1996 def __iter__(self): 

1997 return self 

1998 

1999 def readlines(self): 

2000 """Return all data, split by the newline character""" 

2001 data = self.read() 

2002 lines = data.split(b"\n") 

2003 out = [line + b"\n" for line in lines[:-1]] 

2004 if data.endswith(b"\n"): 

2005 return out 

2006 else: 

2007 return out + [lines[-1]] 

2008 # alternative: return list(self), iterating line by line 

2009 

2010 def readinto1(self, b): 

2011 return self.readinto(b) 

2012 

2013 def close(self): 

2014 """Close file 

2015 

2016 Finalizes writes, discards cache 

2017 """ 

2018 if getattr(self, "_unclosable", False): 

2019 return 

2020 if self.closed: 

2021 return 

2022 if self.mode == "rb": 

2023 self.cache = None 

2024 else: 

2025 if not self.forced: 

2026 self.flush(force=True) 

2027 

2028 if self.fs is not None: 

2029 self.fs.invalidate_cache(self.path) 

2030 self.fs.invalidate_cache(self.fs._parent(self.path)) 

2031 

2032 self.closed = True 

2033 

2034 def readable(self): 

2035 """Whether opened for reading""" 

2036 return self.mode == "rb" and not self.closed 

2037 

2038 def seekable(self): 

2039 """Whether is seekable (only in read mode)""" 

2040 return self.readable() 

2041 

2042 def writable(self): 

2043 """Whether opened for writing""" 

2044 return self.mode in {"wb", "ab"} and not self.closed 

2045 

2046 def __del__(self): 

2047 if not self.closed: 

2048 self.close() 

2049 

2050 def __str__(self): 

2051 return f"<File-like object {type(self.fs).__name__}, {self.path}>" 

2052 

2053 __repr__ = __str__ 

2054 

2055 def __enter__(self): 

2056 return self 

2057 

2058 def __exit__(self, *args): 

2059 self.close()
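

# A minimal runnable sketch (hypothetical, not part of fsspec) wiring the
# three override hooks to a plain dict standing in for a remote object
# store. Passing ``fs=None`` is tolerated because ``close`` guards its
# cache-invalidation calls; ``size=`` on the read handle skips the
# ``fs.info()`` lookup documented in the constructor.
class RamFile(AbstractBufferedFile):
    store = {}  # class-level stand-in for the remote store

    def _initiate_upload(self):
        # called once, before the first chunk is shipped
        self.store[self.path] = b""

    def _upload_chunk(self, final=False):
        # self.buffer holds only data not yet shipped; append it
        self.store[self.path] += self.buffer.getvalue()
        return True

    def _fetch_range(self, start, end):
        return self.store[self.path][start:end]


with RamFile(None, "demo", mode="wb") as f:
    f.write(b"hello world")  # buffered: far below the 5 MiB blocksize
# __exit__ -> close() -> flush(force=True) -> _initiate_upload + _upload_chunk

g = RamFile(None, "demo", mode="rb", size=11)  # size= skips fs.info()
assert g.read() == b"hello world"
g.close()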