# Coverage report header for
# /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/utils.py (14% covered).
# HTML-report keyboard-shortcut chrome removed; the module source follows.
1from __future__ import annotations
3import contextlib
4import logging
5import math
6import os
7import pathlib
8import re
9import sys
10import tempfile
11from functools import partial
12from hashlib import md5
13from importlib.metadata import version
14from typing import (
15 IO,
16 TYPE_CHECKING,
17 Any,
18 Callable,
19 Iterable,
20 Iterator,
21 Sequence,
22 TypeVar,
23)
24from urllib.parse import urlsplit
26if TYPE_CHECKING:
27 from typing_extensions import TypeGuard
29 from fsspec.spec import AbstractFileSystem
32DEFAULT_BLOCK_SIZE = 5 * 2**20
34T = TypeVar("T")
def infer_storage_options(
    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Infer storage options from URL path and merge it with existing storage
    options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path", "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case;
    # anything without a "proto://" prefix is treated as a local file path.
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    # urlsplit strips the fragment; fsspec paths may legitimately contain "#",
    # so glue it back onto the path.
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            path = "%s:%s" % windows_path.groups()

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options: dict[str, Any] = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        # Object stores treat the netloc as the bucket, which belongs in the
        # path. (A previous `else: options["host"] = options["host"]` branch
        # was a no-op and has been removed.)
        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options
def update_storage_options(
    options: dict[str, Any], inherited: dict[str, Any] | None = None
) -> None:
    """Merge *inherited* storage options into *options* in place.

    A key present in both dicts with differing values raises ``KeyError``;
    identical duplicate values are allowed.
    """
    inherited = inherited or {}
    # Reject any key whose inferred and explicit values disagree.
    for key in set(options) & set(inherited):
        if options.get(key) != inherited.get(key):
            raise KeyError(
                f"Collision between inferred and specified storage "
                f"option:\n{key}"
            )
    options.update(inherited)
# Compression extensions registered via fsspec.compression.register_compression
# Maps a lowercase filename extension (without the dot) to a compression name.
compressions: dict[str, str] = {}
def infer_compression(filename: str) -> str | None:
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    # Normalize the extension ("file.GZ" -> "gz") before the registry lookup;
    # dict.get yields None for unregistered extensions.
    ext = os.path.splitext(filename)[-1].strip(".").lower()
    return compressions.get(ext)
def build_name_function(max_int: float) -> Callable[[int], str]:
    """Return a formatter that zero-pads integers to a fixed width.

    The width is chosen so that every integer up to ``max_int`` aligns.

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # The tiny epsilon handles max_int == 0 and exact powers of 10.
    width = int(math.ceil(math.log10(max_int + 1e-8)))

    def name_function(i: int) -> str:
        return str(i).zfill(width)

    return name_function
def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
    Note that file start is a valid split, so must be at offset > 0 to seek for
    delimiter.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.

    """

    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last: bytes | None = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        # Prepend the tail of the previous block so a delimiter straddling
        # the block boundary is still detected.
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                # Rewind to just past the first delimiter occurrence:
                # tell() is at the end of `full`, so step back by the
                # unconsumed suffix and forward past the delimiter itself.
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            # NOTE(review): presumably guards bytes/str mismatches from text
            # IO; errors are swallowed and scanning continues — confirm intent.
            pass
        # Keep only the minimal tail needed to match a straddling delimiter.
        last = full[-len(delimiter) :]
def read_block(
    f: IO[bytes],
    offset: int,
    length: int | None,
    delimiter: bytes | None = None,
    split_before: bool = False,
) -> bytes:
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.

    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        # Snap the start of the read to the next delimiter at/after `offset`
        # (seek_delimiter is a no-op when already at file start).
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        # Shrink the requested length by however far the start moved forward.
        length -= start - offset

        # Likewise snap the end of the read to the delimiter following
        # `offset + length`.
        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter if seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)

    # TODO: allow length to be None and read to the end of the file?
    assert length is not None
    b = f.read(length)
    return b
def tokenize(*args: Any, **kwargs: Any) -> str:
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    # Fold keyword arguments into the positional tuple so they participate
    # in the hash.
    if kwargs:
        args = args + (kwargs,)
    payload = str(args).encode()
    try:
        hasher = md5(payload)
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        hasher = md5(payload, usedforsecurity=False)
    return hasher.hexdigest()
def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Strings pass through untouched. Objects supporting the fspath protocol
    are coerced via ``__fspath__``; objects exposing a ``path`` attribute
    return that attribute. Anything else — bytes, buffers, non-path-likes —
    is returned unchanged.
    """
    # Guard-clause style: each supported coercion returns immediately.
    if isinstance(filepath, str):
        return filepath
    if hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    if hasattr(filepath, "path"):
        return filepath.path
    return filepath  # type: ignore[return-value]
def make_instance(
    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
) -> T:
    """Instantiate ``cls(*args, **kwargs)`` and notify it via
    ``_determine_worker()`` before returning it (used when rebuilding
    filesystem objects on workers)."""
    instance = cls(*args, **kwargs)
    instance._determine_worker()  # type: ignore[attr-defined]
    return instance
def common_prefix(paths: Iterable[str]) -> str:
    """For a list of paths, find the shortest prefix common to all"""
    segment_lists = [path.split("/") for path in paths]
    shortest = min(len(segments) for segments in segment_lists)
    matched = 0
    idx = 0
    for idx in range(shortest):
        # True while every path agrees on segment `idx`.
        matched = all(segments[idx] == segment_lists[0][idx] for segments in segment_lists)
        if not matched:
            break
    # Include the final segment too when the whole shortest path matched.
    idx += matched
    return "/".join(segment_lists[0][:idx])
def other_paths(
    paths: list[str],
    path2: str | list[str],
    exists: bool = False,
    flatten: bool = False,
) -> list[str]:
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    exists: bool (optional)
        For a str destination, it is already exists (and is a dir), files should
        end up inside.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """
    if not isinstance(path2, str):
        # Destinations given explicitly: just verify they pair up.
        assert len(paths) == len(path2)
        return path2

    root = path2.rstrip("/")
    if flatten:
        # Discard directory structure; keep only each file's basename.
        return ["/".join((root, p.split("/")[-1])) for p in paths]

    cp = common_prefix(paths)
    if exists:
        # Destination dir already exists: keep the last common component
        # of the source tree underneath it.
        cp = cp.rsplit("/", 1)[0]
    if not cp and all(not s.startswith("/") for s in paths):
        return ["/".join([root, p]) for p in paths]
    return [p.replace(cp, root, 1) for p in paths]
def is_exception(obj: Any) -> bool:
    """Return True if ``obj`` is an exception instance (any BaseException)."""
    return isinstance(obj, BaseException)
def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
    """Duck-type check: does ``f`` expose the minimal read/close/tell
    file API?"""
    return all(hasattr(f, attr) for attr in ("read", "close", "tell"))
def get_protocol(url: str) -> str:
    """Extract the protocol prefix of ``url`` (before ``://`` or ``::``),
    defaulting to ``"file"`` when no prefix is present."""
    url = stringify_path(url)
    # Split once on the first chained ("::") or plain ("://") separator.
    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
    return parts[0] if len(parts) > 1 else "file"
def can_be_local(path: str) -> bool:
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        fs_cls = get_filesystem_class(get_protocol(path))
    except (ValueError, ImportError):
        # not in registry or import failed
        return False
    # getattr with a default never raises, so it can live outside the try.
    return getattr(fs_cls, "local_file", False)
def get_package_version_without_import(name: str) -> str | None:
    """For given package name, try to find the version without importing it

    Import and package.__version__ is still the backup here, so an import
    *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    # Cheapest path: the module is already imported and carries __version__.
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    # Next: installed distribution metadata, no import required.
    try:
        return version(name)
    except Exception:
        # Was a bare ``except:``; Exception still covers PackageNotFoundError
        # and malformed metadata, without swallowing KeyboardInterrupt or
        # SystemExit.
        pass
    # Last resort: actually import the module and read __version__.
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None
def setup_logging(
    logger: logging.Logger | None = None,
    logger_name: str | None = None,
    level: str = "DEBUG",
    clear: bool = True,
) -> logging.Logger:
    """Attach a formatted StreamHandler to a logger and set its level.

    Exactly one of ``logger`` / ``logger_name`` must be provided (``logger``
    wins if both are). With ``clear=True`` any existing handlers are removed
    first, so repeated calls do not stack handlers.
    """
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    if logger is None:
        logger = logging.getLogger(logger_name)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
        )
    )
    if clear:
        logger.handlers.clear()
    logger.addHandler(stream_handler)
    logger.setLevel(level)
    return logger
def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
    """Delegate to ``fs.unstrip_protocol`` for ``name`` (module-level helper
    so it can be pickled/partial-ed independently of the filesystem)."""
    return fs.unstrip_protocol(name)
def mirror_from(
    origin_name: str, methods: Iterable[str]
) -> Callable[[type[T]], type[T]]:
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def _delegate(attr: str, self: Any) -> Any:
        # Resolved at access time, so the origin object may change freely.
        origin = getattr(self, origin_name)
        return getattr(origin, attr)

    def decorator(cls: type[T]) -> type[T]:
        # Each mirrored name becomes a read-only property on the class.
        for attr in methods:
            setattr(cls, attr, property(partial(_delegate, attr)))
        return cls

    return decorator
@contextlib.contextmanager
def nullcontext(obj: T) -> Iterator[T]:
    """A do-nothing context manager that simply yields ``obj`` unchanged."""
    yield obj
def merge_offset_ranges(
    paths: list[str],
    starts: list[int] | int,
    ends: list[int] | int,
    max_gap: int = 0,
    max_block: int | None = None,
    sort: bool = True,
) -> tuple[list[str], list[int], list[int]]:
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    # Scalar starts/ends are broadcast to every path.
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError

    # Early Return
    if len(starts) <= 1:
        return paths, starts, ends

    # Normalize falsy (None/0) starts to 0 so the gap arithmetic below works.
    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = (
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        )

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            # A previous open-ended range (end is None) already covers the
            # rest of the same file, so further ranges for it are dropped.
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends
def file_size(filelike: IO[bytes]) -> int:
    """Return the total byte length of an open, seekable file-like.

    The stream position is restored before returning.
    """
    saved_pos = filelike.tell()
    try:
        # seek() returns the new absolute position — at EOF, that is the size.
        return filelike.seek(0, 2)
    finally:
        filelike.seek(saved_pos)
@contextlib.contextmanager
def atomic_write(path: str, mode: str = "wb"):
    """
    A context manager that opens a temporary file next to `path` and, on exit,
    replaces `path` with the temporary file, thereby updating `path`
    atomically.
    """
    # The temp file must live in the same directory so os.replace stays
    # within one filesystem (rename is only atomic then).
    fd, tmp_name = tempfile.mkstemp(
        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as handle:
            yield handle
    except BaseException:
        # Failure inside the block: discard the partial temp file and
        # re-raise, leaving any existing `path` untouched.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
    else:
        os.replace(tmp_name, path)
def _translate(pat, STAR, QUESTION_MARK):
    # Copied from: https://github.com/python/cpython/pull/106703.
    """Translate one shell-pattern segment into a list of regex fragments.

    ``STAR`` and ``QUESTION_MARK`` are the regex snippets to emit for the
    ``*`` and ``?`` wildcards respectively; the caller joins the returned
    pieces into the final pattern.
    """
    res: list[str] = []
    add = res.append
    i, n = 0, len(pat)
    while i < n:
        c = pat[i]
        i = i + 1
        if c == "*":
            # compress consecutive `*` into one
            if (not res) or res[-1] is not STAR:
                add(STAR)
        elif c == "?":
            add(QUESTION_MARK)
        elif c == "[":
            # Scan for the closing "]"; a leading "!" (negation) or "]"
            # immediately after it is part of the set, not the terminator.
            j = i
            if j < n and pat[j] == "!":
                j = j + 1
            if j < n and pat[j] == "]":
                j = j + 1
            while j < n and pat[j] != "]":
                j = j + 1
            if j >= n:
                # Unterminated "[": treat it as a literal character.
                add("\\[")
            else:
                stuff = pat[i:j]
                if "-" not in stuff:
                    stuff = stuff.replace("\\", r"\\")
                else:
                    # Split the set into chunks between range hyphens so that
                    # literal and range hyphens can be told apart.
                    chunks = []
                    k = i + 2 if pat[i] == "!" else i + 1
                    while True:
                        k = pat.find("-", k, j)
                        if k < 0:
                            break
                        chunks.append(pat[i:k])
                        i = k + 1
                        k = k + 3
                    chunk = pat[i:j]
                    if chunk:
                        chunks.append(chunk)
                    else:
                        # Trailing "-" is a literal hyphen.
                        chunks[-1] += "-"
                    # Remove empty ranges -- invalid in RE.
                    for k in range(len(chunks) - 1, 0, -1):
                        if chunks[k - 1][-1] > chunks[k][0]:
                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
                            del chunks[k]
                    # Escape backslashes and hyphens for set difference (--).
                    # Hyphens that create ranges shouldn't be escaped.
                    stuff = "-".join(
                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
                    )
                # Escape set operations (&&, ~~ and ||).
                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
                i = j + 1
                if not stuff:
                    # Empty range: never match.
                    add("(?!)")
                elif stuff == "!":
                    # Negated empty range: match any character.
                    add(".")
                else:
                    if stuff[0] == "!":
                        stuff = "^" + stuff[1:]
                    elif stuff[0] in ("^", "["):
                        stuff = "\\" + stuff
                    add(f"[{stuff}]")
        else:
            # Any other character is matched literally.
            add(re.escape(c))
    assert i == n
    return res
def glob_translate(pat):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # The keyword parameters' values are fixed to:
    # recursive=True, include_hidden=True, seps=None
    """Translate a pathname with shell wildcards to a regular expression."""
    # Accept both separators on platforms (e.g. Windows) that define altsep.
    if os.path.altsep:
        seps = os.path.sep + os.path.altsep
    else:
        seps = os.path.sep
    escaped_seps = "".join(map(re.escape, seps))
    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
    not_sep = f"[^{escaped_seps}]"
    # Regex building blocks: one path segment (with/without its trailing
    # separator) and "zero or more whole segments" for the `**` wildcard.
    one_last_segment = f"{not_sep}+"
    one_segment = f"{one_last_segment}{any_sep}"
    any_segments = f"(?:.+{any_sep})?"
    any_last_segments = ".*"
    results = []
    parts = re.split(any_sep, pat)
    last_part_idx = len(parts) - 1
    for idx, part in enumerate(parts):
        if part == "*":
            results.append(one_segment if idx < last_part_idx else one_last_segment)
            continue
        if part == "**":
            results.append(any_segments if idx < last_part_idx else any_last_segments)
            continue
        elif "**" in part:
            raise ValueError(
                "Invalid pattern: '**' can only be an entire path component"
            )
        if part:
            # Ordinary segment: translate wildcards/char-classes within it.
            results.extend(_translate(part, f"{not_sep}*", not_sep))
        if idx < last_part_idx:
            results.append(any_sep)
    res = "".join(results)
    # (?s:...) lets "." match newlines; \Z anchors at end of string.
    return rf"(?s:{res})\Z"