from __future__ import annotations

import contextlib
import logging
import math
import os
import re
import sys
import tempfile
from collections.abc import Callable, Iterable, Iterator, Sequence
from functools import partial
from hashlib import md5
from importlib.metadata import version
from typing import IO, TYPE_CHECKING, Any, TypeVar
from urllib.parse import urlsplit

if TYPE_CHECKING:
    import pathlib
    from typing import TypeGuard

    from fsspec.spec import AbstractFileSystem


DEFAULT_BLOCK_SIZE = 5 * 2**20

T = TypeVar("T")


def infer_storage_options(
    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Infer storage options from URL path and merge them with existing
    storage options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path": "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            drive, path = windows_path.groups()
            path = f"{drive}:{path}"

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options: dict[str, Any] = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]
        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options
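

# Example (a sketch): for the bucket-style protocols handled above, the netloc
# is folded back into the path, so the bucket name stays part of the path:
# >>> infer_storage_options("s3://mybucket/data/file.csv")  # doctest: +SKIP
# {'protocol': 's3', 'path': 'mybucket/data/file.csv', 'host': 'mybucket'}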


def update_storage_options(
    options: dict[str, Any], inherited: dict[str, Any] | None = None
) -> None:
    if not inherited:
        inherited = {}
    collisions = set(options) & set(inherited)
    if collisions:
        for collision in collisions:
            if options.get(collision) != inherited.get(collision):
                raise KeyError(
                    f"Collision between inferred and specified storage "
                    f"option:\n{collision}"
                )
    options.update(inherited)
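

# Example (a sketch): merging adds new keys in place; a conflicting value for
# an existing key raises KeyError, while an equal value is accepted:
# >>> opts = {"protocol": "s3"}
# >>> update_storage_options(opts, {"anon": True})
# >>> opts
# {'protocol': 's3', 'anon': True}
# >>> update_storage_options(opts, {"protocol": "gcs"})  # would raise KeyError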


# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename: str) -> str | None:
    """Infer compression, if available, from filename.

    Infer a named compression type, if registered and available, from filename
    extension. This includes builtin (gz, bz2, zip) compressions, as well as
    optional compressions. See fsspec.compression.register_compression.
    """
    extension = os.path.splitext(filename)[-1].strip(".").lower()
    if extension in compressions:
        return compressions[extension]
    return None
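

# Example (a sketch, assuming the builtin registrations from
# fsspec.compression have populated `compressions` so that "gz" maps to
# "gzip"):
# >>> infer_compression("table.csv.gz")  # doctest: +SKIP
# 'gzip'
# >>> infer_compression("table.csv") is None  # doctest: +SKIP
# True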


def build_name_function(max_int: float) -> Callable[[int], str]:
    """Returns a function that receives a single integer
    and returns it as a string padded by enough zero characters
    to align with the maximum possible integer

    >>> name_f = build_name_function(57)

    >>> name_f(7)
    '07'
    >>> name_f(31)
    '31'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # handle corner cases where max_int is 0 or an exact power of 10
    max_int += 1e-8

    pad_length = int(math.ceil(math.log10(max_int)))

    def name_function(i: int) -> str:
        return str(i).zfill(pad_length)

    return name_function


def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location
    afterwards. Note that the file start is a valid split, so the file must be
    at an offset > 0 when seeking for a delimiter.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.
    """
    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last: bytes | None = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            pass
        last = full[-len(delimiter) :]
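

# Worked example (a sketch): starting mid-record, seek to the byte just after
# the next newline:
# >>> from io import BytesIO
# >>> f = BytesIO(b"Alice, 100\nBob, 200\nCharlie, 300")
# >>> _ = f.seek(3)
# >>> seek_delimiter(f, b"\n", blocksize=2**16)
# True
# >>> f.tell()  # start of "Bob", one byte past the delimiter at offset 10
# 11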


def read_block(
    f: IO[bytes],
    offset: int,
    length: int | None,
    delimiter: bytes | None = None,
    split_before: bool = False,
) -> bytes:
    """Read a block of bytes from a file

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.

    If using the ``delimiter=`` keyword argument we ensure that the read
    starts and stops at delimiter boundaries that follow the locations
    ``offset`` and ``offset + length``. If ``offset`` is zero then we
    start at zero, regardless of delimiter. The bytestring returned WILL
    include the terminating delimiter string.

    Examples
    --------

    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'

    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'

    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'Bob, 200\\nCharlie, 300'
    """
    if delimiter:
        f.seek(offset)
        found_start_delim = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        length -= start - offset

        f.seek(start + length)
        found_end_delim = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        # Adjust split location to before delimiter if seek found the
        # delimiter sequence, not start or end of file.
        if found_start_delim and split_before:
            start -= len(delimiter)

        if found_end_delim and split_before:
            end -= len(delimiter)

        offset = start
        length = end - start

    f.seek(offset)

    # TODO: allow length to be None and read to the end of the file?
    assert length is not None
    b = f.read(length)
    return b


def tokenize(*args: Any, **kwargs: Any) -> str:
    """Deterministic token

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args += (kwargs,)
    try:
        h = md5(str(args).encode())
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        h = md5(str(args).encode(), usedforsecurity=False)
    return h.hexdigest()


def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced via their __fspath__
    method.

    For backwards compatibility with older Python versions, pathlib.Path
    objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if isinstance(filepath, str):
        return filepath
    elif hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    elif hasattr(filepath, "path"):
        return filepath.path
    else:
        return filepath  # type: ignore[return-value]
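

# Example (a sketch): anything implementing __fspath__ is converted; objects
# that are not path-like pass through untouched:
# >>> import pathlib
# >>> stringify_path(pathlib.PurePosixPath("/tmp/x.csv"))
# '/tmp/x.csv'
# >>> stringify_path(42)
# 42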


def make_instance(
    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
) -> T:
    inst = cls(*args, **kwargs)
    inst._determine_worker()  # type: ignore[attr-defined]
    return inst


def common_prefix(paths: Iterable[str]) -> str:
    """For a list of paths, find the shortest prefix common to all"""
    parts = [p.split("/") for p in paths]
    lmax = min(len(p) for p in parts)
    end = 0
    for i in range(lmax):
        end = all(p[i] == parts[0][i] for p in parts)
        if not end:
            break
    i += end
    return "/".join(parts[0][:i])
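

# Example (a sketch): the prefix is computed on "/"-separated components, not
# on characters:
# >>> common_prefix(["bucket/a/one.csv", "bucket/a/two.csv", "bucket/b/three.csv"])
# 'bucket'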


def other_paths(
    paths: list[str],
    path2: str | list[str],
    exists: bool = False,
    flatten: bool = False,
) -> list[str]:
    """In bulk file operations, construct a new file tree from a list of files

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    exists: bool (optional)
        For a str destination, whether it already exists (and is a dir); files
        should end up inside it.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """

    if isinstance(path2, str):
        path2 = path2.rstrip("/")

        if flatten:
            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
        else:
            cp = common_prefix(paths)
            if exists:
                cp = cp.rsplit("/", 1)[0]
            if not cp and all(not s.startswith("/") for s in paths):
                path2 = ["/".join([path2, p]) for p in paths]
            else:
                path2 = [p.replace(cp, path2, 1) for p in paths]
    else:
        assert len(paths) == len(path2)
    return path2
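

# Example (a sketch): by default the common prefix is replaced with the
# destination root; with flatten=True everything lands directly under it:
# >>> other_paths(["data/a.csv", "data/sub/b.csv"], "dest")
# ['dest/a.csv', 'dest/sub/b.csv']
# >>> other_paths(["data/a.csv", "data/sub/b.csv"], "dest", flatten=True)
# ['dest/a.csv', 'dest/b.csv']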


def is_exception(obj: Any) -> bool:
    return isinstance(obj, BaseException)


def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
    return all(hasattr(f, attr) for attr in ["read", "close", "tell"])


def get_protocol(url: str) -> str:
    url = stringify_path(url)
    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
    if len(parts) > 1:
        return parts[0]
    return "file"
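

# Example (a sketch): both "://" URLs and "::"-chained URLs yield the leading
# protocol; bare paths default to "file":
# >>> get_protocol("s3://bucket/key")
# 's3'
# >>> get_protocol("simplecache::s3://bucket/key")
# 'simplecache'
# >>> get_protocol("/local/path")
# 'file'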


def get_file_extension(url: str) -> str:
    url = stringify_path(url)
    ext_parts = url.rsplit(".", 1)
    if len(ext_parts) > 1:
        return ext_parts[-1]
    return ""


def can_be_local(path: str) -> bool:
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
    except (ValueError, ImportError):
        # not in registry or import failed
        return False


def get_package_version_without_import(name: str) -> str | None:
    """For given package name, try to find the version without importing it

    Importing the package and reading ``package.__version__`` is still the
    fallback here, so an import *might* happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    if name in sys.modules:
        mod = sys.modules[name]
        if hasattr(mod, "__version__"):
            return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        pass
    try:
        import importlib

        mod = importlib.import_module(name)
        return mod.__version__
    except (ImportError, AttributeError):
        return None
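

# Example (a sketch; the version string shown is made up):
# >>> get_package_version_without_import("fsspec")  # doctest: +SKIP
# '2024.2.0'
# >>> get_package_version_without_import("no-such-package") is None
# True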


def setup_logging(
    logger: logging.Logger | None = None,
    logger_name: str | None = None,
    level: str = "DEBUG",
    clear: bool = True,
) -> logging.Logger:
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    logger = logger or logging.getLogger(logger_name)
    handle = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
    )
    handle.setFormatter(formatter)
    if clear:
        logger.handlers.clear()
    logger.addHandler(handle)
    logger.setLevel(level)
    return logger
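

# Typical use (a sketch): attach a stream handler to a named logger and turn
# on DEBUG output:
# >>> logger = setup_logging(logger_name="fsspec", level="DEBUG")  # doctest: +SKIP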


def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
    return fs.unstrip_protocol(name)


def mirror_from(
    origin_name: str, methods: Iterable[str]
) -> Callable[[type[T]], type[T]]:
    """Mirror attributes and methods from the given
    origin_name attribute of the instance to the
    decorated class"""

    def origin_getter(method: str, self: Any) -> Any:
        origin = getattr(self, origin_name)
        return getattr(origin, method)

    def wrapper(cls: type[T]) -> type[T]:
        for method in methods:
            wrapped_method = partial(origin_getter, method)
            setattr(cls, method, property(wrapped_method))
        return cls

    return wrapper
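

# Example (a sketch with hypothetical classes): expose selected attributes of
# a wrapped object as read-only properties on the wrapper:
# >>> from io import BytesIO
# >>> @mirror_from("inner", ["read", "tell"])
# ... class Wrapper:
# ...     def __init__(self, inner):
# ...         self.inner = inner
# >>> Wrapper(BytesIO(b"abc")).read()
# b'abc'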


@contextlib.contextmanager
def nullcontext(obj: T) -> Iterator[T]:
    yield obj


def merge_offset_ranges(
    paths: list[str],
    starts: list[int] | int,
    ends: list[int] | int,
    max_gap: int = 0,
    max_block: int | None = None,
    sort: bool = True,
) -> tuple[list[str], list[int], list[int]]:
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError

    # Early Return
    if len(starts) <= 1:
        return paths, starts, ends

    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = (
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        )
    remove = []
    for i, (path, start, end) in enumerate(zip(paths, starts, ends)):
        if any(
            e is not None and p == path and start >= s and end <= e and i != i2
            for i2, (p, s, e) in enumerate(zip(paths, starts, ends))
        ):
            remove.append(i)
    paths = [p for i, p in enumerate(paths) if i not in remove]
    starts = [s for i, s in enumerate(starts) if i not in remove]
    ends = [e for i, e in enumerate(ends) if i not in remove]

    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with the previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends

    # `paths` is empty. Just return input lists
    return paths, starts, ends
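

# Worked example (a sketch): the first two ranges for "f" touch (gap of 0) and
# merge into [0, 20); the third starts 20 bytes later, beyond max_gap=0:
# >>> merge_offset_ranges(["f", "f", "f"], [0, 10, 40], [10, 20, 50])
# (['f', 'f'], [0, 40], [20, 50])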


def file_size(filelike: IO[bytes]) -> int:
    """Find length of any open read-mode file-like"""
    pos = filelike.tell()
    try:
        return filelike.seek(0, 2)
    finally:
        filelike.seek(pos)


@contextlib.contextmanager
def atomic_write(path: str, mode: str = "wb"):
    """
    A context manager that opens a temporary file next to `path` and, on exit,
    replaces `path` with the temporary file, thereby updating `path`
    atomically.
    """
    fd, fn = tempfile.mkstemp(
        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as fp:
            yield fp
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(fn)
        raise
    else:
        os.replace(fn, path)
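

# Usage sketch: a concurrent reader of "out.bin" sees either the old contents
# or the new contents in full, never a partial write:
# >>> with atomic_write("out.bin") as f:  # doctest: +SKIP
# ...     _ = f.write(b"payload")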


def _translate(pat, STAR, QUESTION_MARK):
    # Copied from: https://github.com/python/cpython/pull/106703.
    res: list[str] = []
    add = res.append
    i, n = 0, len(pat)
    while i < n:
        c = pat[i]
        i = i + 1
        if c == "*":
            # compress consecutive `*` into one
            if (not res) or res[-1] is not STAR:
                add(STAR)
        elif c == "?":
            add(QUESTION_MARK)
        elif c == "[":
            j = i
            if j < n and pat[j] == "!":
                j = j + 1
            if j < n and pat[j] == "]":
                j = j + 1
            while j < n and pat[j] != "]":
                j = j + 1
            if j >= n:
                add("\\[")
            else:
                stuff = pat[i:j]
                if "-" not in stuff:
                    stuff = stuff.replace("\\", r"\\")
                else:
                    chunks = []
                    k = i + 2 if pat[i] == "!" else i + 1
                    while True:
                        k = pat.find("-", k, j)
                        if k < 0:
                            break
                        chunks.append(pat[i:k])
                        i = k + 1
                        k = k + 3
                    chunk = pat[i:j]
                    if chunk:
                        chunks.append(chunk)
                    else:
                        chunks[-1] += "-"
                    # Remove empty ranges -- invalid in RE.
                    for k in range(len(chunks) - 1, 0, -1):
                        if chunks[k - 1][-1] > chunks[k][0]:
                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
                            del chunks[k]
                    # Escape backslashes and hyphens for set difference (--).
                    # Hyphens that create ranges shouldn't be escaped.
                    stuff = "-".join(
                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
                    )
                # Escape set operations (&&, ~~ and ||).
                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
                i = j + 1
                if not stuff:
                    # Empty range: never match.
                    add("(?!)")
                elif stuff == "!":
                    # Negated empty range: match any character.
                    add(".")
                else:
                    if stuff[0] == "!":
                        stuff = "^" + stuff[1:]
                    elif stuff[0] in ("^", "["):
                        stuff = "\\" + stuff
                    add(f"[{stuff}]")
        else:
            add(re.escape(c))
    assert i == n
    return res


def glob_translate(pat):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # The keyword parameters' values are fixed to:
    # recursive=True, include_hidden=True, seps=None
    """Translate a pathname with shell wildcards to a regular expression."""
    if os.path.altsep:
        seps = os.path.sep + os.path.altsep
    else:
        seps = os.path.sep
    escaped_seps = "".join(map(re.escape, seps))
    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
    not_sep = f"[^{escaped_seps}]"
    one_last_segment = f"{not_sep}+"
    one_segment = f"{one_last_segment}{any_sep}"
    any_segments = f"(?:.+{any_sep})?"
    any_last_segments = ".*"
    results = []
    parts = re.split(any_sep, pat)
    last_part_idx = len(parts) - 1
    for idx, part in enumerate(parts):
        if part == "*":
            results.append(one_segment if idx < last_part_idx else one_last_segment)
            continue
        if part == "**":
            results.append(any_segments if idx < last_part_idx else any_last_segments)
            continue
        elif "**" in part:
            raise ValueError(
                "Invalid pattern: '**' can only be an entire path component"
            )
        if part:
            results.extend(_translate(part, f"{not_sep}*", not_sep))
        if idx < last_part_idx:
            results.append(any_sep)
    res = "".join(results)
    return rf"(?s:{res})\Z"
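

# Example (a sketch, shown for a POSIX path separator): "**" spans directory
# levels while "*" stays within a single path component:
# >>> bool(re.match(glob_translate("data/**/*.csv"), "data/a/b/x.csv"))  # doctest: +SKIP
# True
# >>> bool(re.match(glob_translate("data/*.csv"), "data/a/b/x.csv"))  # doctest: +SKIP
# False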