Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/core.py: 18%
230 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1from __future__ import absolute_import, division, print_function
3import io
4import logging
5import os
6import re
7from glob import has_magic
9# for backwards compat, we export cache things from here too
10from .caching import ( # noqa: F401
11 BaseCache,
12 BlockCache,
13 BytesCache,
14 MMapCache,
15 ReadAheadCache,
16 caches,
17)
18from .compression import compr
19from .registry import filesystem, get_filesystem_class
20from .utils import (
21 _unstrip_protocol,
22 build_name_function,
23 infer_compression,
24 stringify_path,
25)
27logger = logging.getLogger("fsspec")
class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or duck-type
        with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # "infer" is resolved against the path here, so the stored value is
        # always a concrete codec name (or None)
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # stack of file objects created on __enter__, innermost first
        self.fobjects = []

    def __reduce__(self):
        # serialize by constructor arguments only; the open file objects in
        # self.fobjects are deliberately not pickled
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return "<OpenFile '{}'>".format(self.path)

    def __enter__(self):
        # the underlying filesystem file is always opened in binary mode;
        # text decoding, if requested, is layered on top below
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        f = self.fs.open(self.path, mode=mode)

        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = PickleableTextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        # return the outermost wrapper (text over compression over raw)
        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        # path re-qualified with the filesystem's protocol prefix
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        # close outermost wrappers first so buffered data is pushed down the
        # stack; flush (when writing) before each close
        for f in reversed(self.fobjects):
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()
class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        # populated by __enter__ when the filesystem supports open_many
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        # walk down any filesystem-wrapper chain (fs.fs) looking for a layer
        # that implements open_many (e.g. a caching filesystem)
        fs = self.fs
        while True:
            if hasattr(fs, "open_many"):
                # check for concurrent cache download; or set up for upload
                self.files = fs.open_many(self)
                return self.files
            if hasattr(fs, "fs") and fs.fs is not None:
                fs = fs.fs
            else:
                break
        # no open_many support found: enter each OpenFile individually
        return [s.__enter__() for s in self]

    def __exit__(self, *args):
        fs = self.fs
        [s.__exit__(*args) for s in self]
        if "r" not in self.mode:
            # writing: walk the wrapper chain again to commit files as a batch
            while True:
                if hasattr(fs, "open_many"):
                    # check for concurrent cache upload
                    fs.commit_many(self.files)
                    return
                if hasattr(fs, "fs") and fs.fs is not None:
                    fs = fs.fs
                else:
                    break

    def __getitem__(self, item):
        out = super().__getitem__(item)
        if isinstance(item, slice):
            # slices stay usable as a context, so wrap them again
            return OpenFiles(out, mode=self.mode, fs=self.fs)
        return out

    def __repr__(self):
        return "<List of %s OpenFile instances>" % len(self)
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
        Passed through to ``get_fs_token_paths``: whether to expand a string
        path for writing, assuming the path is a directory.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    # when writing, ensure each distinct parent directory exists first
    if "r" not in mode and auto_mkdir:
        parents = {fs._parent(path) for path in paths}
        [fs.makedirs(parent, exist_ok=True) for parent in parents]
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )
def _un_chain(path, kwargs):
    """Split a chained URL like ``"cached::s3://bucket/key"`` into its links.

    Returns a list of ``(url, protocol, kwargs)`` triples, in the same order
    as the chain is written. Keyword entries named after a protocol are
    routed to that link; any remaining keywords are given to the first link.
    """
    x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
    # a bare word without "://" is taken as a protocol-only link and gets
    # "://" appended so it parses like the others
    bits = (
        [p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
        if "::" in path
        else [path]
    )
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    # process innermost link first so a caching layer can refer to the URL
    # of the link it wraps (previous_bit)
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            # leftover keywords all belong to the first link in the chain
            kws.update(kwargs)
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # caching layers have no path of their own; they wrap the URL of
            # the previous (inner) link
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out = list(reversed(out))
    return out
def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    chain = _un_chain(url, kwargs)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, protocol, kw = ch
        if i == len(chain) - 1:
            # innermost link: its kwargs form the base of the nested structure
            inkwargs = dict(**kw, **inkwargs)
            continue
        # each outer link wraps what has been accumulated so far
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = protocol
        inkwargs["fo"] = urls
    # the outermost link supplies the protocol and path of the instance made
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Given a path, return a single ``OpenFile`` object.

    This is a convenience wrapper around ``open_files`` for the single-path
    case: the path is wrapped in a list, glob expansion is disabled, and the
    first (only) ``OpenFile`` is returned.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )
    return out[0]
def open_local(url, mode="rb", **storage_options):
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    openfiles = open_files(url, mode=mode, **storage_options)
    # only filesystems that advertise local_file=True hand out real local
    # paths on open (e.g. local and caching filesystems)
    if not getattr(openfiles[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    with openfiles as handles:
        local_paths = [handle.name for handle in handles]
    # a plain, non-glob string input yields a single path, not a list
    single = isinstance(url, str) and not has_magic(url)
    return local_paths[0] if single else local_paths
def get_compression(urlpath, compression):
    """Resolve a compression argument against the target path.

    ``"infer"`` is replaced by the codec guessed from the filename suffix;
    any other non-None value must be a key of ``fsspec.compression.compr``.
    """
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is None or compression in compr:
        return compression
    raise ValueError("Compression type %s not supported" % compression)
def split_protocol(urlpath):
    """Return protocol, path pair"""
    path_str = stringify_path(urlpath)
    protocol, sep, rest = path_str.partition("://")
    # a single-character "protocol" is really a Windows drive letter
    if sep and len(protocol) > 1:
        return protocol, rest
    return None, path_str
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol = split_protocol(urlpath)[0]
    # the filesystem class for the protocol knows how to strip its own prefix
    return get_filesystem_class(protocol)._strip_protocol(urlpath)
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths
    """
    expanded_paths = []
    paths = list(paths)

    # "x" (exclusive create) also creates files, so it must take the write
    # path; previously it fell through to glob expansion, which is wrong for
    # files that do not exist yet
    if "w" in mode or "x" in mode:  # write mode
        # only a single "*" mask may be given when creating files
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for curr_path in paths:
            if "*" in curr_path:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                expanded_paths.append(curr_path)
        # if we generated more paths than asked for, trim the list
        if len(expanded_paths) > num:
            expanded_paths = expanded_paths[:num]

    else:  # read mode
        for curr_path in paths:
            if has_magic(curr_path):
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
            else:
                expanded_paths.append(curr_path)

    return expanded_paths
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory

    Returns
    -------
    Tuple of (filesystem instance, token for that filesystem, list of
    expanded paths)
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        # use the first entry to determine protocol and build the filesystem
        urlpath0 = stringify_path(list(urlpath)[0])
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options or {})
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            # innermost link: its kwargs form the base of the nested structure
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        pchains = [
            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
        ]
        if len(set(pc[1] for pc in pchains)) > 1:
            # %-format the message; previously the path was passed as a
            # second exception argument instead of being interpolated
            raise ValueError("Protocol mismatch getting fs from %s" % urlpath)
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    else:
        if "w" in mode and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            # glob, excluding directories from the match
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
629def _expand_paths(path, name_function, num):
630 if isinstance(path, str):
631 if path.count("*") > 1:
632 raise ValueError("Output path spec must contain exactly one '*'.")
633 elif "*" not in path:
634 path = os.path.join(path, "*.part")
636 if name_function is None:
637 name_function = build_name_function(num - 1)
639 paths = [path.replace("*", name_function(i)) for i in range(num)]
640 if paths != sorted(paths):
641 logger.warning(
642 "In order to preserve order between partitions"
643 " paths created with ``name_function`` should "
644 "sort to partition order"
645 )
646 elif isinstance(path, (tuple, list)):
647 assert len(path) == num
648 paths = list(path)
649 else:
650 raise ValueError(
651 "Path should be either\n"
652 "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
653 "2. A directory: 'foo/\n"
654 "3. A path with a '*' in it: 'foo.*.json'"
655 )
656 return paths
class PickleableTextIOWrapper(io.TextIOWrapper):
    """A picklable ``io.TextIOWrapper``.

    Plain ``TextIOWrapper`` instances cannot be pickled; this subclass
    remembers its constructor arguments and rebuilds itself from them.
    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # keep the exact construction arguments so __reduce__ can replay them
        self.args = (
            buffer,
            encoding,
            errors,
            newline,
            line_buffering,
            write_through,
        )
        super().__init__(
            buffer, encoding, errors, newline, line_buffering, write_through
        )

    def __reduce__(self):
        # reconstruct by calling the class with the saved arguments
        return PickleableTextIOWrapper, self.args