# fsspec/core.py
from __future__ import annotations

import io
import logging
import os
import re
from glob import has_magic
from pathlib import Path

# for backwards compat, we export cache things from here too
from fsspec.caching import (  # noqa: F401
    BaseCache,
    BlockCache,
    BytesCache,
    MMapCache,
    ReadAheadCache,
    caches,
)
from fsspec.compression import compr
from fsspec.config import conf
from fsspec.registry import available_protocols, filesystem, get_filesystem_class
from fsspec.utils import (
    _unstrip_protocol,
    build_name_function,
    infer_compression,
    stringify_path,
)

logger = logging.getLogger("fsspec")


class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    is typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass of,
        or duck-type compatible with, ``fsspec.spec.AbstractFileSystem``.
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode; how to handle line endings.
    """
    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        self.fobjects = []

    def __reduce__(self):
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"
    def __enter__(self):
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        try:
            f = self.fs.open(self.path, mode=mode)
        except FileNotFoundError as e:
            if has_magic(self.path):
                raise FileNotFoundError(
                    "%s not found. The URL contains glob characters: you may have\n"
                    "needed to pass expand=True in fsspec.open() or the\n"
                    "storage_options of your library. You can also set the config\n"
                    "value 'open_expand' before import, or\n"
                    "fsspec.core.DEFAULT_EXPAND at runtime, to True.",
                    self.path,
                ) from e
            raise
        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = PickleableTextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        for f in reversed(self.fobjects):
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()
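

# Illustrative sketch (not library API): shows why OpenFile is "safe to
# serialize" -- pickling happens before ``__enter__``, so no low-level file
# handle exists yet. The temp file is created by the sketch itself.
def _example_openfile_is_serializable():
    import pickle
    import tempfile

    from fsspec.implementations.local import LocalFileSystem

    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
        tmp.write("hello")

    of = OpenFile(LocalFileSystem(), tmp.name, mode="rt")
    of2 = pickle.loads(pickle.dumps(of))  # fine: no file object created yet
    with of2 as f:  # the real (text-wrapped) file is only opened here
        assert f.read() == "hello"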


class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to the elements works as expected.

    A special case is made for caching filesystems: the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """
    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        fs = self.fs
        while True:
            if hasattr(fs, "open_many"):
                # check for concurrent cache download; or set up for upload
                self.files = fs.open_many(self)
                return self.files
            if hasattr(fs, "fs") and fs.fs is not None:
                fs = fs.fs
            else:
                break
        return [s.__enter__() for s in self]

    def __exit__(self, *args):
        fs = self.fs
        [s.__exit__(*args) for s in self]
        if "r" not in self.mode:
            while True:
                if hasattr(fs, "open_many"):
                    # check for concurrent cache upload
                    fs.commit_many(self.files)
                    return
                if hasattr(fs, "fs") and fs.fs is not None:
                    fs = fs.fs
                else:
                    break

    def __getitem__(self, item):
        out = super().__getitem__(item)
        if isinstance(item, slice):
            return OpenFiles(out, mode=self.mode, fs=self.fs)
        return out

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"
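

# Illustrative sketch (not library API): open several files as one context.
# Assumes a writable temp directory; note that slicing an OpenFiles returns
# another OpenFiles bound to the same mode and filesystem.
def _example_openfiles_single_context():
    import os
    import tempfile

    d = tempfile.mkdtemp()
    ofs = open_files(os.path.join(d, "part-*.txt"), mode="wt", num=3)
    assert isinstance(ofs[:2], OpenFiles)
    with ofs as files:  # all three underlying files open here
        for i, f in enumerate(files):
            f.write(f"partition {i}")
    # on exit every file is closed (and committed, for caching filesystems)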


def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
230 """Given a path or paths, return a list of ``OpenFile`` objects.
232 For writing, a str path must contain the "*" character, which will be filled
233 in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.
235 For either reading or writing, can instead provide explicit list of paths.
237 Parameters
238 ----------
239 urlpath: string or list
240 Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
241 to read from alternative filesystems. To read from multiple files you
242 can pass a globstring or a list of paths, with the caveat that they
243 must all have the same protocol.
244 mode: 'rb', 'wt', etc.
245 compression: string or None
246 If given, open file using compression codec. Can either be a compression
247 name (a key in ``fsspec.compression.compr``) or "infer" to guess the
248 compression from the filename suffix.
249 encoding: str
250 For text mode only
251 errors: None or str
252 Passed to TextIOWrapper in text mode
253 name_function: function or None
254 if opening a set of files for writing, those files do not yet exist,
255 so we need to generate their names by formatting the urlpath for
256 each sequence number
257 num: int [1]
258 if writing mode, number of files we expect to create (passed to
259 name+function)
260 protocol: str or None
261 If given, overrides the protocol found in the URL.
262 newline: bytes or None
263 Used for line terminator in text mode. If None, uses system default;
264 if blank, uses no translation.
265 auto_mkdir: bool (True)
266 If in write mode, this will ensure the target directory exists before
267 writing, by calling ``fs.mkdirs(exist_ok=True)``.
268 expand: bool
269 **kwargs: dict
270 Extra options that make sense to a particular storage connection, e.g.
271 host, port, username, password, etc.
273 Examples
274 --------
275 >>> files = open_files('2015-*-*.csv') # doctest: +SKIP
276 >>> files = open_files(
277 ... 's3://bucket/2015-*-*.csv.gz', compression='gzip'
278 ... ) # doctest: +SKIP
280 Returns
281 -------
282 An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
283 be used as a single context
285 Notes
286 -----
287 For a full list of the available protocols and the implementations that
288 they map across to see the latest online documentation:
290 - For implementations built into ``fsspec`` see
291 https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
292 - For implementations in separate packages see
293 https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
294 """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if fs.protocol == "file":
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                pass
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )


def _un_chain(path, kwargs):
    # Avoid a circular import
    from fsspec.implementations.chained import ChainedFileSystem

    if "::" in path:
        x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
        known_protocols = set(available_protocols())
        bits = []

        # split on '::', then ensure each bit has a protocol
        for p in path.split("::"):
            if p in known_protocols:
                bits.append(p + "://")
            elif "://" in p or x.match(p):
                bits.append(p)
            else:
                bits.append(p + "://")
    else:
        bits = [path]

    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()

    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})

        if bit is bits[0]:
            kws.update(kwargs)

        kw = dict(
            **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
            **kws,
        )
        bit = cls._strip_protocol(bit)

        if (
            "target_protocol" not in kw
            and issubclass(cls, ChainedFileSystem)
            and not bit
        ):
            # replace bit if we are chaining and no path given
            bit = previous_bit

        out.append((bit, protocol, kw))
        previous_bit = bit

    out.reverse()
    return out
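

# Illustrative sketch (not library API): decompose a chained URL into
# (path, protocol, kwargs) triples, outermost first. "simplecache" and
# "memory" are built-in protocols, so no optional dependencies are needed.
def _example_un_chain():
    out = _un_chain("simplecache::memory://afile", {})
    assert [protocol for _, protocol, _ in out] == ["simplecache", "memory"]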


def url_to_fs(url, **kwargs):
    """
    Turn a fully-qualified, and possibly chained, URL into a filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The filesystem-specific URL for ``url``.
    """
    url = stringify_path(url)
    # non-FS arguments that appear in fsspec.open()
    # inspect could keep this in sync with open()'s signature
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    kwargs = {k: v for k, v in kwargs.items() if k not in known_kwargs}
    chain = _un_chain(url, kwargs)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = protocol
        inkwargs["fo"] = urls
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath
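

# Illustrative sketch (not library API): url_to_fs returns the instantiated
# filesystem plus the protocol-stripped path, here for the built-in in-memory
# filesystem.
def _example_url_to_fs():
    fs, path = url_to_fs("memory://bucket/key.csv")
    assert fs.protocol == "memory"
    return fs, path  # path is protocol-stripped, e.g. "/bucket/key.csv"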


DEFAULT_EXPAND = conf.get("open_expand", False)


def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    expand=None,
    **kwargs,
):
450 """Given a path or paths, return one ``OpenFile`` object.
452 Parameters
453 ----------
454 urlpath: string or list
455 Absolute or relative filepath. Prefix with a protocol like ``s3://``
456 to read from alternative filesystems. Should not include glob
457 character(s).
458 mode: 'rb', 'wt', etc.
459 compression: string or None
460 If given, open file using compression codec. Can either be a compression
461 name (a key in ``fsspec.compression.compr``) or "infer" to guess the
462 compression from the filename suffix.
463 encoding: str
464 For text mode only
465 errors: None or str
466 Passed to TextIOWrapper in text mode
467 protocol: str or None
468 If given, overrides the protocol found in the URL.
469 newline: bytes or None
470 Used for line terminator in text mode. If None, uses system default;
471 if blank, uses no translation.
472 expand: bool or None
473 Whether to regard file paths containing special glob characters as needing
474 expansion (finding the first match) or absolute. Setting False allows using
475 paths which do embed such characters. If None (default), this argument
476 takes its value from the DEFAULT_EXPAND module variable, which takes
477 its initial value from the "open_expand" config value at startup, which will
478 be False if not set.
479 **kwargs: dict
480 Extra options that make sense to a particular storage connection, e.g.
481 host, port, username, password, etc.
483 Examples
484 --------
485 >>> openfile = open('2015-01-01.csv') # doctest: +SKIP
486 >>> openfile = open(
487 ... 's3://bucket/2015-01-01.csv.gz', compression='gzip'
488 ... ) # doctest: +SKIP
489 >>> with openfile as f:
490 ... df = pd.read_csv(f) # doctest: +SKIP
491 ...
493 Returns
494 -------
495 ``OpenFile`` object.
497 Notes
498 -----
499 For a full list of the available protocols and the implementations that
500 they map across to see the latest online documentation:
502 - For implementations built into ``fsspec`` see
503 https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
504 - For implementations in separate packages see
505 https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
506 """
    expand = DEFAULT_EXPAND if expand is None else expand
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=expand,
        **kwargs,
    )
    if not out:
        raise FileNotFoundError(urlpath)
    return out[0]


def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to the FS, or used by open_files (e.g., compression)
    """
542 if "r" not in mode:
543 raise ValueError("Can only ensure local files when reading")
544 of = open_files(url, mode=mode, **storage_options)
545 if not getattr(of[0].fs, "local_file", False):
546 raise ValueError(
547 "open_local can only be used on a filesystem which"
548 " has attribute local_file=True"
549 )
550 with of as files:
551 paths = [f.name for f in files]
552 if (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path):
553 return paths[0]
554 return paths
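

# Illustrative sketch (not library API): a plain local path passes straight
# through open_local; a caching filesystem would first download the target.
# The temp file here stands in for any URL resolvable to a local file.
def _example_open_local():
    import tempfile

    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write("a,b\n1,2\n")

    path = open_local(tmp.name)  # str in, single local path out
    assert isinstance(path, str)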


def get_compression(urlpath, compression):
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is not None and compression not in compr:
        raise ValueError(f"Compression type {compression} not supported")
    return compression


def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    if "://" in urlpath:
        protocol, path = urlpath.split("://", 1)
        if len(protocol) > 1:
            # excludes Windows paths
            return protocol, path
    if urlpath.startswith("data:"):
        return urlpath.split(":", 1)
    return None, urlpath
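

# Illustrative sketch (not library API) of the split rules, including the
# Windows-drive corner case handled by the length check above.
def _example_split_protocol():
    assert split_protocol("s3://bucket/key") == ("s3", "bucket/key")
    assert split_protocol("/tmp/afile") == (None, "/tmp/afile")
    # one-letter "protocols" are taken to be Windows drive letters
    assert split_protocol("c://temp/afile") == (None, "c://temp/afile")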


def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol, _ = split_protocol(urlpath)
    cls = get_filesystem_class(protocol)
    return cls._strip_protocol(urlpath)


def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths
    """
    expanded_paths = []
    paths = list(paths)

    if "w" in mode:  # write mode
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths


def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath0 = stringify_path(next(iter(urlpath)))
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options or {})
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        pchains = [
            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
        ]
        if len({pc[1] for pc in pchains}) > 1:
            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        if expand:
            paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
        elif not isinstance(paths, list):
            paths = list(paths)
    else:
        if ("w" in mode or "x" in mode) and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
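

# Illustrative sketch (not library API): in write mode the "*" template is
# expanded to ``num`` numbered paths on the chosen filesystem ("memory" here).
def _example_get_fs_token_paths():
    fs, token, paths = get_fs_token_paths(
        "memory://data/part-*.csv", mode="wb", num=2
    )
    assert [p[-10:] for p in paths] == ["part-0.csv", "part-1.csv"]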


def _expand_paths(path, name_function, num):
    if isinstance(path, str):
        if path.count("*") > 1:
            raise ValueError("Output path spec must contain exactly one '*'.")
        elif "*" not in path:
            path = os.path.join(path, "*.part")

        if name_function is None:
            name_function = build_name_function(num - 1)

        paths = [path.replace("*", name_function(i)) for i in range(num)]
        if paths != sorted(paths):
            logger.warning(
                "In order to preserve order between partitions,"
                " paths created with ``name_function`` should"
                " sort to partition order"
            )
    elif isinstance(path, (tuple, list)):
        assert len(path) == num
        paths = list(path)
    else:
        raise ValueError(
            "Path should be either\n"
            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
            "2. A directory: 'foo/'\n"
            "3. A path with a '*' in it: 'foo.*.json'"
        )
    return paths
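

# Illustrative sketch (not library API): the default name_function zero-pads
# indices so that lexical order matches partition order.
def _example_expand_paths():
    assert _expand_paths("out-*.json", None, 11) == [
        f"out-{i:02d}.json" for i in range(11)
    ]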


class PickleableTextIOWrapper(io.TextIOWrapper):
    """TextIOWrapper cannot be pickled. This solves it.

    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        self.args = buffer, encoding, errors, newline, line_buffering, write_through
        super().__init__(*self.args)

    def __reduce__(self):
        return PickleableTextIOWrapper, self.args
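

# Illustrative sketch (not library API): round-trip the wrapper through
# pickle. Local files opened for reading have a pickleable buffer, which
# satisfies the precondition stated in the docstring above.
def _example_pickle_text_wrapper():
    import pickle
    import tempfile

    from fsspec.implementations.local import LocalFileSystem

    with tempfile.NamedTemporaryFile("w", delete=False) as tmp:
        tmp.write("hello world")

    w = PickleableTextIOWrapper(LocalFileSystem().open(tmp.name, "rb"))
    w2 = pickle.loads(pickle.dumps(w))
    assert w2.read() == "hello world"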