Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/core.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import logging
5import os
6import re
7from glob import has_magic
8from pathlib import Path
10# for backwards compat, we export cache things from here too
11from fsspec.caching import ( # noqa: F401
12 BaseCache,
13 BlockCache,
14 BytesCache,
15 MMapCache,
16 ReadAheadCache,
17 caches,
18)
19from fsspec.compression import compr
20from fsspec.config import conf
21from fsspec.registry import filesystem, get_filesystem_class
22from fsspec.utils import (
23 _unstrip_protocol,
24 build_name_function,
25 infer_compression,
26 stringify_path,
27)
# Module-level logger shared by the helpers in this module
logger = logging.getLogger("fsspec")
class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or duck-type
        with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # resolve "infer" (and validate the name) up front, so bad values
        # fail at construction rather than inside the context
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # stack of layered file objects built by __enter__ (raw, then
        # compression, then text wrapper); closed in reverse order by close()
        self.fobjects = []

    def __reduce__(self):
        # serialize only the constructor arguments; the low-level file
        # object is re-created lazily on the unpickling side
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        # always open the raw file in binary mode; text decoding, if
        # requested, is layered on top below
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        try:
            f = self.fs.open(self.path, mode=mode)
        except FileNotFoundError as e:
            if has_magic(self.path):
                raise FileNotFoundError(
                    "%s not found. The URL contains glob characters: you maybe needed\n"
                    "to pass expand=True in fsspec.open() or the storage_options of \n"
                    "your library. You can also set the config value 'open_expand'\n"
                    "before import, or fsspec.core.DEFAULT_EXPAND at runtime, to True.",
                    self.path,
                ) from e
            raise

        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = PickleableTextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        # the outermost wrapper is what the caller interacts with
        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        # path with the filesystem's protocol re-attached
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        for f in reversed(self.fobjects):
            # flush write buffers before closing each layer
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()
class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        self.files = []
        super().__init__(*args)

    def _fs_chain(self):
        # Walk the filesystem and any nested (wrapped) filesystems it holds
        fs = self.fs
        while fs is not None:
            yield fs
            fs = getattr(fs, "fs", None)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        # a (possibly nested) caching filesystem may transfer all files
        # together, potentially concurrently
        for fs in self._fs_chain():
            if hasattr(fs, "open_many"):
                self.files = fs.open_many(self)
                return self.files
        return [of.__enter__() for of in self]

    def __exit__(self, *args):
        for of in self:
            of.__exit__(*args)
        if "r" not in self.mode:
            # when writing, give a caching layer the chance to upload
            # everything in one go
            for fs in self._fs_chain():
                if hasattr(fs, "open_many"):
                    fs.commit_many(self.files)
                    return

    def __getitem__(self, item):
        out = super().__getitem__(item)
        # slicing preserves the container type and its context settings
        if isinstance(item, slice):
            return OpenFiles(out, mode=self.mode, fs=self.fs)
        return out

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
        Whether to expand glob characters (or, for writing, a "*" mask) in the
        path(s) into concrete paths; passed through to ``get_fs_token_paths``.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if fs.protocol == "file":
        # the local filesystem creates directories itself when this is set
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        # for writing on other filesystems, pre-create the parent directories
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                # best-effort: some backends forbid mkdir at this level
                pass
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )
def _un_chain(path, kwargs):
    # Decompose a "::"-chained URL (e.g. "simplecache::s3://bucket/key") into
    # [(url, protocol, kwargs), ...] triples, one per filesystem layer,
    # ordered outermost-first.
    x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            # the outermost (first) layer also absorbs any leftover kwargs
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # caching layers have no path of their own: they wrap the path of
            # the previously-processed (inner) layer
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out.reverse()
    return out
def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    url = stringify_path(url)
    # non-FS arguments that appear in fsspec.open()
    # inspect could keep this in sync with open()'s signature
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    fs_kwargs = {key: val for key, val in kwargs.items() if key not in known_kwargs}
    chain = _un_chain(url, fs_kwargs)
    # Fold the chain from the innermost layer outward into the nested
    # target_protocol/target_options/fo keyword structure.
    inkwargs = {}
    for link_url, link_protocol, link_kw in reversed(chain[1:]):
        inkwargs["target_options"] = dict(**link_kw, **inkwargs)
        inkwargs["target_protocol"] = link_protocol
        inkwargs["fo"] = link_url
    urlpath, protocol, outer_kw = chain[0]
    inkwargs = dict(**outer_kw, **inkwargs)
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath
# Default for :func:`open`'s ``expand`` argument; seeded from the
# "open_expand" config value at import time, may be changed at runtime.
DEFAULT_EXPAND = conf.get("open_expand", False)
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    expand=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    expand: bool or None
        Whether to regard file paths containing special glob characters as needing
        expansion (finding the first match) or absolute. Setting False allows using
        paths which do embed such characters. If None (default), this argument
        takes its value from the DEFAULT_EXPAND module variable, which takes
        its initial value from the "open_expand" config value at startup, which will
        be False if not set.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    expand = DEFAULT_EXPAND if expand is None else expand
    # delegate to open_files with a single-element list, and unwrap the result
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=expand,
        **kwargs,
    )
    if not out:
        # expansion produced no match for the given path
        raise FileNotFoundError(urlpath)
    return out[0]
def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    openfiles = open_files(url, mode=mode, **storage_options)
    if not getattr(openfiles[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    # entering the context materialises (e.g. downloads) the files locally
    with openfiles as handles:
        paths = [handle.name for handle in handles]
    # a single non-glob input yields a single path, not a list
    single = (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path)
    return paths[0] if single else paths
def get_compression(urlpath, compression):
    """Resolve a compression spec, inferring from the filename when the
    spec is the string "infer"; raise ValueError for unknown codecs."""
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is None or compression in compr:
        return compression
    raise ValueError(f"Compression type {compression} not supported")
def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    head, sep, rest = urlpath.partition("://")
    if sep and len(head) > 1:
        # a single-character "protocol" is a Windows drive letter, not a
        # real protocol, so it falls through
        return head, rest
    if urlpath.startswith("data:"):
        return urlpath.split(":", 1)
    return None, urlpath
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol = split_protocol(urlpath)[0]
    return get_filesystem_class(protocol)._strip_protocol(urlpath)
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    :param paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    :return: list of paths
    """
    expanded_paths = []
    paths = list(paths)

    if "w" in mode:  # write mode: expand a "*" mask via name_function
        # only a single filename mask may be expanded when writing
        if sum("*" in p for p in paths) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode: expand glob characters via the filesystem
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        # the first element determines the protocol/chain for all paths
        urlpath0 = stringify_path(next(iter(urlpath)))
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        # resolve every path independently, but require a single protocol
        pchains = [_un_chain(stringify_path(u), storage_options)[0] for u in urlpath]
        if len({pc[1] for pc in pchains}) > 1:
            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        if expand:
            paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
        elif not isinstance(paths, list):
            paths = list(paths)
    else:
        # "w" (write) and "x" (exclusive-create) modes both expand a mask
        if ("w" in mode or "x" in mode) and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            # read mode glob: files only, in sorted order
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
688def _expand_paths(path, name_function, num):
689 if isinstance(path, str):
690 if path.count("*") > 1:
691 raise ValueError("Output path spec must contain exactly one '*'.")
692 elif "*" not in path:
693 path = os.path.join(path, "*.part")
695 if name_function is None:
696 name_function = build_name_function(num - 1)
698 paths = [path.replace("*", name_function(i)) for i in range(num)]
699 if paths != sorted(paths):
700 logger.warning(
701 "In order to preserve order between partitions"
702 " paths created with ``name_function`` should "
703 "sort to partition order"
704 )
705 elif isinstance(path, (tuple, list)):
706 assert len(path) == num
707 paths = list(path)
708 else:
709 raise ValueError(
710 "Path should be either\n"
711 "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
712 "2. A directory: 'foo/\n"
713 "3. A path with a '*' in it: 'foo.*.json'"
714 )
715 return paths
class PickleableTextIOWrapper(io.TextIOWrapper):
    """A pickle-friendly ``io.TextIOWrapper``.

    Plain ``TextIOWrapper`` instances cannot be pickled; this subclass can,
    provided the wrapped ``buffer`` is itself pickleable (as all instances
    of AbstractBufferedFile are).
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # remember the constructor arguments so that __reduce__ can rebuild
        # an equivalent wrapper on unpickling
        self.args = (
            buffer,
            encoding,
            errors,
            newline,
            line_buffering,
            write_through,
        )
        super().__init__(*self.args)

    def __reduce__(self):
        return PickleableTextIOWrapper, self.args