Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/core.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

258 statements  

1from __future__ import annotations 

2 

3import io 

4import logging 

5import os 

6import re 

7from glob import has_magic 

8from pathlib import Path 

9 

10# for backwards compat, we export cache things from here too 

11from fsspec.caching import ( # noqa: F401 

12 BaseCache, 

13 BlockCache, 

14 BytesCache, 

15 MMapCache, 

16 ReadAheadCache, 

17 caches, 

18) 

19from fsspec.compression import compr 

20from fsspec.config import conf 

21from fsspec.registry import filesystem, get_filesystem_class 

22from fsspec.utils import ( 

23 _unstrip_protocol, 

24 build_name_function, 

25 infer_compression, 

26 stringify_path, 

27) 

28 

29logger = logging.getLogger("fsspec") 

30 

31 

class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or duck-type
        with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # resolve "infer" and validate against known codecs up front
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # stack of wrapped file objects created by __enter__, innermost first
        self.fobjects = []

    def __reduce__(self):
        # pickle support: recreate from constructor arguments only; any open
        # file handles are deliberately not serialized
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        # the underlying filesystem is always opened in binary mode; text
        # decoding, if requested, is layered on top below
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        try:
            f = self.fs.open(self.path, mode=mode)
        except FileNotFoundError as e:
            if has_magic(self.path):
                raise FileNotFoundError(
                    "%s not found. The URL contains glob characters: you maybe needed\n"
                    "to pass expand=True in fsspec.open() or the storage_options of \n"
                    "your library. You can also set the config value 'open_expand'\n"
                    "before import, or fsspec.core.DEFAULT_EXPAND at runtime, to True.",
                    self.path,
                ) from e
            raise

        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = PickleableTextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        # the outermost wrapper is what the caller interacts with
        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        # path re-qualified with this filesystem's protocol prefix
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        # close outermost wrapper first so buffered layers flush downward
        for f in reversed(self.fobjects):
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()

157 

class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        # populated by __enter__ when the filesystem supports open_many
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        # walk down any wrapper chain looking for bulk-open support
        fs = self.fs
        while fs is not None:
            if hasattr(fs, "open_many"):
                # check for concurrent cache download; or set up for upload
                self.files = fs.open_many(self)
                return self.files
            fs = getattr(fs, "fs", None)
        return [member.__enter__() for member in self]

    def __exit__(self, *args):
        [member.__exit__(*args) for member in self]
        if "r" not in self.mode:
            # walk down the wrapper chain for bulk-commit support
            fs = self.fs
            while fs is not None:
                if hasattr(fs, "open_many"):
                    # check for concurrent cache upload
                    fs.commit_many(self.files)
                    return
                fs = getattr(fs, "fs", None)

    def __getitem__(self, item):
        selected = super().__getitem__(item)
        if isinstance(item, slice):
            # slices keep the context-manager behaviour of the parent list
            return OpenFiles(selected, mode=self.mode, fs=self.fs)
        return selected

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"

214 

215 

def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        ``name_function``)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
        Whether to expand string paths with glob characters (reading) or a
        "*" mask (writing) into concrete paths.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if fs.protocol == "file":
        # local filesystem handles directory creation itself via this flag
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        # for write modes on remote filesystems, pre-create parent directories
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                # best-effort: some backends disallow mkdir at this level
                pass
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )

329 

330 

def _un_chain(path, kwargs):
    """Split a possibly ``::``-chained URL into its constituent links.

    Returns a list of ``(url, protocol, kwargs)`` tuples in the original
    (outermost-first) order. Per-protocol kwargs are popped out of ``kwargs``
    by protocol name; remaining kwargs go to the outermost link.
    """
    x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    # a bare protocol word (e.g. "simplecache") gets "://" appended so that
    # split_protocol below can recognise it
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    # process innermost link first so caching layers can refer to previous_bit
    for bit in reversed(bits):
        # an explicit "protocol" kwarg (popped, so consumed by the first
        # iteration only) wins over the URL's own scheme; default is "file"
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            # the outermost link receives all remaining (un-namespaced) kwargs
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # caching layers with no explicit target wrap the previous link
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out.reverse()
    return out

360 

361 

def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    url = stringify_path(url)
    # non-FS arguments that appear in fsspec.open()
    # inspect could keep this in sync with open()'s signature
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    kwargs = {key: value for key, value in kwargs.items() if key not in known_kwargs}
    chain = _un_chain(url, kwargs)
    inkwargs = {}
    last = len(chain) - 1
    # Reverse iterate the chain, creating a nested target_* structure
    for i, (urls, protocol, kw) in enumerate(reversed(chain)):
        if i == last:
            # outermost link: its kwargs become the top-level options
            inkwargs = dict(**kw, **inkwargs)
        else:
            inkwargs["target_options"] = dict(**kw, **inkwargs)
            inkwargs["target_protocol"] = protocol
            inkwargs["fo"] = urls
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath

410 

411 

# Default for the ``expand`` argument of ``open()``; seeded once at import
# time from the "open_expand" config value, and may be reassigned at runtime.
DEFAULT_EXPAND = conf.get("open_expand", False)

413 

414 

def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    expand=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    expand: bool or None
        Whether to regard file paths containing special glob characters as needing
        expansion (finding the first match) or absolute. Setting False allows using
        paths which do embed such characters. If None (default), this argument
        takes its value from the DEFAULT_EXPAND module variable, which takes
        its initial value from the "open_expand" config value at startup, which will
        be False if not set.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    expand = DEFAULT_EXPAND if expand is None else expand
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=expand,
        **kwargs,
    )
    if not out:
        # expansion of the single input produced no matches
        raise FileNotFoundError(urlpath)
    return out[0]

498 

499 

def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    if not getattr(of[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    with of as files:
        paths = [f.name for f in files]
    # a single non-glob input yields a single path rather than a list
    single = isinstance(url, Path) or (isinstance(url, str) and not has_magic(url))
    return paths[0] if single else paths

531 

532 

def get_compression(urlpath, compression):
    """Resolve the compression argument, inferring from the path if requested."""
    resolved = infer_compression(urlpath) if compression == "infer" else compression
    if resolved is None or resolved in compr:
        return resolved
    raise ValueError(f"Compression type {resolved} not supported")

539 

540 

def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    before, sep, after = urlpath.partition("://")
    if sep and len(before) > 1:
        # single-character schemes are excluded (Windows drive letters)
        return before, after
    if urlpath.startswith("data:"):
        return urlpath.split(":", 1)
    return None, urlpath

552 

553 

def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol = split_protocol(urlpath)[0]
    return get_filesystem_class(protocol)._strip_protocol(urlpath)

559 

560 

def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    :param paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    :return: list of paths
    """
    expanded_paths = []
    paths = list(paths)

    if "w" in mode:  # write mode
        # at most one output mask may be expanded into numbered parts
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths

606 

607 

def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath0 = stringify_path(list(urlpath)[0])
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            # outermost link: its kwargs become the top-level options
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        # every member of the sequence must resolve to the same protocol
        pchains = [_un_chain(stringify_path(u), storage_options)[0] for u in urlpath]
        if len({pc[1] for pc in pchains}) > 1:
            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        if expand:
            paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
        elif not isinstance(paths, list):
            paths = list(paths)
    else:
        # write-like modes expand a "*" mask into numbered part files
        if ("w" in mode or "x" in mode) and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths

686 

687 

688def _expand_paths(path, name_function, num): 

689 if isinstance(path, str): 

690 if path.count("*") > 1: 

691 raise ValueError("Output path spec must contain exactly one '*'.") 

692 elif "*" not in path: 

693 path = os.path.join(path, "*.part") 

694 

695 if name_function is None: 

696 name_function = build_name_function(num - 1) 

697 

698 paths = [path.replace("*", name_function(i)) for i in range(num)] 

699 if paths != sorted(paths): 

700 logger.warning( 

701 "In order to preserve order between partitions" 

702 " paths created with ``name_function`` should " 

703 "sort to partition order" 

704 ) 

705 elif isinstance(path, (tuple, list)): 

706 assert len(path) == num 

707 paths = list(path) 

708 else: 

709 raise ValueError( 

710 "Path should be either\n" 

711 "1. A list of paths: ['foo.json', 'bar.json', ...]\n" 

712 "2. A directory: 'foo/\n" 

713 "3. A path with a '*' in it: 'foo.*.json'" 

714 ) 

715 return paths 

716 

717 

class PickleableTextIOWrapper(io.TextIOWrapper):
    """A pickle-friendly ``TextIOWrapper``.

    Plain ``TextIOWrapper`` instances cannot be pickled; this subclass
    remembers its constructor arguments and rebuilds itself from them on
    unpickle. Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # retain everything needed to reconstruct this wrapper on unpickle
        self.args = (buffer, encoding, errors, newline, line_buffering, write_through)
        super().__init__(*self.args)

    def __reduce__(self):
        # rebuild by calling the constructor with the saved arguments
        return PickleableTextIOWrapper, self.args