Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/core.py: 18%

230 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1from __future__ import absolute_import, division, print_function 

2 

3import io 

4import logging 

5import os 

6import re 

7from glob import has_magic 

8 

9# for backwards compat, we export cache things from here too 

10from .caching import ( # noqa: F401 

11 BaseCache, 

12 BlockCache, 

13 BytesCache, 

14 MMapCache, 

15 ReadAheadCache, 

16 caches, 

17) 

18from .compression import compr 

19from .registry import filesystem, get_filesystem_class 

20from .utils import ( 

21 _unstrip_protocol, 

22 build_name_function, 

23 infer_compression, 

24 stringify_path, 

25) 

26 

27logger = logging.getLogger("fsspec") 

28 

29 

class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or
        duck-type with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # stack of layered file objects created on __enter__; closed in reverse
        self.fobjects = []

    def __reduce__(self):
        # serialize only the recipe; the low-level file is recreated on demand
        state = (
            self.fs,
            self.path,
            self.mode,
            self.compression,
            self.encoding,
            self.errors,
            self.newline,
        )
        return OpenFile, state

    def __repr__(self):
        return "<OpenFile '%s'>" % self.path

    def __enter__(self):
        # the underlying filesystem is always opened in binary mode
        binary_mode = self.mode.replace("t", "").replace("b", "") + "b"

        out = self.fs.open(self.path, mode=binary_mode)
        self.fobjects = [out]

        if self.compression is not None:
            # wrap with the requested compression codec
            out = compr[self.compression](out, mode=binary_mode[0])
            self.fobjects.append(out)

        if "b" not in self.mode:
            # text mode was requested; 'r' is equivalent to 'rt', as in the
            # builtin open()
            out = PickleableTextIOWrapper(
                out, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(out)

        # hand back the outermost layer
        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        # path re-qualified with the filesystem's protocol prefix
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        writing = "r" not in self.mode
        # close outermost layer first so buffered data reaches lower layers
        for fobj in reversed(self.fobjects):
            if writing and not fobj.closed:
                fobj.flush()
            fobj.close()
        self.fobjects.clear()

143 

144 

class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        # populated on __enter__ when the filesystem supports open_many
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        # descend through wrapper filesystems looking for open_many support
        fs = self.fs
        while not hasattr(fs, "open_many"):
            fs = getattr(fs, "fs", None)
            if fs is None:
                # no caching layer found; open each file individually
                return [of.__enter__() for of in self]
        # caching filesystem: concurrent download, or set up for upload
        self.files = fs.open_many(self)
        return self.files

    def __exit__(self, *args):
        fs = self.fs
        for of in self:
            of.__exit__(*args)
        if "r" in self.mode:
            return
        # write mode: look for a caching layer to commit the uploads
        while fs is not None:
            if hasattr(fs, "open_many"):
                fs.commit_many(self.files)
                return
            fs = getattr(fs, "fs", None)

    def __getitem__(self, item):
        result = super().__getitem__(item)
        if not isinstance(item, slice):
            return result
        # slices stay usable as a context, carrying the mode and filesystem
        return OpenFiles(result, mode=self.mode, fs=self.fs)

    def __repr__(self):
        return "<List of {} OpenFile instances>".format(len(self))

201 

202 

def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
        Whether to expand a string path for writing into multiple paths;
        passed through to ``get_fs_token_paths``.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if "r" not in mode and auto_mkdir:
        # ensure every target directory exists before handing out writers
        parents = {fs._parent(path) for path in paths}
        [fs.makedirs(parent, exist_ok=True) for parent in parents]
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )

310 

311 

def _un_chain(path, kwargs):
    """Split a (possibly chained) URL like ``protoA::protoB://path`` into
    per-layer ``(url, protocol, kwargs)`` tuples, outermost layer first.

    ``kwargs`` may contain per-protocol sub-dicts (keyed by protocol name),
    which are routed to the matching layer; leftover keys go to the first
    (outermost) layer.
    """
    x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    # a bare protocol word (e.g. "simplecache") gets "://" appended so each
    # segment parses as a URL
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    # iterate innermost-first so a caching layer can refer to the layer it wraps
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            # outermost segment also receives any leftover top-level kwargs
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # caching layers without an explicit target operate on the
            # previously-processed (inner) segment's path
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    # restore outermost-first ordering
    out = list(reversed(out))
    return out

341 

342 

def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    chain = _un_chain(url, kwargs)
    # Walk the chain from the innermost layer outwards, nesting each layer's
    # options under the "target_*" keys of the layer that wraps it.
    inkwargs = {}
    for position, (link_url, link_protocol, link_kw) in enumerate(reversed(chain)):
        if position == len(chain) - 1:
            # outermost layer: its kwargs merge directly into the final set
            inkwargs = dict(**link_kw, **inkwargs)
        else:
            inkwargs["target_options"] = dict(**link_kw, **inkwargs)
            inkwargs["target_protocol"] = link_protocol
            inkwargs["fo"] = link_url
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath

377 

378 

def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    # delegate to open_files with a single-element list and no glob expansion,
    # then unwrap the lone OpenFile
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )
    return out[0]

450 

451 

def open_local(url, mode="rb", **storage_options):
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    openfiles = open_files(url, mode=mode, **storage_options)
    # only filesystems that advertise local_file=True yield real local paths
    if not getattr(openfiles[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    # entering the context triggers any download; the handles' names are the
    # local paths
    with openfiles as handles:
        local_paths = [handle.name for handle in handles]
    if isinstance(url, str) and not has_magic(url):
        # single non-glob URL in, single path out
        return local_paths[0]
    return local_paths

479 

480 

def get_compression(urlpath, compression):
    """Resolve a compression argument, optionally inferring it from the path.

    Returns a key of ``fsspec.compression.compr`` or None; raises ValueError
    for an unknown codec name.
    """
    if compression == "infer":
        # guess from the filename suffix
        compression = infer_compression(urlpath)
    if compression is None:
        return None
    if compression not in compr:
        raise ValueError("Compression type %s not supported" % compression)
    return compression

487 

488 

def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    head, sep, rest = urlpath.partition("://")
    # require at least two characters before "://" so that Windows drive
    # letters like "C://" are not mistaken for protocols
    if sep and len(head) > 1:
        return head, rest
    return None, urlpath

498 

499 

def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    # let the protocol's filesystem class decide how to strip its prefix
    protocol = split_protocol(urlpath)[0]
    return get_filesystem_class(protocol)._strip_protocol(urlpath)

505 

506 

def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
        Used to glob paths in read mode.
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths
    """
    expanded_paths = []
    paths = list(paths)

    if "w" in mode:  # write mode: expand masks via name_function
        # only one "*" mask makes sense when generating output names
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode: expand globstrings against the filesystem
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths

552 

553 

def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory

    Returns
    -------
    Tuple of (filesystem instance, the filesystem's deterministic token,
    list of expanded paths).
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        # the first element determines the protocol/chain for the whole set
        urlpath0 = stringify_path(list(urlpath)[0])
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options or {})
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            # outermost layer: merge its kwargs directly
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        # strip/normalise each member; all must share one protocol
        pchains = [
            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
        ]
        if len(set(pc[1] for pc in pchains)) > 1:
            raise ValueError("Protocol mismatch getting fs from %s", urlpath)
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        # explicit list of paths: expand globs/masks per member
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    else:
        if "w" in mode and expand:
            # writing: generate num output names from the mask/directory
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            # reading a globstring: expand against the filesystem
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths

627 

628 

629def _expand_paths(path, name_function, num): 

630 if isinstance(path, str): 

631 if path.count("*") > 1: 

632 raise ValueError("Output path spec must contain exactly one '*'.") 

633 elif "*" not in path: 

634 path = os.path.join(path, "*.part") 

635 

636 if name_function is None: 

637 name_function = build_name_function(num - 1) 

638 

639 paths = [path.replace("*", name_function(i)) for i in range(num)] 

640 if paths != sorted(paths): 

641 logger.warning( 

642 "In order to preserve order between partitions" 

643 " paths created with ``name_function`` should " 

644 "sort to partition order" 

645 ) 

646 elif isinstance(path, (tuple, list)): 

647 assert len(path) == num 

648 paths = list(path) 

649 else: 

650 raise ValueError( 

651 "Path should be either\n" 

652 "1. A list of paths: ['foo.json', 'bar.json', ...]\n" 

653 "2. A directory: 'foo/\n" 

654 "3. A path with a '*' in it: 'foo.*.json'" 

655 ) 

656 return paths 

657 

658 

class PickleableTextIOWrapper(io.TextIOWrapper):
    """TextIOWrapper cannot be pickled. This solves it.

    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # remember the constructor arguments so the wrapper can simply be
        # rebuilt from scratch when unpickled
        self.args = (buffer, encoding, errors, newline, line_buffering, write_through)
        super().__init__(*self.args)

    def __reduce__(self):
        # pickle as "call the class with the saved arguments"
        return PickleableTextIOWrapper, self.args