# Coverage for fsspec/spec.py: 25% of 781 statements (coverage.py v7.2.7, 2023-06-07 06:56 +0000)

1from __future__ import annotations 

2 

3import io 

4import logging 

5import os 

6import threading 

7import warnings 

8import weakref 

9from errno import ESPIPE 

10from glob import has_magic 

11from hashlib import sha256 

12from typing import ClassVar 

13 

14from .callbacks import _DEFAULT_CALLBACK 

15from .config import apply_config, conf 

16from .dircache import DirCache 

17from .transaction import Transaction 

18from .utils import ( 

19 _unstrip_protocol, 

20 isfilelike, 

21 other_paths, 

22 read_block, 

23 stringify_path, 

24 tokenize, 

25) 

26 

27logger = logging.getLogger("fsspec") 

28 

29 

30def make_instance(cls, args, kwargs): 

31 return cls(*args, **kwargs) 

32 

33 

34class _Cached(type): 

35 """ 

36 Metaclass for caching file system instances. 

37 

38 Notes 

39 ----- 

40 Instances are cached according to 

41 

42 * The values of the class attributes listed in `_extra_tokenize_attributes` 

43 * The arguments passed to ``__init__``. 

44 

45 This creates an additional reference to the filesystem, which prevents the 

46 filesystem from being garbage collected when all *user* references go away. 

47 A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also* 

48 be made for a filesystem instance to be garbage collected. 
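
    Example (an illustrative sketch; assumes the built-in ``memory``
    implementation is registered, as in a default fsspec install):

    >>> import fsspec  # doctest: +SKIP
    >>> fsspec.filesystem("memory") is fsspec.filesystem("memory")  # doctest: +SKIP
    True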

49 """ 

50 

51 def __init__(cls, *args, **kwargs): 

52 super().__init__(*args, **kwargs) 

53 # Note: we intentionally create a reference here, to avoid garbage 

54 # collecting instances when all other references are gone. To really 

55 # delete a FileSystem, the cache must be cleared. 

56 if conf.get("weakref_instance_cache"): # pragma: no cover 

57 # debug option for analysing fork/spawn conditions 

58 cls._cache = weakref.WeakValueDictionary() 

59 else: 

60 cls._cache = {} 

61 cls._pid = os.getpid() 

62 

63 def __call__(cls, *args, **kwargs): 

64 kwargs = apply_config(cls, kwargs) 

65 extra_tokens = tuple( 

66 getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes 

67 ) 

68 token = tokenize( 

69 cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs 

70 ) 

71 skip = kwargs.pop("skip_instance_cache", False) 

72 if os.getpid() != cls._pid: 

73 cls._cache.clear() 

74 cls._pid = os.getpid() 

75 if not skip and cls.cachable and token in cls._cache: 

76 cls._latest = token 

77 return cls._cache[token] 

78 else: 

79 obj = super().__call__(*args, **kwargs) 

80 # Setting _fs_token here causes some static linters to complain. 

81 obj._fs_token_ = token 

82 obj.storage_args = args 

83 obj.storage_options = kwargs 

84 if obj.async_impl and obj.mirror_sync_methods: 

85 from .asyn import mirror_sync_methods 

86 

87 mirror_sync_methods(obj) 

88 

89 if cls.cachable and not skip: 

90 cls._latest = token 

91 cls._cache[token] = obj 

92 return obj 

93 

94 

95class AbstractFileSystem(metaclass=_Cached): 

96 """ 

97 An abstract super-class for pythonic file-systems 

98 

    Implementations are expected to be compatible with this class or,
    better, to subclass it.

101 """ 

102 

103 cachable = True # this class can be cached, instances reused 

104 _cached = False 

105 blocksize = 2**22 

106 sep = "/" 

107 protocol: ClassVar[str | tuple[str, ...]] = "abstract" 

108 _latest = None 

109 async_impl = False 

110 mirror_sync_methods = False 

111 root_marker = "" # For some FSs, may require leading '/' or other character 

112 

113 #: Extra *class attributes* that should be considered when hashing. 

114 _extra_tokenize_attributes = () 

115 

116 def __init__(self, *args, **storage_options): 

117 """Create and configure file-system instance 

118 

119 Instances may be cachable, so if similar enough arguments are seen 

120 a new instance is not required. The token attribute exists to allow 

121 implementations to cache instances if they wish. 

122 

123 A reasonable default should be provided if there are no arguments. 

124 

125 Subclasses should call this method. 

126 

127 Parameters 

128 ---------- 

129 use_listings_cache, listings_expiry_time, max_paths: 

130 passed to ``DirCache``, if the implementation supports 

131 directory listing caching. Pass use_listings_cache=False 

132 to disable such caching. 

133 skip_instance_cache: bool 

134 If this is a cachable implementation, pass True here to force 

135 creating a new instance even if a matching instance exists, and prevent 

136 storing this instance. 

137 asynchronous: bool 

138 loop: asyncio-compatible IOLoop or None 

139 """ 

140 if self._cached: 

141 # reusing instance, don't change 

142 return 

143 self._cached = True 

144 self._intrans = False 

145 self._transaction = None 

146 self._invalidated_caches_in_transaction = [] 

147 self.dircache = DirCache(**storage_options) 

148 

149 if storage_options.pop("add_docs", None): 

150 warnings.warn("add_docs is no longer supported.", FutureWarning) 

151 

152 if storage_options.pop("add_aliases", None): 

153 warnings.warn("add_aliases has been removed.", FutureWarning) 

154 # This is set in _Cached 

155 self._fs_token_ = None 

156 

157 @property 

158 def fsid(self): 

159 """Persistent filesystem id that can be used to compare filesystems 

160 across sessions. 

161 """ 

162 raise NotImplementedError 

163 

164 @property 

165 def _fs_token(self): 

166 return self._fs_token_ 

167 

168 def __dask_tokenize__(self): 

169 return self._fs_token 

170 

171 def __hash__(self): 

172 return int(self._fs_token, 16) 

173 

174 def __eq__(self, other): 

175 return isinstance(other, type(self)) and self._fs_token == other._fs_token 

176 

177 def __reduce__(self): 

178 return make_instance, (type(self), self.storage_args, self.storage_options) 

179 

180 @classmethod 

181 def _strip_protocol(cls, path): 

182 """Turn path from fully-qualified to file-system-specific 

183 

184 May require FS-specific handling, e.g., for relative paths or links. 
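
        Example (illustrative; the base class uses the ``abstract`` protocol
        and an empty ``root_marker``):

        >>> AbstractFileSystem._strip_protocol("abstract://bucket/key/")  # doctest: +SKIP
        'bucket/key'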

185 """ 

186 if isinstance(path, list): 

187 return [cls._strip_protocol(p) for p in path] 

188 path = stringify_path(path) 

189 protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol 

190 for protocol in protos: 

191 if path.startswith(protocol + "://"): 

192 path = path[len(protocol) + 3 :] 

193 elif path.startswith(protocol + "::"): 

194 path = path[len(protocol) + 2 :] 

195 path = path.rstrip("/") 

196 # use of root_marker to make minimum required path, e.g., "/" 

197 return path or cls.root_marker 

198 

199 def unstrip_protocol(self, name): 

200 """Format FS-specific path to generic, including protocol""" 

201 protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol 

202 for protocol in protos: 

203 if name.startswith(f"{protocol}://"): 

204 return name 

205 return f"{protos[0]}://{name}" 

206 

207 @staticmethod 

208 def _get_kwargs_from_urls(path): 

209 """If kwargs can be encoded in the paths, extract them here 

210 

211 This should happen before instantiation of the class; incoming paths 

212 then should be amended to strip the options in methods. 

213 

214 Examples may look like an sftp path "sftp://user@host:/my/path", where 

215 the user and host should become kwargs and later get stripped. 

216 """ 

217 # by default, nothing happens 

218 return {} 

219 

220 @classmethod 

221 def current(cls): 

222 """Return the most recently instantiated FileSystem 

223 

224 If no instance has been created, then create one with defaults 

225 """ 

226 if cls._latest in cls._cache: 

227 return cls._cache[cls._latest] 

228 return cls() 

229 

230 @property 

231 def transaction(self): 

232 """A context within which files are committed together upon exit 

233 

234 Requires the file class to implement `.commit()` and `.discard()` 

235 for the normal and exception cases. 
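
        Example (a sketch; assumes a writable filesystem instance ``fs`` whose
        file class implements ``commit``/``discard``):

        >>> with fs.transaction:  # doctest: +SKIP
        ...     with fs.open("data/out.bin", "wb") as f:
        ...         f.write(b"deferred until the block exits")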

236 """ 

237 if self._transaction is None: 

238 self._transaction = Transaction(self) 

239 return self._transaction 

240 

241 def start_transaction(self): 

242 """Begin write transaction for deferring files, non-context version""" 

243 self._intrans = True 

244 self._transaction = Transaction(self) 

245 return self.transaction 

246 

247 def end_transaction(self): 

248 """Finish write transaction, non-context version""" 

249 self.transaction.complete() 

250 self._transaction = None 

        # Invalidated caches must be cleared after the transaction is completed.

252 for path in self._invalidated_caches_in_transaction: 

253 self.invalidate_cache(path) 

254 self._invalidated_caches_in_transaction.clear() 

255 

256 def invalidate_cache(self, path=None): 

257 """ 

258 Discard any cached directory information 

259 

260 Parameters 

261 ---------- 

262 path: string or None 

263 If None, clear all listings cached else listings at or under given 

264 path. 

265 """ 

        # Implementing an invalidation mechanism is not necessary; there may
        # be no cache. But if there is one, your subclass should call this
        # parent-class method so that caches expire correctly after
        # transactions. See the implementation of FTPFileSystem in ftp.py.

270 if self._intrans: 

271 self._invalidated_caches_in_transaction.append(path) 

272 

273 def mkdir(self, path, create_parents=True, **kwargs): 

274 """ 

275 Create directory entry at path 

276 

        For systems that don't have true directories, may create an entry for
        this instance only and not touch the real filesystem

279 

280 Parameters 

281 ---------- 

282 path: str 

283 location 

284 create_parents: bool 

285 if True, this is equivalent to ``makedirs`` 

286 kwargs: 

287 may be permissions, etc. 

288 """ 

289 pass # not necessary to implement, may not have directories 

290 

291 def makedirs(self, path, exist_ok=False): 

292 """Recursively make directories 

293 

294 Creates directory at path and any intervening required directories. 

295 Raises exception if, for instance, the path already exists but is a 

296 file. 

297 

298 Parameters 

299 ---------- 

300 path: str 

301 leaf directory name 

302 exist_ok: bool (False) 

303 If False, will error if the target already exists 

304 """ 

305 pass # not necessary to implement, may not have directories 

306 

307 def rmdir(self, path): 

308 """Remove a directory, if empty""" 

309 pass # not necessary to implement, may not have directories 

310 

311 def ls(self, path, detail=True, **kwargs): 

312 """List objects at path. 

313 

314 This should include subdirectories and files at that location. The 

315 difference between a file and a directory must be clear when details 

316 are requested. 

317 

318 The specific keys, or perhaps a FileInfo class, or similar, is TBD, 

319 but must be consistent across implementations. 

320 Must include: 

321 

322 - full path to the entry (without protocol) 

323 - size of the entry, in bytes. If the value cannot be determined, will 

324 be ``None``. 

325 - type of entry, "file", "directory" or other 

326 

327 Additional information 

328 may be present, appropriate to the file-system, e.g., generation, 

329 checksum, etc. 

330 

331 May use refresh=True|False to allow use of self._ls_from_cache to 

332 check for a saved listing and avoid calling the backend. This would be 

333 common where listing may be expensive. 

334 

335 Parameters 

336 ---------- 

337 path: str 

338 detail: bool 

339 if True, gives a list of dictionaries, where each is the same as 

340 the result of ``info(path)``. If False, gives a list of paths 

341 (str). 

342 kwargs: may have additional backend-specific options, such as version 

343 information 

344 

345 Returns 

346 ------- 

347 List of strings if detail is False, or list of directory information 

348 dicts if detail is True. 

349 """ 

350 raise NotImplementedError 

351 

352 def _ls_from_cache(self, path): 

353 """Check cache for listing 

354 

        Returns listing, if found (may be an empty list for a directory that exists

356 but contains nothing), None if not in cache. 

357 """ 

358 parent = self._parent(path) 

359 if path.rstrip("/") in self.dircache: 

360 return self.dircache[path.rstrip("/")] 

361 try: 

362 files = [ 

363 f 

364 for f in self.dircache[parent] 

365 if f["name"] == path 

366 or (f["name"] == path.rstrip("/") and f["type"] == "directory") 

367 ] 

368 if len(files) == 0: 

369 # parent dir was listed but did not contain this file 

370 raise FileNotFoundError(path) 

371 return files 

372 except KeyError: 

373 pass 

374 

375 def walk(self, path, maxdepth=None, topdown=True, **kwargs): 

        """Return all files below path

377 

378 List all files, recursing into subdirectories; output is iterator-style, 

379 like ``os.walk()``. For a simple list of files, ``find()`` is available. 

380 

        When topdown is True, the caller can modify the dirs list in-place
        (perhaps using del or slice assignment), and walk() will only recurse
        into the subdirectories whose names remain in dirs; this can be used
        to prune the search, impose a specific order of visiting, or even to
        inform walk() about directories the caller creates or renames before
        it resumes walk() again. Modifying dirs when topdown is False has no
        effect (see os.walk).

388 

        Note that the "files" yielded will include anything that is not
        a directory, such as links.

391 

392 Parameters 

393 ---------- 

394 path: str 

395 Root to recurse into 

396 maxdepth: int 

397 Maximum recursion depth. None means limitless, but not recommended 

398 on link-based file-systems. 

399 topdown: bool (True) 

400 Whether to walk the directory tree from the top downwards or from 

401 the bottom upwards. 

402 kwargs: passed to ``ls`` 
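
        Example (illustrative; assumes a filesystem instance ``fs`` with a
        populated "/data" tree; hidden directories are pruned in-place):

        >>> for root, dirs, files in fs.walk("/data", maxdepth=2):  # doctest: +SKIP
        ...     dirs[:] = [d for d in dirs if not d.startswith(".")]
        ...     print(root, files)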

403 """ 

404 if maxdepth is not None and maxdepth < 1: 

405 raise ValueError("maxdepth must be at least 1") 

406 

407 path = self._strip_protocol(path) 

408 full_dirs = {} 

409 dirs = {} 

410 files = {} 

411 

412 detail = kwargs.pop("detail", False) 

413 try: 

414 listing = self.ls(path, detail=True, **kwargs) 

415 except (FileNotFoundError, OSError): 

416 if detail: 

417 return path, {}, {} 

418 return path, [], [] 

419 

420 for info in listing: 

421 # each info name must be at least [path]/part , but here 

422 # we check also for names like [path]/part/ 

423 pathname = info["name"].rstrip("/") 

424 name = pathname.rsplit("/", 1)[-1] 

425 if info["type"] == "directory" and pathname != path: 

426 # do not include "self" path 

427 full_dirs[name] = pathname 

428 dirs[name] = info 

429 elif pathname == path: 

                # file-like with same name as given path

431 files[""] = info 

432 else: 

433 files[name] = info 

434 

435 if not detail: 

436 dirs = list(dirs) 

437 files = list(files) 

438 

439 if topdown: 

440 # Yield before recursion if walking top down 

441 yield path, dirs, files 

442 

443 if maxdepth is not None: 

444 maxdepth -= 1 

445 if maxdepth < 1: 

446 if not topdown: 

447 yield path, dirs, files 

448 return 

449 

450 for d in dirs: 

451 yield from self.walk( 

452 full_dirs[d], 

453 maxdepth=maxdepth, 

454 detail=detail, 

455 topdown=topdown, 

456 **kwargs, 

457 ) 

458 

459 if not topdown: 

460 # Yield after recursion if walking bottom up 

461 yield path, dirs, files 

462 

463 def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs): 

464 """List all files below path. 

465 

        Like the POSIX ``find`` command, but without conditions.

467 

468 Parameters 

469 ---------- 

470 path : str 

471 maxdepth: int or None 

472 If not None, the maximum number of levels to descend 

473 withdirs: bool 

474 Whether to include directory paths in the output. This is True 

475 when used by glob, but users usually only want files. 

476 kwargs are passed to ``ls``. 

477 """ 

478 # TODO: allow equivalent of -name parameter 

479 path = self._strip_protocol(path) 

480 out = dict() 

481 for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs): 

482 if withdirs: 

483 files.update(dirs) 

484 out.update({info["name"]: info for name, info in files.items()}) 

485 if not out and self.isfile(path): 

486 # walk works on directories, but find should also return [path] 

487 # when path happens to be a file 

488 out[path] = {} 

489 names = sorted(out) 

490 if not detail: 

491 return names 

492 else: 

493 return {name: out[name] for name in names} 

494 

495 def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs): 

496 """Space used by files and optionally directories within a path 

497 

498 Directory size does not include the size of its contents. 

499 

500 Parameters 

501 ---------- 

502 path: str 

503 total: bool 

504 Whether to sum all the file sizes 

505 maxdepth: int or None 

506 Maximum number of directory levels to descend, None for unlimited. 

507 withdirs: bool 

508 Whether to include directory paths in the output. 

509 kwargs: passed to ``find`` 

510 

511 Returns 

512 ------- 

513 Dict of {path: size} if total=False, or int otherwise, where numbers 

514 refer to bytes used. 
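
        Example (illustrative; assumes a filesystem instance ``fs`` and the
        paths shown):

        >>> fs.du("/data", total=False)  # doctest: +SKIP
        {'/data/a.bin': 100, '/data/b.bin': 200}
        >>> fs.du("/data")  # doctest: +SKIP
        300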

515 """ 

516 sizes = {} 

517 if withdirs and self.isdir(path): 

518 # Include top-level directory in output 

519 info = self.info(path) 

520 sizes[info["name"]] = info["size"] 

521 for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs): 

522 info = self.info(f) 

523 sizes[info["name"]] = info["size"] 

524 if total: 

525 return sum(sizes.values()) 

526 else: 

527 return sizes 

528 

529 def glob(self, path, **kwargs): 

530 """ 

531 Find files by glob-matching. 

532 

533 If the path ends with '/' and does not contain "*", it is essentially 

534 the same as ``ls(path)``, returning only files. 

535 

536 We support ``"**"``, 

537 ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation. 

538 

        Search paths that contain characters which are special to this
        implementation of glob may not produce the expected results,
        e.g., 'foo/bar/*starredfilename*'.

542 

543 kwargs are passed to ``ls``. 
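
        Example (illustrative; assumes a filesystem instance ``fs`` and the
        paths shown):

        >>> fs.glob("data/**/*.csv")  # doctest: +SKIP
        ['data/2022/a.csv', 'data/2023/b.csv']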

544 """ 

545 import re 

546 

547 ends = path.endswith("/") 

548 path = self._strip_protocol(path) 

549 indstar = path.find("*") if path.find("*") >= 0 else len(path) 

550 indques = path.find("?") if path.find("?") >= 0 else len(path) 

551 indbrace = path.find("[") if path.find("[") >= 0 else len(path) 

552 

553 ind = min(indstar, indques, indbrace) 

554 

555 detail = kwargs.pop("detail", False) 

556 

557 if not has_magic(path): 

558 root = path 

559 depth = 1 

560 if ends: 

561 path += "/*" 

562 elif self.exists(path): 

563 if not detail: 

564 return [path] 

565 else: 

566 return {path: self.info(path)} 

567 else: 

568 if not detail: 

569 return [] # glob of non-existent returns empty 

570 else: 

571 return {} 

572 elif "/" in path[:ind]: 

573 ind2 = path[:ind].rindex("/") 

574 root = path[: ind2 + 1] 

575 depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1 

576 else: 

577 root = "" 

578 depth = None if "**" in path else path[ind + 1 :].count("/") + 1 

579 

580 allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs) 

581 # Escape characters special to python regex, leaving our supported 

582 # special characters in place. 

583 # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html 

584 # for shell globbing details. 

585 pattern = ( 

586 "^" 

587 + ( 

588 path.replace("\\", r"\\") 

589 .replace(".", r"\.") 

590 .replace("+", r"\+") 

591 .replace("//", "/") 

592 .replace("(", r"\(") 

593 .replace(")", r"\)") 

594 .replace("|", r"\|") 

595 .replace("^", r"\^") 

596 .replace("$", r"\$") 

597 .replace("{", r"\{") 

598 .replace("}", r"\}") 

599 .rstrip("/") 

600 .replace("?", ".") 

601 ) 

602 + "$" 

603 ) 

604 pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern) 

605 pattern = re.sub("[*]", "[^/]*", pattern) 

606 pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*")) 

607 out = { 

608 p: allpaths[p] 

609 for p in sorted(allpaths) 

610 if pattern.match(p.replace("//", "/").rstrip("/")) 

611 } 

612 if detail: 

613 return out 

614 else: 

615 return list(out) 

616 

617 def exists(self, path, **kwargs): 

618 """Is there a file at the given path""" 

619 try: 

620 self.info(path, **kwargs) 

621 return True 

622 except: # noqa: E722 

623 # any exception allowed bar FileNotFoundError? 

624 return False 

625 

626 def lexists(self, path, **kwargs): 

627 """If there is a file at the given path (including 

628 broken links)""" 

629 return self.exists(path) 

630 

631 def info(self, path, **kwargs): 

632 """Give details of entry at path 

633 

634 Returns a single dictionary, with exactly the same information as ``ls`` 

635 would with ``detail=True``. 

636 

        The default implementation calls ls and could be overridden by a
        shortcut. kwargs are passed on to ``ls()``.

639 

640 Some file systems might not be able to measure the file's size, in 

641 which case, the returned dict will include ``'size': None``. 

642 

643 Returns 

644 ------- 

645 dict with keys: name (full path in the FS), size (in bytes), type (file, 

646 directory, or something else) and other FS-specific keys. 

647 """ 

648 path = self._strip_protocol(path) 

649 out = self.ls(self._parent(path), detail=True, **kwargs) 

650 out = [o for o in out if o["name"].rstrip("/") == path] 

651 if out: 

652 return out[0] 

653 out = self.ls(path, detail=True, **kwargs) 

654 path = path.rstrip("/") 

655 out1 = [o for o in out if o["name"].rstrip("/") == path] 

656 if len(out1) == 1: 

657 if "size" not in out1[0]: 

658 out1[0]["size"] = None 

659 return out1[0] 

660 elif len(out1) > 1 or out: 

661 return {"name": path, "size": 0, "type": "directory"} 

662 else: 

663 raise FileNotFoundError(path) 

664 

665 def checksum(self, path): 

666 """Unique value for current version of file 

667 

668 If the checksum is the same from one moment to another, the contents 

669 are guaranteed to be the same. If the checksum changes, the contents 

670 *might* have changed. 

671 

672 This should normally be overridden; default will probably capture 

673 creation/modification timestamp (which would be good) or maybe 

674 access timestamp (which would be bad) 

675 """ 

676 return int(tokenize(self.info(path)), 16) 

677 

678 def size(self, path): 

679 """Size in bytes of file""" 

680 return self.info(path).get("size", None) 

681 

682 def sizes(self, paths): 

683 """Size in bytes of each file in a list of paths""" 

684 return [self.size(p) for p in paths] 

685 

686 def isdir(self, path): 

687 """Is this entry directory-like?""" 

688 try: 

689 return self.info(path)["type"] == "directory" 

690 except OSError: 

691 return False 

692 

693 def isfile(self, path): 

694 """Is this entry file-like?""" 

695 try: 

696 return self.info(path)["type"] == "file" 

697 except: # noqa: E722 

698 return False 

699 

700 def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs): 

701 """Get the contents of the file as a string. 

702 

703 Parameters 

704 ---------- 

705 path: str 

            URL of file on this filesystem

707 encoding, errors, newline: same as `open`. 

708 """ 

709 with self.open( 

710 path, 

711 mode="r", 

712 encoding=encoding, 

713 errors=errors, 

714 newline=newline, 

715 **kwargs, 

716 ) as f: 

717 return f.read() 

718 

719 def write_text( 

720 self, path, value, encoding=None, errors=None, newline=None, **kwargs 

721 ): 

722 """Write the text to the given file. 

723 

724 An existing file will be overwritten. 

725 

726 Parameters 

727 ---------- 

728 path: str 

            URL of file on this filesystem

730 value: str 

731 Text to write. 

732 encoding, errors, newline: same as `open`. 
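
        Example (illustrative; assumes a writable filesystem instance ``fs``;
        ``write`` returns the number of characters written):

        >>> fs.write_text("remote/hello.txt", "hello")  # doctest: +SKIP
        5
        >>> fs.read_text("remote/hello.txt")  # doctest: +SKIP
        'hello'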

733 """ 

734 with self.open( 

735 path, 

736 mode="w", 

737 encoding=encoding, 

738 errors=errors, 

739 newline=newline, 

740 **kwargs, 

741 ) as f: 

742 return f.write(value) 

743 

744 def cat_file(self, path, start=None, end=None, **kwargs): 

745 """Get the content of a file 

746 

747 Parameters 

748 ---------- 

        path: URL of file on this filesystem

750 start, end: int 

751 Bytes limits of the read. If negative, backwards from end, 

752 like usual python slices. Either can be None for start or 

753 end of file, respectively 

754 kwargs: passed to ``open()``. 
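
        Example (illustrative; assumes a file containing ``b"0123456789"``):

        >>> fs.cat_file("data/digits.bin", start=2, end=5)  # doctest: +SKIP
        b'234'
        >>> fs.cat_file("data/digits.bin", start=-3)  # doctest: +SKIP
        b'789'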

755 """ 

756 # explicitly set buffering off? 

757 with self.open(path, "rb", **kwargs) as f: 

758 if start is not None: 

759 if start >= 0: 

760 f.seek(start) 

761 else: 

762 f.seek(max(0, f.size + start)) 

763 if end is not None: 

764 if end < 0: 

765 end = f.size + end 

766 return f.read(end - f.tell()) 

767 return f.read() 

768 

769 def pipe_file(self, path, value, **kwargs): 

770 """Set the bytes of given file""" 

771 with self.open(path, "wb", **kwargs) as f: 

772 f.write(value) 

773 

774 def pipe(self, path, value=None, **kwargs): 

775 """Put value into path 

776 

777 (counterpart to ``cat``) 

778 

779 Parameters 

780 ---------- 

781 path: string or dict(str, bytes) 

782 If a string, a single remote location to put ``value`` bytes; if a dict, 

783 a mapping of {path: bytesvalue}. 

784 value: bytes, optional 

785 If using a single path, these are the bytes to put there. Ignored if 

786 ``path`` is a dict 
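
        Example (illustrative; assumes a writable filesystem instance ``fs``):

        >>> fs.pipe("data/one.bin", b"abc")  # doctest: +SKIP
        >>> fs.pipe({"data/two.bin": b"def", "data/three.bin": b"ghi"})  # doctest: +SKIP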

787 """ 

788 if isinstance(path, str): 

789 self.pipe_file(self._strip_protocol(path), value, **kwargs) 

790 elif isinstance(path, dict): 

791 for k, v in path.items(): 

792 self.pipe_file(self._strip_protocol(k), v, **kwargs) 

793 else: 

794 raise ValueError("path must be str or dict") 

795 

796 def cat_ranges( 

797 self, paths, starts, ends, max_gap=None, on_error="return", **kwargs 

798 ): 
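        """Get the contents of byte ranges from one or more paths.

        Parameters
        ----------
        paths: list
            A list of paths on this filesystem.
        starts, ends: int or list
            Byte limits of the reads; a single int is broadcast to all paths.
        on_error: "return" or "raise"
            If "return", a failed read appends the exception instance to the
            output instead of raising; otherwise the exception propagates.
        """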

799 if max_gap is not None: 

800 raise NotImplementedError 

801 if not isinstance(paths, list): 

802 raise TypeError 

803 if not isinstance(starts, list): 

804 starts = [starts] * len(paths) 

805 if not isinstance(ends, list): 

            ends = [ends] * len(paths)

807 if len(starts) != len(paths) or len(ends) != len(paths): 

808 raise ValueError 

809 out = [] 

810 for p, s, e in zip(paths, starts, ends): 

811 try: 

812 out.append(self.cat_file(p, s, e)) 

813 except Exception as e: 

814 if on_error == "return": 

815 out.append(e) 

816 else: 

817 raise 

818 return out 

819 

820 def cat(self, path, recursive=False, on_error="raise", **kwargs): 

821 """Fetch (potentially multiple) paths' contents 

822 

823 Parameters 

824 ---------- 

825 recursive: bool 

826 If True, assume the path(s) are directories, and get all the 

827 contained files 

828 on_error : "raise", "omit", "return" 

829 If raise, an underlying exception will be raised (converted to KeyError 

830 if the type is in self.missing_exceptions); if omit, keys with exception 

831 will simply not be included in the output; if "return", all keys are 

832 included in the output, but the value will be bytes or an exception 

833 instance. 

834 kwargs: passed to cat_file 

835 

836 Returns 

837 ------- 

838 dict of {path: contents} if there are multiple paths 

839 or the path has been otherwise expanded 

840 """ 

841 paths = self.expand_path(path, recursive=recursive) 

842 if ( 

843 len(paths) > 1 

844 or isinstance(path, list) 

845 or paths[0] != self._strip_protocol(path) 

846 ): 

847 out = {} 

848 for path in paths: 

849 try: 

850 out[path] = self.cat_file(path, **kwargs) 

851 except Exception as e: 

852 if on_error == "raise": 

853 raise 

854 if on_error == "return": 

855 out[path] = e 

856 return out 

857 else: 

858 return self.cat_file(paths[0], **kwargs) 

859 

860 def get_file( 

861 self, rpath, lpath, callback=_DEFAULT_CALLBACK, outfile=None, **kwargs 

862 ): 

863 """Copy single remote file to local""" 

864 from .implementations.local import LocalFileSystem 

865 

866 if isfilelike(lpath): 

867 outfile = lpath 

868 elif self.isdir(rpath): 

869 os.makedirs(lpath, exist_ok=True) 

870 return None 

871 

872 LocalFileSystem(auto_mkdir=True).makedirs(self._parent(lpath), exist_ok=True) 

873 

874 with self.open(rpath, "rb", **kwargs) as f1: 

875 if outfile is None: 

876 outfile = open(lpath, "wb") 

877 

878 try: 

879 callback.set_size(getattr(f1, "size", None)) 

880 data = True 

881 while data: 

882 data = f1.read(self.blocksize) 

883 segment_len = outfile.write(data) 

884 if segment_len is None: 

885 segment_len = len(data) 

886 callback.relative_update(segment_len) 

887 finally: 

888 if not isfilelike(lpath): 

889 outfile.close() 

890 

891 def get( 

892 self, 

893 rpath, 

894 lpath, 

895 recursive=False, 

896 callback=_DEFAULT_CALLBACK, 

897 maxdepth=None, 

898 **kwargs, 

899 ): 

900 """Copy file(s) to local. 

901 

902 Copies a specific file or tree of files (if recursive=True). If lpath 

903 ends with a "/", it will be assumed to be a directory, and target files 

904 will go within. Can submit a list of paths, which may be glob-patterns 

905 and will be expanded. 

906 

907 Calls get_file for each source. 
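
        Example (illustrative; assumes a filesystem instance ``fs``; the
        trailing "/" marks the target as a directory):

        >>> fs.get("remote/dir/*.csv", "local/dir/")  # doctest: +SKIP
        >>> fs.get("remote/tree", "local/tree", recursive=True)  # doctest: +SKIP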

908 """ 

909 from .implementations.local import ( 

910 LocalFileSystem, 

911 make_path_posix, 

912 trailing_sep, 

913 trailing_sep_maybe_asterisk, 

914 ) 

915 

916 source_is_str = isinstance(rpath, str) 

917 rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth) 

918 if source_is_str and (not recursive or maxdepth is not None): 

919 # Non-recursive glob does not copy directories 

920 rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))] 

921 if not rpaths: 

922 return 

923 

924 if isinstance(lpath, str): 

925 lpath = make_path_posix(lpath) 

926 isdir = isinstance(lpath, str) and ( 

927 trailing_sep(lpath) or LocalFileSystem().isdir(lpath) 

928 ) 

929 lpaths = other_paths( 

930 rpaths, 

931 lpath, 

932 exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(rpath), 

933 is_dir=isdir, 

934 flatten=not source_is_str, 

935 ) 

936 

937 callback.set_size(len(lpaths)) 

938 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): 

939 callback.branch(rpath, lpath, kwargs) 

940 self.get_file(rpath, lpath, **kwargs) 

941 

942 def put_file(self, lpath, rpath, callback=_DEFAULT_CALLBACK, **kwargs): 

943 """Copy single file to remote""" 

944 if os.path.isdir(lpath): 

945 self.makedirs(rpath, exist_ok=True) 

946 return None 

947 

948 with open(lpath, "rb") as f1: 

949 size = f1.seek(0, 2) 

950 callback.set_size(size) 

951 f1.seek(0) 

952 

953 self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True) 

954 with self.open(rpath, "wb", **kwargs) as f2: 

955 while f1.tell() < size: 

956 data = f1.read(self.blocksize) 

957 segment_len = f2.write(data) 

958 if segment_len is None: 

959 segment_len = len(data) 

960 callback.relative_update(segment_len) 

961 

962 def put( 

963 self, 

964 lpath, 

965 rpath, 

966 recursive=False, 

967 callback=_DEFAULT_CALLBACK, 

968 maxdepth=None, 

969 **kwargs, 

970 ): 

971 """Copy file(s) from local. 

972 

973 Copies a specific file or tree of files (if recursive=True). If rpath 

974 ends with a "/", it will be assumed to be a directory, and target files 

975 will go within. 

976 

977 Calls put_file for each source. 
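
        Example (illustrative; assumes a writable filesystem instance ``fs``):

        >>> fs.put("local/file.bin", "remote/dir/")  # doctest: +SKIP
        >>> fs.put("local/tree", "remote/tree", recursive=True)  # doctest: +SKIP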

978 """ 

979 from .implementations.local import ( 

980 LocalFileSystem, 

981 make_path_posix, 

982 trailing_sep, 

983 trailing_sep_maybe_asterisk, 

984 ) 

985 

986 source_is_str = isinstance(lpath, str) 

987 if source_is_str: 

988 lpath = make_path_posix(lpath) 

989 fs = LocalFileSystem() 

990 lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) 

991 if source_is_str and (not recursive or maxdepth is not None): 

992 # Non-recursive glob does not copy directories 

993 lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] 

994 if not lpaths: 

995 return 

996 

997 isdir = isinstance(rpath, str) and (trailing_sep(rpath) or self.isdir(rpath)) 

998 rpath = ( 

999 self._strip_protocol(rpath) 

1000 if isinstance(rpath, str) 

1001 else [self._strip_protocol(p) for p in rpath] 

1002 ) 

1003 rpaths = other_paths( 

1004 lpaths, 

1005 rpath, 

1006 exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(lpath), 

1007 is_dir=isdir, 

1008 flatten=not source_is_str, 

1009 ) 

1010 

1011 callback.set_size(len(rpaths)) 

1012 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): 

1013 callback.branch(lpath, rpath, kwargs) 

1014 self.put_file(lpath, rpath, **kwargs) 

1015 

1016 def head(self, path, size=1024): 

1017 """Get the first ``size`` bytes from file""" 

1018 with self.open(path, "rb") as f: 

1019 return f.read(size) 

1020 

1021 def tail(self, path, size=1024): 

1022 """Get the last ``size`` bytes from file""" 

1023 with self.open(path, "rb") as f: 

1024 f.seek(max(-size, -f.size), 2) 

1025 return f.read() 

1026 

1027 def cp_file(self, path1, path2, **kwargs): 

1028 raise NotImplementedError 

1029 

1030 def copy( 

1031 self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs 

1032 ): 

1033 """Copy within two locations in the filesystem 

1034 

1035 on_error : "raise", "ignore" 

1036 If raise, any not-found exceptions will be raised; if ignore any 

1037 not-found exceptions will cause the path to be skipped; defaults to 

1038 raise unless recursive is true, where the default is ignore 

1039 """ 

1040 from .implementations.local import trailing_sep, trailing_sep_maybe_asterisk 

1041 

1042 if on_error is None and recursive: 

1043 on_error = "ignore" 

1044 elif on_error is None: 

1045 on_error = "raise" 

1046 

1047 source_is_str = isinstance(path1, str) 

1048 paths = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth) 

1049 if source_is_str and (not recursive or maxdepth is not None): 

1050 # Non-recursive glob does not copy directories 

1051 paths = [p for p in paths if not (trailing_sep(p) or self.isdir(p))] 

1052 if not paths: 

1053 return 

1054 

1055 isdir = isinstance(path2, str) and (trailing_sep(path2) or self.isdir(path2)) 

1056 path2 = other_paths( 

1057 paths, 

1058 path2, 

1059 exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(path1), 

1060 is_dir=isdir, 

1061 flatten=not source_is_str, 

1062 ) 

1063 

1064 for p1, p2 in zip(paths, path2): 

1065 try: 

1066 self.cp_file(p1, p2, **kwargs) 

1067 except FileNotFoundError: 

1068 if on_error == "raise": 

1069 raise 

1070 

1071 def expand_path(self, path, recursive=False, maxdepth=None, **kwargs): 

1072 """Turn one or more globs or directories into a list of all matching paths 

1073 to files or directories. 

1074 

1075 kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls`` 
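
        Example (illustrative; assumes a filesystem instance ``fs`` and the
        paths shown):

        >>> fs.expand_path(["data/*.csv", "archive"], recursive=True)  # doctest: +SKIP
        ['archive', 'archive/old.csv', 'data/a.csv', 'data/b.csv']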

1076 """ 

1077 

1078 if maxdepth is not None and maxdepth < 1: 

1079 raise ValueError("maxdepth must be at least 1") 

1080 

1081 if isinstance(path, str): 

1082 out = self.expand_path([path], recursive, maxdepth) 

1083 else: 

1084 out = set() 

1085 path = [self._strip_protocol(p) for p in path] 

1086 for p in path: 

1087 if has_magic(p): 

1088 bit = set(self.glob(p, **kwargs)) 

1089 out |= bit 

1090 if recursive: 

1091 # glob call above expanded one depth so if maxdepth is defined 

1092 # then decrement it in expand_path call below. If it is zero 

1093 # after decrementing then avoid expand_path call. 

1094 if maxdepth is not None and maxdepth <= 1: 

1095 continue 

1096 out |= set( 

1097 self.expand_path( 

1098 list(bit), 

1099 recursive=recursive, 

1100 maxdepth=maxdepth - 1 if maxdepth is not None else None, 

1101 **kwargs, 

1102 ) 

1103 ) 

1104 continue 

1105 elif recursive: 

1106 rec = set( 

1107 self.find( 

1108 p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs 

1109 ) 

1110 ) 

1111 out |= rec 

1112 if p not in out and (recursive is False or self.exists(p)): 

1113 # should only check once, for the root 

1114 out.add(p) 

1115 if not out: 

1116 raise FileNotFoundError(path) 

        return sorted(out)

1118 

1119 def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): 

1120 """Move file(s) from one location to another""" 

1121 if path1 == path2: 

1122 logger.debug( 

1123 "%s mv: The paths are the same, so no files were moved." % (self) 

1124 ) 

1125 else: 

1126 self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth) 

1127 self.rm(path1, recursive=recursive) 

1128 

1129 def rm_file(self, path): 

1130 """Delete a file""" 

1131 self._rm(path) 

1132 

1133 def _rm(self, path): 

1134 """Delete one file""" 

1135 # this is the old name for the method, prefer rm_file 

1136 raise NotImplementedError 

1137 

1138 def rm(self, path, recursive=False, maxdepth=None): 

1139 """Delete files. 

1140 

1141 Parameters 

1142 ---------- 

1143 path: str or list of str 

1144 File(s) to delete. 

1145 recursive: bool 

1146 If file(s) are directories, recursively delete contents and then 

1147 also remove the directory 

1148 maxdepth: int or None 

1149 Depth to pass to walk for finding files to delete, if recursive. 

1150 If None, there will be no limit and infinite recursion may be 

1151 possible. 

1152 """ 

1153 path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) 

1154 for p in reversed(path): 

1155 self.rm_file(p) 

1156 

1157 @classmethod 

1158 def _parent(cls, path): 

1159 path = cls._strip_protocol(path) 

1160 if "/" in path: 

1161 parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker) 

1162 return cls.root_marker + parent 

1163 else: 

1164 return cls.root_marker 

1165 

1166 def _open( 

1167 self, 

1168 path, 

1169 mode="rb", 

1170 block_size=None, 

1171 autocommit=True, 

1172 cache_options=None, 

1173 **kwargs, 

1174 ): 

1175 """Return raw bytes-mode file-like from the file-system""" 

1176 return AbstractBufferedFile( 

1177 self, 

1178 path, 

1179 mode, 

1180 block_size, 

1181 autocommit, 

1182 cache_options=cache_options, 

1183 **kwargs, 

1184 ) 

1185 

1186 def open( 

1187 self, 

1188 path, 

1189 mode="rb", 

1190 block_size=None, 

1191 cache_options=None, 

1192 compression=None, 

1193 **kwargs, 

1194 ): 

1195 """ 

1196 Return a file-like object from the filesystem 

1197 

1198 The resultant instance must function correctly in a context ``with`` 

1199 block. 

1200 

1201 Parameters 

1202 ---------- 

1203 path: str 

1204 Target file 

1205 mode: str like 'rb', 'w' 

1206 See builtin ``open()`` 

1207 block_size: int 

1208 Some indication of buffering - this is a value in bytes 

1209 cache_options : dict, optional 

1210 Extra arguments to pass through to the cache. 

1211 compression: string or None 

1212 If given, open file using compression codec. Can either be a compression 

1213 name (a key in ``fsspec.compression.compr``) or "infer" to guess the 

1214 compression from the filename suffix. 

1215 encoding, errors, newline: passed on to TextIOWrapper for text mode 
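
        Example (illustrative; assumes a filesystem instance ``fs`` holding a
        gzipped text file):

        >>> with fs.open("data/log.txt.gz", "rt", compression="infer") as f:  # doctest: +SKIP
        ...     header = f.readline()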

1216 """ 


1219 path = self._strip_protocol(path) 

1220 if "b" not in mode: 

1221 mode = mode.replace("t", "") + "b" 

1222 

1223 text_kwargs = { 

1224 k: kwargs.pop(k) 

1225 for k in ["encoding", "errors", "newline"] 

1226 if k in kwargs 

1227 } 

1228 return io.TextIOWrapper( 

1229 self.open( 

1230 path, 

1231 mode, 

1232 block_size=block_size, 

1233 cache_options=cache_options, 

1234 compression=compression, 

1235 **kwargs, 

1236 ), 

1237 **text_kwargs, 

1238 ) 

1239 else: 

1240 ac = kwargs.pop("autocommit", not self._intrans) 

1241 f = self._open( 

1242 path, 

1243 mode=mode, 

1244 block_size=block_size, 

1245 autocommit=ac, 

1246 cache_options=cache_options, 

1247 **kwargs, 

1248 ) 

1249 if compression is not None: 

1250 from fsspec.compression import compr 

1251 from fsspec.core import get_compression 

1252 

1253 compression = get_compression(path, compression) 

1254 compress = compr[compression] 

1255 f = compress(f, mode=mode[0]) 

1256 

1257 if not ac and "r" not in mode: 

1258 self.transaction.files.append(f) 

1259 return f 

1260 

1261 def touch(self, path, truncate=True, **kwargs): 

1262 """Create empty file, or update timestamp 

1263 

1264 Parameters 

1265 ---------- 

1266 path: str 

1267 file location 

1268 truncate: bool 

1269 If True, always set file size to 0; if False, update timestamp and 

1270 leave file unchanged, if backend allows this 

1271 """ 

1272 if truncate or not self.exists(path): 

1273 with self.open(path, "wb", **kwargs): 

1274 pass 

1275 else: 

1276 raise NotImplementedError # update timestamp, if possible 

1277 

1278 def ukey(self, path): 

1279 """Hash of file properties, to tell if it has changed""" 

1280 return sha256(str(self.info(path)).encode()).hexdigest() 

1281 

1282 def read_block(self, fn, offset, length, delimiter=None): 

1283 """Read a block of bytes from 

1284 

1285 Starting at ``offset`` of the file, read ``length`` bytes. If 

1286 ``delimiter`` is set then we ensure that the read starts and stops at 

1287 delimiter boundaries that follow the locations ``offset`` and ``offset 

1288 + length``. If ``offset`` is zero then we start at zero. The 

1289 bytestring returned WILL include the end delimiter string. 

1290 

1291 If offset+length is beyond the eof, reads to eof. 

1292 

1293 Parameters 

1294 ---------- 

1295 fn: string 

1296 Path to filename 

1297 offset: int 

1298 Byte offset to start read 

1299 length: int 

1300 Number of bytes to read. If None, read to end. 

1301 delimiter: bytes (optional) 

1302 Ensure reading starts and stops at delimiter bytestring 

1303 

1304 Examples 

1305 -------- 

1306 >>> fs.read_block('data/file.csv', 0, 13) # doctest: +SKIP 

1307 b'Alice, 100\\nBo' 

1308 >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP 

1309 b'Alice, 100\\nBob, 200\\n' 

1310 

1311 Use ``length=None`` to read to the end of the file. 

1312 >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP 

1313 b'Alice, 100\\nBob, 200\\nCharlie, 300' 

1314 

1315 See Also 

1316 -------- 

1317 :func:`fsspec.utils.read_block` 

1318 """ 

1319 with self.open(fn, "rb") as f: 

1320 size = f.size 

1321 if length is None: 

1322 length = size 

1323 if size is not None and offset + length > size: 

1324 length = size - offset 

1325 return read_block(f, offset, length, delimiter) 

1326 

1327 def to_json(self): 

1328 """ 

1329 JSON representation of this filesystem instance 

1330 

1331 Returns 

1332 ------- 

1333 str: JSON structure with keys cls (the python location of this class), 

1334 protocol (text name of this class's protocol, first one in case of 

1335 multiple), args (positional args, usually empty), and all other 

1336 kwargs as their own keys. 
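
        Example (an illustrative round trip via ``from_json``; for a cachable
        filesystem this may return the very same instance):

        >>> blob = fs.to_json()  # doctest: +SKIP
        >>> fs2 = AbstractFileSystem.from_json(blob)  # doctest: +SKIP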

1337 """ 

1338 import json 

1339 

1340 cls = type(self) 

1341 cls = ".".join((cls.__module__, cls.__name__)) 

1342 proto = ( 

1343 self.protocol[0] 

1344 if isinstance(self.protocol, (tuple, list)) 

1345 else self.protocol 

1346 ) 

1347 return json.dumps( 

1348 dict( 

1349 **{"cls": cls, "protocol": proto, "args": self.storage_args}, 

1350 **self.storage_options, 

1351 ) 

1352 ) 

1353 

1354 @staticmethod 

1355 def from_json(blob): 

1356 """ 

1357 Recreate a filesystem instance from JSON representation 

1358 

1359 See ``.to_json()`` for the expected structure of the input 

1360 

1361 Parameters 

1362 ---------- 

1363 blob: str 

1364 

1365 Returns 

1366 ------- 

1367 file system instance, not necessarily of this particular class. 

1368 """ 

1369 import json 

1370 

1371 from .registry import _import_class, get_filesystem_class 

1372 

1373 dic = json.loads(blob) 

1374 protocol = dic.pop("protocol") 

1375 try: 

1376 cls = _import_class(dic.pop("cls")) 

1377 except (ImportError, ValueError, RuntimeError, KeyError): 

1378 cls = get_filesystem_class(protocol) 

1379 return cls(*dic.pop("args", ()), **dic) 

1380 

1381 def _get_pyarrow_filesystem(self): 

1382 """ 

1383 Make a version of the FS instance which will be acceptable to pyarrow 

1384 """ 

1385 # all instances already also derive from pyarrow 

1386 return self 

1387 

1388 def get_mapper(self, root="", check=False, create=False, missing_exceptions=None): 

1389 """Create key/value store based on this file-system 

1390 

1391 Makes a MutableMapping interface to the FS at the given root path. 

1392 See ``fsspec.mapping.FSMap`` for further details. 

1393 """ 

1394 from .mapping import FSMap 

1395 

1396 return FSMap( 

1397 root, 

1398 self, 

1399 check=check, 

1400 create=create, 

1401 missing_exceptions=missing_exceptions, 

1402 ) 

1403 

1404 @classmethod 

1405 def clear_instance_cache(cls): 

1406 """ 

1407 Clear the cache of filesystem instances. 

1408 

1409 Notes 

1410 ----- 

1411 Unless overridden by setting the ``cachable`` class attribute to False, 

1412 the filesystem class stores a reference to newly created instances. This 

1413 prevents Python's normal rules around garbage collection from working, 

        since the instance's refcount will not drop to zero until

1415 ``clear_instance_cache`` is called. 

1416 """ 

1417 cls._cache.clear() 

1418 

1419 def created(self, path): 

1420 """Return the created timestamp of a file as a datetime.datetime""" 

1421 raise NotImplementedError 

1422 

1423 def modified(self, path): 

1424 """Return the modified timestamp of a file as a datetime.datetime""" 

1425 raise NotImplementedError 

1426 

1427 # ------------------------------------------------------------------------ 

1428 # Aliases 

1429 

1430 def read_bytes(self, path, start=None, end=None, **kwargs): 

1431 """Alias of `AbstractFileSystem.cat_file`.""" 

1432 return self.cat_file(path, start=start, end=end, **kwargs) 

1433 

1434 def write_bytes(self, path, value, **kwargs): 

1435 """Alias of `AbstractFileSystem.pipe_file`.""" 

1436 self.pipe_file(path, value, **kwargs) 

1437 

1438 def makedir(self, path, create_parents=True, **kwargs): 

1439 """Alias of `AbstractFileSystem.mkdir`.""" 

1440 return self.mkdir(path, create_parents=create_parents, **kwargs) 

1441 

1442 def mkdirs(self, path, exist_ok=False): 

1443 """Alias of `AbstractFileSystem.makedirs`.""" 

1444 return self.makedirs(path, exist_ok=exist_ok) 

1445 

1446 def listdir(self, path, detail=True, **kwargs): 

1447 """Alias of `AbstractFileSystem.ls`.""" 

1448 return self.ls(path, detail=detail, **kwargs) 

1449 

1450 def cp(self, path1, path2, **kwargs): 

1451 """Alias of `AbstractFileSystem.copy`.""" 

1452 return self.copy(path1, path2, **kwargs) 

1453 

1454 def move(self, path1, path2, **kwargs): 

1455 """Alias of `AbstractFileSystem.mv`.""" 

1456 return self.mv(path1, path2, **kwargs) 

1457 

1458 def stat(self, path, **kwargs): 

1459 """Alias of `AbstractFileSystem.info`.""" 

1460 return self.info(path, **kwargs) 

1461 

1462 def disk_usage(self, path, total=True, maxdepth=None, **kwargs): 

1463 """Alias of `AbstractFileSystem.du`.""" 

1464 return self.du(path, total=total, maxdepth=maxdepth, **kwargs) 

1465 

1466 def rename(self, path1, path2, **kwargs): 

1467 """Alias of `AbstractFileSystem.mv`.""" 

1468 return self.mv(path1, path2, **kwargs) 

1469 

1470 def delete(self, path, recursive=False, maxdepth=None): 

1471 """Alias of `AbstractFileSystem.rm`.""" 

1472 return self.rm(path, recursive=recursive, maxdepth=maxdepth) 

1473 

1474 def upload(self, lpath, rpath, recursive=False, **kwargs): 

1475 """Alias of `AbstractFileSystem.put`.""" 

1476 return self.put(lpath, rpath, recursive=recursive, **kwargs) 

1477 

1478 def download(self, rpath, lpath, recursive=False, **kwargs): 

1479 """Alias of `AbstractFileSystem.get`.""" 

1480 return self.get(rpath, lpath, recursive=recursive, **kwargs) 

1481 

1482 def sign(self, path, expiration=100, **kwargs): 

1483 """Create a signed URL representing the given path 

1484 

1485 Some implementations allow temporary URLs to be generated, as a 

1486 way of delegating credentials. 

1487 

1488 Parameters 

1489 ---------- 

1490 path : str 

1491 The path on the filesystem 

1492 expiration : int 

1493 Number of seconds to enable the URL for (if supported) 

1494 

1495 Returns 

1496 ------- 

1497 URL : str 

1498 The signed URL 

1499 

1500 Raises 

1501 ------ 

1502 NotImplementedError : if method is not implemented for a filesystem 

1503 """ 

1504 raise NotImplementedError("Sign is not implemented for this filesystem") 

1505 

1506 def _isfilestore(self): 

1507 # Originally inherited from pyarrow DaskFileSystem. Keeping this 

1508 # here for backwards compatibility as long as pyarrow uses its 

1509 # legacy fsspec-compatible filesystems and thus accepts fsspec 

1510 # filesystems as well 

1511 return False 

1512 

1513 

1514class AbstractBufferedFile(io.IOBase): 

1515 """Convenient class to derive from to provide buffering 

1516 

1517 In the case that the backend does not provide a pythonic file-like object 

1518 already, this class contains much of the logic to build one. The only 

1519 methods that need to be overridden are ``_upload_chunk``, 

1520 ``_initiate_upload`` and ``_fetch_range``. 
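
    A minimal read-only sketch (illustrative only; a real subclass would
    fetch the requested bytes from its backend in ``_fetch_range``):

    >>> class DummyFile(AbstractBufferedFile):  # doctest: +SKIP
    ...     def _fetch_range(self, start, end):
    ...         return b"x" * (end - start)
    >>> f = DummyFile(fs=None, path="x", mode="rb", size=10)  # doctest: +SKIP
    >>> f.read(4)  # doctest: +SKIP
    b'xxxx'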

1521 """ 

1522 

1523 DEFAULT_BLOCK_SIZE = 5 * 2**20 

1524 _details = None 

1525 

1526 def __init__( 

1527 self, 

1528 fs, 

1529 path, 

1530 mode="rb", 

1531 block_size="default", 

1532 autocommit=True, 

1533 cache_type="readahead", 

1534 cache_options=None, 

1535 size=None, 

1536 **kwargs, 

1537 ): 

1538 """ 

1539 Template for files with buffered reading and writing 

1540 

1541 Parameters 

1542 ---------- 

1543 fs: instance of FileSystem 

1544 path: str 

1545 location in file-system 

1546 mode: str 

1547 Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file 

1548 systems may be read-only, and some may not support append. 

1549 block_size: int 

1550 Buffer size for reading or writing, 'default' for class default 

1551 autocommit: bool 

1552 Whether to write to final destination; may only impact what 

1553 happens when file is being closed. 

1554 cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead" 

1555 Caching policy in read mode. See the definitions in ``core``. 

1556 cache_options : dict 

1557 Additional options passed to the constructor for the cache specified 

1558 by `cache_type`. 

1559 size: int 

            If given and in read mode, suppresses having to look up the file size

1561 kwargs: 

1562 Gets stored as self.kwargs 

1563 """ 

1564 from .core import caches 

1565 

1566 self.path = path 

1567 self.fs = fs 

1568 self.mode = mode 

1569 self.blocksize = ( 

1570 self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size 

1571 ) 

1572 self.loc = 0 

1573 self.autocommit = autocommit 

1574 self.end = None 

1575 self.start = None 

1576 self.closed = False 

1577 

1578 if cache_options is None: 

1579 cache_options = {} 

1580 

1581 if "trim" in kwargs: 

1582 warnings.warn( 

1583 "Passing 'trim' to control the cache behavior has been deprecated. " 

1584 "Specify it within the 'cache_options' argument instead.", 

1585 FutureWarning, 

1586 ) 

1587 cache_options["trim"] = kwargs.pop("trim") 

1588 

1589 self.kwargs = kwargs 

1590 

1591 if mode not in {"ab", "rb", "wb"}: 

1592 raise NotImplementedError("File mode not supported") 

1593 if mode == "rb": 

1594 if size is not None: 

1595 self.size = size 

1596 else: 

1597 self.size = self.details["size"] 

1598 self.cache = caches[cache_type]( 

1599 self.blocksize, self._fetch_range, self.size, **cache_options 

1600 ) 

1601 else: 

1602 self.buffer = io.BytesIO() 

1603 self.offset = None 

1604 self.forced = False 

1605 self.location = None 

1606 

1607 @property 

1608 def details(self): 

1609 if self._details is None: 

1610 self._details = self.fs.info(self.path) 

1611 return self._details 

1612 

1613 @details.setter 

1614 def details(self, value): 

1615 self._details = value 

1616 self.size = value["size"] 

1617 

1618 @property 

1619 def full_name(self): 

1620 return _unstrip_protocol(self.path, self.fs) 

1621 

1622 @property 

1623 def closed(self): 

1624 # get around this attr being read-only in IOBase 

1625 # use getattr here, since this can be called during del 

1626 return getattr(self, "_closed", True) 

1627 

1628 @closed.setter 

1629 def closed(self, c): 

1630 self._closed = c 

1631 

1632 def __hash__(self): 

1633 if "w" in self.mode: 

1634 return id(self) 

1635 else: 

1636 return int(tokenize(self.details), 16) 

1637 

1638 def __eq__(self, other): 

1639 """Files are equal if they have the same checksum, only in read mode""" 

1640 return self.mode == "rb" and other.mode == "rb" and hash(self) == hash(other) 

1641 

1642 def commit(self): 

1643 """Move from temp to final destination""" 

1644 

1645 def discard(self): 

1646 """Throw away temporary file""" 

1647 

1648 def info(self): 

1649 """File information about this path""" 

1650 if "r" in self.mode: 

1651 return self.details 

1652 else: 

1653 raise ValueError("Info not available while writing") 

1654 

1655 def tell(self): 

1656 """Current file location""" 

1657 return self.loc 

1658 

1659 def seek(self, loc, whence=0): 

1660 """Set current file location 

1661 

1662 Parameters 

1663 ---------- 

1664 loc: int 

1665 byte location 

1666 whence: {0, 1, 2} 

1667 from start of file, current location or end of file, resp. 

1668 """ 

1669 loc = int(loc) 

        if self.mode != "rb":

1671 raise OSError(ESPIPE, "Seek only available in read mode") 

1672 if whence == 0: 

1673 nloc = loc 

1674 elif whence == 1: 

1675 nloc = self.loc + loc 

1676 elif whence == 2: 

1677 nloc = self.size + loc 

1678 else: 

1679 raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % whence) 

1680 if nloc < 0: 

1681 raise ValueError("Seek before start of file") 

1682 self.loc = nloc 

1683 return self.loc 

1684 

1685 def write(self, data): 

1686 """ 

1687 Write data to buffer. 

1688 

1689 Buffer only sent on flush() or if buffer is greater than 

1690 or equal to blocksize. 

1691 

1692 Parameters 

1693 ---------- 

1694 data: bytes 

1695 Set of bytes to be written. 

1696 """ 

1697 if self.mode not in {"wb", "ab"}: 

1698 raise ValueError("File not in write mode") 

1699 if self.closed: 

1700 raise ValueError("I/O operation on closed file.") 

1701 if self.forced: 

1702 raise ValueError("This file has been force-flushed, can only close") 

1703 out = self.buffer.write(data) 

1704 self.loc += out 

1705 if self.buffer.tell() >= self.blocksize: 

1706 self.flush() 

1707 return out 

1708 

1709 def flush(self, force=False): 

1710 """ 

1711 Write buffered data to backend store. 

1712 

1713 Writes the current buffer, if it is larger than the block-size, or if 

1714 the file is being closed. 

1715 

1716 Parameters 

1717 ---------- 

1718 force: bool 

1719 When closing, write the last block even if it is smaller than 

1720 blocks are allowed to be. Disallows further writing to this file. 

1721 """ 

1722 

1723 if self.closed: 

1724 raise ValueError("Flush on closed file") 

1725 if force and self.forced: 

1726 raise ValueError("Force flush cannot be called more than once") 

1727 if force: 

1728 self.forced = True 

1729 

1730 if self.mode not in {"wb", "ab"}: 

1731 # no-op to flush on read-mode 

1732 return 

1733 

1734 if not force and self.buffer.tell() < self.blocksize: 

1735 # Defer write on small block 

1736 return 

1737 

1738 if self.offset is None: 

1739 # Initialize a multipart upload 

1740 self.offset = 0 

1741 try: 

1742 self._initiate_upload() 

1743 except: # noqa: E722 

1744 self.closed = True 

1745 raise 

1746 

1747 if self._upload_chunk(final=force) is not False: 

1748 self.offset += self.buffer.seek(0, 2) 

1749 self.buffer = io.BytesIO() 

1750 

1751 def _upload_chunk(self, final=False): 

1752 """Write one part of a multi-block file upload 

1753 

1754 Parameters 

        ----------

1756 final: bool 

1757 This is the last block, so should complete file, if 

1758 self.autocommit is True. 

1759 """ 

        # may not yet have been initialized; may need to call _initiate_upload

1761 

1762 def _initiate_upload(self): 

1763 """Create remote file/upload""" 

1764 pass 

1765 

1766 def _fetch_range(self, start, end): 

1767 """Get the specified set of bytes from remote""" 

1768 raise NotImplementedError 

1769 

1770 def read(self, length=-1): 

1771 """ 

1772 Return data from cache, or fetch pieces as necessary 

1773 

1774 Parameters 

1775 ---------- 

1776 length: int (-1) 

1777 Number of bytes to read; if <0, all remaining bytes. 

1778 """ 

1779 length = -1 if length is None else int(length) 

1780 if self.mode != "rb": 

1781 raise ValueError("File not in read mode") 

1782 if length < 0: 

1783 length = self.size - self.loc 

1784 if self.closed: 

1785 raise ValueError("I/O operation on closed file.") 

1786 logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length)) 

1787 if length == 0: 

1788 # don't even bother calling fetch 

1789 return b"" 

1790 out = self.cache._fetch(self.loc, self.loc + length) 

1791 self.loc += len(out) 

1792 return out 

1793 

1794 def readinto(self, b): 

1795 """mirrors builtin file's readinto method 

1796 

1797 https://docs.python.org/3/library/io.html#io.RawIOBase.readinto 

1798 """ 

1799 out = memoryview(b).cast("B") 

1800 data = self.read(out.nbytes) 

1801 out[: len(data)] = data 

1802 return len(data) 

1803 

1804 def readuntil(self, char=b"\n", blocks=None): 

1805 """Return data between current position and first occurrence of char 

1806 

        char is included in the output, except if the end of the file is

1808 encountered first. 

1809 

1810 Parameters 

1811 ---------- 

1812 char: bytes 

1813 Thing to find 

1814 blocks: None or int 

1815 How much to read in each go. Defaults to file blocksize - which may 

1816 mean a new read on every call. 

1817 """ 

1818 out = [] 

1819 while True: 

1820 start = self.tell() 

1821 part = self.read(blocks or self.blocksize) 

1822 if len(part) == 0: 

1823 break 

1824 found = part.find(char) 

1825 if found > -1: 

1826 out.append(part[: found + len(char)]) 

1827 self.seek(start + found + len(char)) 

1828 break 

1829 out.append(part) 

1830 return b"".join(out) 

1831 

1832 def readline(self): 

1833 """Read until first occurrence of newline character 

1834 

1835 Note that, because of character encoding, this is not necessarily a 

1836 true line ending. 

1837 """ 

1838 return self.readuntil(b"\n") 

1839 

1840 def __next__(self): 

1841 out = self.readline() 

1842 if out: 

1843 return out 

1844 raise StopIteration 

1845 

1846 def __iter__(self): 

1847 return self 

1848 

1849 def readlines(self): 

1850 """Return all data, split by the newline character""" 

1851 data = self.read() 

1852 lines = data.split(b"\n") 

1853 out = [l + b"\n" for l in lines[:-1]] 

1854 if data.endswith(b"\n"): 

1855 return out 

1856 else: 

1857 return out + [lines[-1]] 

1858 # return list(self) ??? 

1859 

1860 def readinto1(self, b): 

1861 return self.readinto(b) 

1862 

1863 def close(self): 

1864 """Close file 

1865 

1866 Finalizes writes, discards cache 

1867 """ 

1868 if getattr(self, "_unclosable", False): 

1869 return 

1870 if self.closed: 

1871 return 

1872 if self.mode == "rb": 

1873 self.cache = None 

1874 else: 

1875 if not self.forced: 

1876 self.flush(force=True) 

1877 

1878 if self.fs is not None: 

1879 self.fs.invalidate_cache(self.path) 

1880 self.fs.invalidate_cache(self.fs._parent(self.path)) 

1881 

1882 self.closed = True 

1883 

1884 def readable(self): 

1885 """Whether opened for reading""" 

1886 return self.mode == "rb" and not self.closed 

1887 

1888 def seekable(self): 

1889 """Whether is seekable (only in read mode)""" 

1890 return self.readable() 

1891 

1892 def writable(self): 

1893 """Whether opened for writing""" 

1894 return self.mode in {"wb", "ab"} and not self.closed 

1895 

1896 def __del__(self): 

1897 if not self.closed: 

1898 self.close() 

1899 

1900 def __str__(self): 

1901 return "<File-like object %s, %s>" % (type(self.fs).__name__, self.path) 

1902 

1903 __repr__ = __str__ 

1904 

1905 def __enter__(self): 

1906 return self 

1907 

1908 def __exit__(self, *args): 

1909 self.close()