from __future__ import annotations

import io
import logging
import os
import re
from glob import has_magic
from pathlib import Path

# for backwards compat, we export cache things from here too
from .caching import (  # noqa: F401
    BaseCache,
    BlockCache,
    BytesCache,
    MMapCache,
    ReadAheadCache,
    caches,
)
from .compression import compr
from .registry import filesystem, get_filesystem_class
from .utils import (
    _unstrip_protocol,
    build_name_function,
    infer_compression,
    stringify_path,
)

logger = logging.getLogger("fsspec")


class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass of,
        or duck-type compatible with, ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        self.fobjects = []

    def __reduce__(self):
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        f = self.fs.open(self.path, mode=mode)

        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = PickleableTextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        for f in reversed(self.fobjects):
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()

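
# Illustrative sketch (uses the in-memory filesystem; names are arbitrary):
# because OpenFile only records fs/path/mode and friends, it pickles cleanly,
# and the real file object is first created when the context is entered.
def _demo_openfile_roundtrip():
    import pickle

    fs = filesystem("memory")
    fs.pipe("/demo.txt", b"hello\n")  # put some bytes in place to read back
    of = OpenFile(fs, "/demo.txt", mode="rt")
    of2 = pickle.loads(pickle.dumps(of))  # safe: no file handle exists yet
    with of2 as f:  # binary open + text wrapper are layered here
        return f.readline()  # "hello\n"
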

class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        fs = self.fs
        while True:
            if hasattr(fs, "open_many"):
                # check for concurrent cache download; or set up for upload
                self.files = fs.open_many(self)
                return self.files
            if hasattr(fs, "fs") and fs.fs is not None:
                fs = fs.fs
            else:
                break
        return [s.__enter__() for s in self]

    def __exit__(self, *args):
        fs = self.fs
        [s.__exit__(*args) for s in self]
        if "r" not in self.mode:
            while True:
                if hasattr(fs, "open_many"):
                    # check for concurrent cache upload
                    fs.commit_many(self.files)
                    return
                if hasattr(fs, "fs") and fs.fs is not None:
                    fs = fs.fs
                else:
                    break

    def __getitem__(self, item):
        out = super().__getitem__(item)
        if isinstance(item, slice):
            return OpenFiles(out, mode=self.mode, fs=self.fs)
        return out

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"


def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.makedirs(exist_ok=True)``.
    expand: bool
        Whether to expand a str path containing "*" into multiple paths when
        writing (see ``get_fs_token_paths``).
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to, see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if fs.protocol == "file":
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                pass
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )

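
# Usage sketch (in-memory filesystem): a "*" write mask with num=2 yields one
# lazy OpenFile per generated name, and the OpenFiles list acts as a single
# context that opens and closes them all.
def _demo_open_files_write():
    files = open_files("memory://out/part-*.csv", mode="wt", num=2)
    with files as handles:
        for i, f in enumerate(handles):
            f.write(f"partition,{i}\n")
    return [of.path for of in files]  # roughly ["/out/part-0.csv", "/out/part-1.csv"]
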

def _un_chain(path, kwargs):
    x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out = list(reversed(out))
    return out

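
# Illustrative sketch (hypothetical bucket/key): a chained URL is split on
# "::" and decomposed from the outside in, one (path, protocol, kwargs) triple
# per layer; per-protocol options arrive as dicts keyed by protocol name.
def _demo_un_chain():
    chain = _un_chain("simplecache::s3://bucket/data.csv", {"s3": {"anon": True}})
    # roughly: [("bucket/data.csv", "simplecache", {}),
    #           ("bucket/data.csv", "s3", {"anon": True})]
    return chain
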

def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The filesystem-specific URL for ``url``.
    """
    # non-FS arguments that appear in fsspec.open()
    # inspect could keep this in sync with open()'s signature
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    kwargs = {k: v for k, v in kwargs.items() if k not in known_kwargs}
    chain = _un_chain(url, kwargs)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = protocol
        inkwargs["fo"] = urls
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath

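
# Usage sketch: resolve a (possibly chained) URL directly to a filesystem
# instance plus the path as that filesystem understands it.
def _demo_url_to_fs():
    fs, path = url_to_fs("memory://tmp/demo.txt")
    return fs, path  # e.g. (MemoryFileSystem instance, "/tmp/demo.txt")
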

def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to, see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )
    if not out:
        raise FileNotFoundError(urlpath)
    return out[0]

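
# Usage sketch (in-memory filesystem): ``open`` returns a lazy OpenFile, so
# the data is only touched inside the ``with`` block; this exercises both the
# write and read paths of this module's ``open``.
def _demo_open():
    with open("memory://demo.txt", mode="wt") as f:
        f.write("hello")
    with open("memory://demo.txt", mode="rt") as f:
        return f.read()  # "hello"
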

def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to the filesystem, or used by ``open_files``
        (e.g., compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    if not getattr(of[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    with of as files:
        paths = [f.name for f in files]
    if (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path):
        return paths[0]
    return paths

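
# Usage sketch (hypothetical URL and cache directory): with a caching layer in
# the chain, open_local downloads the target and returns a concrete local path.
def _demo_open_local():
    return open_local(
        "simplecache::https://example.com/data.csv",
        simplecache={"cache_storage": "/tmp/fsspec-demo"},
    )  # a real path on the local disk
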

def get_compression(urlpath, compression):
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is not None and compression not in compr:
        raise ValueError(f"Compression type {compression} not supported")
    return compression


def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    if "://" in urlpath:
        protocol, path = urlpath.split("://", 1)
        if len(protocol) > 1:
            # excludes Windows paths
            return protocol, path
    if urlpath.startswith("data:"):
        return urlpath.split(":", 1)
    return None, urlpath


def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol, _ = split_protocol(urlpath)
    cls = get_filesystem_class(protocol)
    return cls._strip_protocol(urlpath)

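
# Illustrative behaviour of split_protocol: single-letter "protocols" are
# treated as Windows drive letters and stay part of the path.
def _demo_split_protocol():
    assert split_protocol("s3://bucket/key") == ("s3", "bucket/key")
    assert split_protocol("c://local/file") == (None, "c://local/file")
    assert split_protocol("plain/relative/path") == (None, "plain/relative/path")
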

def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths
    """
    expanded_paths = []
    paths = list(paths)

    if "w" in mode:  # write mode
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths


def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath0 = stringify_path(list(urlpath)[0])
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options or {})
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        pchains = [
            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
        ]
        if len({pc[1] for pc in pchains}) > 1:
            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    else:
        if "w" in mode and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "x" in mode and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths

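
# Usage sketch (hypothetical glob): the token is a deterministic hash of the
# filesystem instance, suitable as a cache key alongside the expanded paths.
def _demo_get_fs_token_paths():
    fs, token, paths = get_fs_token_paths("memory://data/part-*.csv", mode="rb")
    return fs, token, paths  # paths is the sorted glob expansion (may be empty)
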

def _expand_paths(path, name_function, num):
    if isinstance(path, str):
        if path.count("*") > 1:
            raise ValueError("Output path spec must contain exactly one '*'.")
        elif "*" not in path:
            path = os.path.join(path, "*.part")

        if name_function is None:
            name_function = build_name_function(num - 1)

        paths = [path.replace("*", name_function(i)) for i in range(num)]
        if paths != sorted(paths):
            logger.warning(
                "In order to preserve order between partitions"
                " paths created with ``name_function`` should "
                "sort to partition order"
            )
    elif isinstance(path, (tuple, list)):
        assert len(path) == num
        paths = list(path)
    else:
        raise ValueError(
            "Path should be either\n"
            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
            "2. A directory: 'foo/'\n"
            "3. A path with a '*' in it: 'foo.*.json'"
        )
    return paths

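
# Illustrative expansion: the default name_function zero-pads indices so that
# the generated names sort in partition order.
def _demo_expand_paths():
    return _expand_paths("out/part-*.json", None, num=3)
    # -> ["out/part-0.json", "out/part-1.json", "out/part-2.json"]
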

class PickleableTextIOWrapper(io.TextIOWrapper):
    """TextIOWrapper cannot be pickled. This solves it.

    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        self.args = buffer, encoding, errors, newline, line_buffering, write_through
        super().__init__(*self.args)

    def __reduce__(self):
        return PickleableTextIOWrapper, self.args
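

# Sketch of why __reduce__ matters here: a plain io.TextIOWrapper raises
# TypeError on pickle, while this subclass rebuilds itself from the stored
# constructor arguments (assuming the wrapped buffer itself pickles).
def _demo_pickleable_wrapper():
    import pickle

    buf = io.BytesIO(b"hello\n")  # BytesIO instances are picklable
    w = PickleableTextIOWrapper(buf, encoding="utf8")
    w2 = pickle.loads(pickle.dumps(w))
    return w2.readline()  # "hello\n"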