from __future__ import annotations

import io
import logging
import os
import re
from glob import has_magic
from pathlib import Path

# for backwards compat, we export cache things from here too
from fsspec.caching import (  # noqa: F401
    BaseCache,
    BlockCache,
    BytesCache,
    MMapCache,
    ReadAheadCache,
    caches,
)
from fsspec.compression import compr
from fsspec.config import conf
from fsspec.registry import filesystem, get_filesystem_class
from fsspec.utils import (
    _unstrip_protocol,
    build_name_function,
    infer_compression,
    stringify_path,
)

logger = logging.getLogger("fsspec")


class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text mode and compression over any file system, which
    is typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass of,
        or duck-type compatible with, ``fsspec.spec.AbstractFileSystem``.
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode; how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        self.fobjects = []

    def __reduce__(self):
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        try:
            f = self.fs.open(self.path, mode=mode)
        except FileNotFoundError as e:
            if has_magic(self.path):
                raise FileNotFoundError(
                    f"{self.path} not found. The URL contains glob characters: "
                    "you may need to pass expand=True in fsspec.open() or the "
                    "storage_options of your library. You can also set the "
                    "config value 'open_expand' before import, or "
                    "fsspec.core.DEFAULT_EXPAND at runtime, to True."
                ) from e
            raise

        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin open()
            f = PickleableTextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the
        OpenFile during the life of the file-like object it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        for f in reversed(self.fobjects):
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()
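

# Editor's note (sketch, not in the original module): minimal OpenFile usage.
# The low-level file object is only created on entering the context, which is
# why an un-entered OpenFile is cheap and safe to serialize.
#
#     from fsspec.implementations.local import LocalFileSystem
#
#     of = OpenFile(LocalFileSystem(), "/tmp/example.txt", mode="wt")
#     with of as f:          # real file object(s) created here
#         f.write("hello")
#     # on exit, all layered file objects are flushed and closed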


class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements also works.

    A special case is made for caching filesystems: the files will be
    downloaded or uploaded together at the start or end of the context, and
    this may happen concurrently if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        fs = self.fs
        while True:
            if hasattr(fs, "open_many"):
                # check for concurrent cache download; or set up for upload
                self.files = fs.open_many(self)
                return self.files
            if hasattr(fs, "fs") and fs.fs is not None:
                fs = fs.fs
            else:
                break
        return [s.__enter__() for s in self]

    def __exit__(self, *args):
        fs = self.fs
        [s.__exit__(*args) for s in self]
        if "r" not in self.mode:
            while True:
                if hasattr(fs, "open_many"):
                    # check for concurrent cache upload
                    fs.commit_many(self.files)
                    return
                if hasattr(fs, "fs") and fs.fs is not None:
                    fs = fs.fs
                else:
                    break

    def __getitem__(self, item):
        out = super().__getitem__(item)
        if isinstance(item, slice):
            return OpenFiles(out, mode=self.mode, fs=self.fs)
        return out

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"
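

# Editor's note (sketch, not in the original module): OpenFiles as one context,
# using ``open_files`` defined below. Assumes matching files already exist in
# the in-memory filesystem.
#
#     import fsspec
#
#     ofs = fsspec.open_files("memory://data/part-*.txt", mode="rt")
#     with ofs as files:                 # every member is opened at once
#         texts = [f.read() for f in files]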


def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part0", "part1" if num=2.

    For either reading or writing, can instead provide an explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
        Whether to expand paths containing glob characters (reading) or a
        ``"*"`` template (writing); see ``get_fs_token_paths``.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map to, see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if fs.protocol == "file":
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                pass
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )
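

# Editor's note (hedged sketch, not in the original module): the write path
# described in the docstring above. With num=3 and the default name_function,
# the "*" is filled with "0", "1", "2".
#
#     import fsspec
#
#     ofs = fsspec.open_files("memory://out/part-*.csv", mode="wt", num=3)
#     with ofs as files:
#         for i, f in enumerate(files):
#             f.write(f"partition,{i}\n")
#     # creates memory files /out/part-0.csv, /out/part-1.csv, /out/part-2.csv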


def _un_chain(path, kwargs):
    # Avoid a circular import
    from fsspec.implementations.cached import CachingFileSystem

    if "::" in path:
        x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
        bits = []
        for p in path.split("::"):
            if "://" in p or x.match(p):
                bits.append(p)
            else:
                bits.append(p + "://")
    else:
        bits = [path]
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            kws.update(kwargs)
        kw = dict(
            **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
            **kws,
        )
        bit = cls._strip_protocol(bit)
        if "target_protocol" not in kw and issubclass(cls, CachingFileSystem):
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out.reverse()
    return out
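

# Editor's note (illustrative trace, based on reading the code above; verify
# against your fsspec version, and note the s3 layer needs s3fs installed):
# a chained URL splits on "::" into one (path, protocol, kwargs) triple per
# layer, outermost first, with a caching layer inheriting the inner path.
#
#     _un_chain("simplecache::s3://bucket/key.csv", {})
#     # -> [("bucket/key.csv", "simplecache", {}),
#     #     ("bucket/key.csv", "s3", {})]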


def url_to_fs(url, **kwargs):
    """
    Turn a fully-qualified and potentially chained URL into a filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The filesystem-specific URL for ``url``.
    """
    url = stringify_path(url)
    # non-FS arguments that appear in fsspec.open()
    # inspect could keep this in sync with open()'s signature
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    kwargs = {k: v for k, v in kwargs.items() if k not in known_kwargs}
    chain = _un_chain(url, kwargs)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = protocol
        inkwargs["fo"] = urls
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath
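

# Editor's note (sketch, not in the original source): typical url_to_fs usage.
# For a chained URL, the outermost filesystem receives the inner layer via the
# ``target_protocol``/``target_options``/``fo`` keywords assembled above.
#
#     import fsspec
#
#     fs, path = fsspec.core.url_to_fs("memory://bucket/key.csv")
#     # fs is a MemoryFileSystem instance; path == "/bucket/key.csv"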


DEFAULT_EXPAND = conf.get("open_expand", False)


def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    expand=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    expand: bool or None
        Whether to regard file paths containing special glob characters as
        needing expansion (finding the first match) or to treat them literally.
        Setting False allows using paths which embed such characters. If None
        (default), the value is taken from the DEFAULT_EXPAND module variable,
        which is initialised from the "open_expand" config value at startup and
        is False if not set.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map to, see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    expand = DEFAULT_EXPAND if expand is None else expand
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=expand,
        **kwargs,
    )
    if not out:
        raise FileNotFoundError(urlpath)
    return out[0]
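

# Editor's note (hedged sketch, not in the original module): the expand
# behaviour documented above. Assumes at least one matching file exists;
# with expand=True the glob resolves and the first match is opened, with
# expand=False the glob characters are taken literally.
#
#     import fsspec
#
#     with fsspec.open("memory://logs/2015-*.csv", mode="rt", expand=True) as f:
#         first = f.read()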


def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to the FS constructor or used by ``open_files`` (e.g.,
        compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    if not getattr(of[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    with of as files:
        paths = [f.name for f in files]
    if (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path):
        return paths[0]
    return paths
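

# Editor's note (hedged sketch, not in the original source): open_local is
# mainly useful with caching filesystems, which set ``local_file=True``; it
# returns concrete local paths once any download has happened. The URL and
# cache directory here are hypothetical, and the http backend must be
# installed (aiohttp).
#
#     import fsspec.core
#
#     paths = fsspec.core.open_local(
#         "simplecache::https://example.com/data.csv",
#         simplecache={"cache_storage": "/tmp/fsspec-cache"},
#     )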


def get_compression(urlpath, compression):
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is not None and compression not in compr:
        raise ValueError(f"Compression type {compression} not supported")
    return compression


def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    if "://" in urlpath:
        protocol, path = urlpath.split("://", 1)
        if len(protocol) > 1:
            # excludes Windows paths
            return protocol, path
    if urlpath.startswith("data:"):
        return urlpath.split(":", 1)
    return None, urlpath
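

# Editor's note (commented examples, based on the code above): single-letter
# "protocols" are treated as Windows drive letters and left in the path.
#
#     split_protocol("s3://bucket/key")    # -> ("s3", "bucket/key")
#     split_protocol("c://temp/file.txt")  # -> (None, "c://temp/file.txt")
#     split_protocol("data:text/plain;base64,aGk=")
#     # -> ("data", "text/plain;base64,aGk=")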


def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol, _ = split_protocol(urlpath)
    cls = get_filesystem_class(protocol)
    return cls._strip_protocol(urlpath)


def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths
    """
    expanded_paths = []
    paths = list(paths)

    if "w" in mode:  # write mode
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths


def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Whether to expand string paths containing glob characters (reading)
        or a ``"*"`` template (writing).
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath0 = stringify_path(next(iter(urlpath)))
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options or {})
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        pchains = [
            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
        ]
        if len({pc[1] for pc in pchains}) > 1:
            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        if expand:
            paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
        elif not isinstance(paths, list):
            paths = list(paths)
    else:
        if ("w" in mode or "x" in mode) and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
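

# Editor's note (sketch, not in the original source): the triple returned by
# get_fs_token_paths. The token is a deterministic hash of the filesystem
# instance, so identical storage options yield identical tokens.
#
#     fs, token, paths = get_fs_token_paths("memory://data/*.csv", mode="rb")
#     # fs    -> MemoryFileSystem instance
#     # token -> stable string, useful as a cache/partitioning key
#     # paths -> sorted list of files matching the glob (read mode)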


def _expand_paths(path, name_function, num):
    if isinstance(path, str):
        if path.count("*") > 1:
            raise ValueError("Output path spec must contain exactly one '*'.")
        elif "*" not in path:
            path = os.path.join(path, "*.part")

        if name_function is None:
            name_function = build_name_function(num - 1)

        paths = [path.replace("*", name_function(i)) for i in range(num)]
        if paths != sorted(paths):
            logger.warning(
                "In order to preserve order between partitions"
                " paths created with ``name_function`` should "
                "sort to partition order"
            )
    elif isinstance(path, (tuple, list)):
        assert len(path) == num
        paths = list(path)
    else:
        raise ValueError(
            "Path should be either\n"
            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
            "2. A directory: 'foo/'\n"
            "3. A path with a '*' in it: 'foo.*.json'"
        )
    return paths
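

# Editor's note (commented examples, based on the code above): the default
# name_function zero-pads indices to the width of ``num - 1``.
#
#     _expand_paths("out/part-*.csv", None, 3)
#     # -> ['out/part-0.csv', 'out/part-1.csv', 'out/part-2.csv']
#     _expand_paths("out/", None, 2)
#     # -> ['out/0.part', 'out/1.part']   (a directory gets a '*.part' mask)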


class PickleableTextIOWrapper(io.TextIOWrapper):
    """TextIOWrapper cannot be pickled. This solves it.

    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        self.args = buffer, encoding, errors, newline, line_buffering, write_through
        super().__init__(*self.args)

    def __reduce__(self):
        return PickleableTextIOWrapper, self.args
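

# Editor's note (sketch, not in the original module): a pickle round trip.
# The buffer itself must be pickleable, as the docstring requires; an fsspec
# AbstractBufferedFile qualifies, and so does io.BytesIO, used here.
#
#     import io
#     import pickle
#
#     w = PickleableTextIOWrapper(io.BytesIO(b"hello"), encoding="utf8")
#     w2 = pickle.loads(pickle.dumps(w))
#     assert w2.read() == "hello"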