Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/core.py: 17%
246 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1from __future__ import annotations
3import io
4import logging
5import os
6import re
7from glob import has_magic
8from pathlib import Path
10# for backwards compat, we export cache things from here too
11from .caching import ( # noqa: F401
12 BaseCache,
13 BlockCache,
14 BytesCache,
15 MMapCache,
16 ReadAheadCache,
17 caches,
18)
19from .compression import compr
20from .registry import filesystem, get_filesystem_class
21from .utils import (
22 _unstrip_protocol,
23 build_name_function,
24 infer_compression,
25 stringify_path,
26)
28logger = logging.getLogger("fsspec")
class OpenFile:
    """
    Serializable, context-managed recipe for opening a file.

    Layers (buffered) text decoding and/or compression over any file-system,
    which are typically binary-only. No low-level file object is created
    until the instance is entered via ``with`` (or ``open()`` is called),
    which is what makes these instances safe to serialize.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or
        duck-type with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # resolve "infer" and validate the codec name up front
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        self.fobjects = []

    def __reduce__(self):
        # pickle by constructor arguments; live file objects are never pickled
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        # the filesystem layer is always opened binary; text is wrapped on top
        binary_mode = self.mode.replace("t", "").replace("b", "") + "b"

        stream = self.fs.open(self.path, mode=binary_mode)
        self.fobjects = [stream]

        if self.compression is not None:
            codec = compr[self.compression]
            stream = codec(stream, mode=binary_mode[0])
            self.fobjects.append(stream)

        if "b" not in self.mode:
            # as with builtin open(), bare 'r'/'w' implies text mode
            stream = PickleableTextIOWrapper(
                stream, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(stream)

        # hand back the outermost wrapper
        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        # path with its protocol re-attached, e.g. "s3://bucket/key"
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the
        OpenFile during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects, outermost first."""
        for fobj in reversed(self.fobjects):
            # flush writable layers before closing (unless already closed)
            if "r" not in self.mode and not fobj.closed:
                fobj.flush()
            fobj.close()
        self.fobjects.clear()
class OpenFiles(list):
    """List of OpenFile instances

    Usable as one context manager: entering opens every contained file and
    exiting closes them all. Ordinary list indexing works as usual.

    Caching filesystems are special-cased: if any filesystem in the wrapped
    chain exposes ``open_many``, files are down/uploaded together at the
    start or end of the context (possibly concurrently, if supported).
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        # walk down the chain of wrapped filesystems looking for batch support
        target = self.fs
        while target is not None:
            if hasattr(target, "open_many"):
                # concurrent cache download; or set up for upload
                self.files = target.open_many(self)
                return self.files
            target = getattr(target, "fs", None)
        return [of.__enter__() for of in self]

    def __exit__(self, *args):
        target = self.fs
        for of in self:
            of.__exit__(*args)
        if "r" not in self.mode:
            # walk the chain again for a concurrent cache upload
            while target is not None:
                if hasattr(target, "open_many"):
                    target.commit_many(self.files)
                    return
                target = getattr(target, "fs", None)

    def __getitem__(self, item):
        picked = super().__getitem__(item)
        if isinstance(item, slice):
            # slices keep the OpenFiles wrapper (and its mode/fs)
            picked = OpenFiles(picked, mode=self.mode, fs=self.fs)
        return picked

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be
    filled in by increasing numbers, e.g., "part*" -> "part1", "part2" if
    num=2. For either reading or writing, an explicit list of paths may be
    given instead.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like
        ``s3://`` to read from alternative filesystems. To read from multiple
        files a globstring or a list of paths may be passed, with the caveat
        that they must all share the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a
        compression name (a key in ``fsspec.compression.compr``) or "infer"
        to guess the compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
    **kwargs: dict
        Extra options that make sense to a particular storage connection,
        e.g. host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that
    can be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if fs.protocol == "file":
        # local filesystem handles directory creation itself
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        # pre-create the target directories; ignore filesystems that refuse
        for parent in {fs._parent(p) for p in paths}:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                pass
    handles = [
        OpenFile(
            fs,
            p,
            mode=mode,
            compression=compression,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )
        for p in paths
    ]
    return OpenFiles(handles, mode=mode, fs=fs)
def _un_chain(path, kwargs):
    """Split a chained URL (``outer://...::inner://...``) into layers.

    Returns a list of ``(path, protocol, kwargs)`` tuples, ordered
    outermost-first (same order as in the URL). ``kwargs`` may namespace
    per-protocol options under the protocol name; any remaining
    un-namespaced options are given to the outermost layer.
    """
    x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    # a bare protocol word (e.g. "simplecache") gets "://" appended so every
    # bit parses uniformly; bits already containing "://" or any non-[a-z]
    # character are left untouched
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    # process innermost-first so a caching layer can inherit the inner path
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            # outermost layer absorbs all remaining un-namespaced kwargs
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # caching layers have no path of their own: they wrap the path of
            # the layer processed just before them (the next-inner one)
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    # restore outermost-first ordering
    out = list(reversed(out))
    return out
def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection,
        e.g. host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    # arguments accepted by fsspec.open() that are meaningless to a
    # filesystem constructor and must be filtered out here
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    kwargs = {key: value for key, value in kwargs.items() if key not in known_kwargs}
    chain = _un_chain(url, kwargs)
    nested = {}
    outer_index = len(chain) - 1
    # walk the chain innermost-first, wrapping each layer's options into the
    # next layer's target_* keys; the outermost layer gets the merged result
    for i, (layer_path, layer_protocol, layer_kw) in enumerate(reversed(chain)):
        if i == outer_index:
            nested = dict(**layer_kw, **nested)
        else:
            nested["target_options"] = dict(**layer_kw, **nested)
            nested["target_protocol"] = layer_protocol
            nested["fo"] = layer_path
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **nested)
    return fs, urlpath
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a
        compression name (a key in ``fsspec.compression.compr``) or "infer"
        to guess the compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    **kwargs: dict
        Extra options that make sense to a particular storage connection,
        e.g. host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    # delegate to open_files with a single-element list and no glob expansion
    candidates = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )
    if not candidates:
        raise FileNotFoundError(urlpath)
    return candidates[0]
def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    # only filesystems advertising local_file=True yield real local paths
    if not getattr(of[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    # entering the context materialises (e.g. downloads) the files
    with of as files:
        paths = [f.name for f in files]
    # a single non-glob input yields a single path, not a list
    is_single = (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path)
    return paths[0] if is_single else paths
def get_compression(urlpath, compression):
    """Resolve a compression specifier, inferring from the path if asked.

    ``"infer"`` is replaced by a guess from the filename suffix; any other
    non-None value must name a registered codec, else ValueError is raised.
    """
    resolved = infer_compression(urlpath) if compression == "infer" else compression
    if resolved is not None and resolved not in compr:
        raise ValueError(f"Compression type {resolved} not supported")
    return resolved
def split_protocol(urlpath):
    """Return protocol, path pair"""
    text = stringify_path(urlpath)
    if "://" in text:
        head, _, tail = text.partition("://")
        # a one-character "protocol" is a Windows drive letter, not a scheme
        if len(head) > 1:
            return head, tail
    if text.startswith("data:"):
        # data URIs keep everything after the first ":" as the payload
        return text.split(":", 1)
    return None, text
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol, _ = split_protocol(urlpath)
    # let the backend class decide what stripping means for its URLs
    return get_filesystem_class(protocol)._strip_protocol(urlpath)
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of
    ``*?[]`` in them (read mode).

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of expanded path strings
    """
    expanded_paths = []
    paths = list(paths)

    # "x" (exclusive create) is also a writing mode and must take the
    # name-expansion path, matching the handling in get_fs_token_paths
    if "w" in mode or "x" in mode:  # write mode
        # only one "*" mask may be expanded when writing
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory

    Returns
    -------
    (fs, token, paths) : tuple
        The filesystem instance, its deterministic cache token
        (``fs._fs_token``), and the list of expanded path strings.
    """
    # use the first element of a sequence as the representative URL for
    # determining the (chained) protocol and filesystem options
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath0 = stringify_path(list(urlpath)[0])
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options or {})
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            # outermost layer: its kwargs become the top-level constructor args
            inkwargs = dict(**kw, **inkwargs)
            continue
        # inner layers are wrapped under target_options/target_protocol/fo
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        # strip the chain from every element; all must share one protocol
        pchains = [
            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
        ]
        if len({pc[1] for pc in pchains}) > 1:
            # NOTE(review): the %s is never interpolated here -- the tuple is
            # passed through as ValueError args verbatim
            raise ValueError("Protocol mismatch getting fs from %s", urlpath)
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    else:
        # single string path: expand masks when writing/creating, glob when
        # reading, otherwise wrap in a one-element list
        if "w" in mode and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "x" in mode and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            # drop directories from glob results; sort for deterministic order
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
660def _expand_paths(path, name_function, num):
661 if isinstance(path, str):
662 if path.count("*") > 1:
663 raise ValueError("Output path spec must contain exactly one '*'.")
664 elif "*" not in path:
665 path = os.path.join(path, "*.part")
667 if name_function is None:
668 name_function = build_name_function(num - 1)
670 paths = [path.replace("*", name_function(i)) for i in range(num)]
671 if paths != sorted(paths):
672 logger.warning(
673 "In order to preserve order between partitions"
674 " paths created with ``name_function`` should "
675 "sort to partition order"
676 )
677 elif isinstance(path, (tuple, list)):
678 assert len(path) == num
679 paths = list(path)
680 else:
681 raise ValueError(
682 "Path should be either\n"
683 "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
684 "2. A directory: 'foo/\n"
685 "3. A path with a '*' in it: 'foo.*.json'"
686 )
687 return paths
class PickleableTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` that supports pickling.

    Plain ``TextIOWrapper`` instances cannot be pickled; this subclass
    remembers its constructor arguments and rebuilds itself from them.
    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # remember the exact construction arguments for __reduce__
        self.args = (buffer, encoding, errors, newline, line_buffering, write_through)
        super().__init__(*self.args)

    def __reduce__(self):
        # re-create by calling the constructor with the saved arguments
        return PickleableTextIOWrapper, self.args